我正在尝试编写一个调用HTTP REST API的actor。其余的API需要一个查询参数,该参数将从调用Actor传递。 official documentation的示例使用preStart方法通过管道将消息传递到自身来实现上述目的:
import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString
class Myself extends Actor
with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))
val http = Http(context.system)
override def preStart() = {
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.pipeTo(self)
}
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
log.info("Got response, body: " + body.utf8String)
}
case resp @ HttpResponse(code, _, _, _) =>
log.info("Request failed, response code: " + code)
resp.discardEntityBytes()
}
}
上面的方法有效,但是URL是硬编码的。我想要实现的是一个REST客户端参与者,我可以将参数作为消息发送给该对象并获取调用结果。我修改了上面的代码以将参数作为消息接收(伪代码):
def receive = {
case param: RESTAPIParameter => {
http.singleRequest(HttpRequest(URI("http://my-rest-url").withQuery("name", "value"))
.pipeTo(self)
}
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
log.info("Got response, body: " + body.utf8String)
sender! body.utf8String //Will not work
}
case resp @ HttpResponse(code, _, _, _) =>
log.info("Request failed, response code: " + code)
resp.discardEntityBytes()
}
上面的方法应该可以工作,但是不能真正用于将响应发送回客户端,因为当REST调用的结果通过管道返回到
sender
时,self
参考丢失了。我想我可以尝试将发件人本地存储在一个变量中,然后使用它将响应传递回去,但是我认为这不是一个好主意。
那么,处理这种情况的正确方法是什么?
编辑:@ PH88下面建议的解决方案有效,但我想在外循环中保持
HttpResponse
上的模式匹配。编辑2 :之所以要将响应通过管道传递回
self
是因为我想实现一种状态机。状态根据参与者收到的消息类型而改变。举个例子:becomes
awaitingResult。数据通过管道传输到自身以进行进一步处理。 becomes
dataRecevied
。数据再次通过管道传输到self
进行更多处理。 希望可以澄清意图。任何其他建议/设计,以实现简洁/简单的设计,欢迎:-)
最佳答案
您可以将HttpResponse
与您自己的case类包装在一起,并将发送方与其 bundle 在一起:
case class ServerResponse(requester: ActorRef, resp: HttpResponse)
然后:
def receive = {
case param: RESTAPIParameter => {
val requester = sender
http.singleRequest(HttpRequest(URI("http://my-rest-url").withQuery("name", "value"))
.map(httpResp =>
// This will execute in some other thread, thus
// it's important to NOT use sender directly
ServerResponse(requester, httpResp)
)
.pipeTo(self)
}
case ServerResponse(requester, HttpResponse(...)) =>
val result = ...
requester ! result
...
}
关于scala - 如何在Actor中包装REST API客户端,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43739767/