Problem description
I have a use case with the following actor hierarchy:
parent -> childABC -> workerchild
The worker child does its work and sends the result to its parent (childABC, which is itself a child of parent), and childABC then sends the result back to the parent actor. I am using pipeTo and getting dead letters. Here is my code.
Parent actor:
final case object GetFinalValue
class MyActor extends Actor {
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor], "Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
  override def receive: Receive = {
    case GetFinalValue =>
      ask(myManageActor, GetValue).pipeTo(sender())
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }
}
childABC (according to the example I gave above):
final case object GetValue
class ManagerMyActor extends Actor {
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2], "toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
  override def receive: Receive = {
    case GetValue =>
      ask(myTokenActor, CalculateValue).pipeTo(sender())
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }
}
Child actor:
final case object CalculateValue
class TokenMyActor2 extends Actor {
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  override def receive: Receive = {
    case CalculateValue =>
      val future = Future { "get the string" }
      val bac = future.map { result =>
        sender ! result
      } //.pipeTo(sender())
    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}
def main(args: Array[String]): Unit = {
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
  val myActor = system.actorOf(Props[MyActor], "myActor")
  val future = ask(myActor, GetFinalValue).mapTo[String]
  future.map { str =>
    log.info("string is {}", str)
  }
}
Here are the logs:
[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Please tell me where I went wrong. Should pipeTo not be used like this? If so, what should I do to make it work?
Recommended answer
Not sure whether it is intended or not, but ask(myManageActor, GetValue).pipeTo(sender()) can be implemented with forward.
class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???
  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}
forward is the same as tell, but it preserves the original sender of the message.
This can be applied to both MyActor and ManagerMyActor.
In the case of TokenMyActor2, you should not use
  future.map { result =>
    sender ! result
  }
as it breaks Akka's actor encapsulation, as specified in the docs:
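When using future callbacks, such as onComplete, or map such as thenRun, or thenApply inside actors you need to carefully avoid closing over the containing actor's reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: Actors and shared mutable state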
You should instead rely on Future(???).pipeTo(sender()), which is safe to use with sender() because sender() is evaluated when pipeTo is called, while the actor is still processing the message, rather than later inside the callback.
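As an aside, if a callback is still preferred over pipeTo, a common workaround is to read sender() into a local val before creating the Future, so the callback closes over an immutable reference rather than over the actor's context. The following is only a minimal sketch under that assumption; the name replyTo is illustrative and not part of the original code.

```scala
import akka.actor.{Actor, ActorRef}
import scala.concurrent.Future

case object CalculateValue

class TokenMyActor2 extends Actor {
  import context.dispatcher

  override def receive: Receive = {
    case CalculateValue =>
      // Read sender() now, on the actor's own thread, while the
      // current message is still being processed.
      val replyTo: ActorRef = sender() // illustrative name
      Future("get the string").foreach { result =>
        // The callback runs later on another thread, but it only
        // touches the captured immutable reference.
        replyTo ! result
      }
  }
}
```

Unlike pipeTo, this variant does not deliver a Status.Failure to the requester when the Future fails, so pipeTo remains the simpler choice in most cases.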
After applying the forward and pipeTo changes, the code works as expected:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.SECONDS

case object GetFinalValue
case object GetValue
case object CalculateValue

class MyActor extends Actor {
  private val myManageActor: ActorRef =
    context.actorOf(Props[ManagerMyActor], "myManageActor")
  // forward keeps the original requester as the sender
  override def receive: Receive = { case GetFinalValue =>
    myManageActor.forward(GetValue)
  }
}

class ManagerMyActor extends Actor {
  private val myTokenActor =
    context.actorOf(Props[TokenMyActor2], "toknMyActor2")
  override def receive: Receive = { case GetValue =>
    myTokenActor.forward(CalculateValue)
  }
}

class TokenMyActor2 extends Actor {
  import context.dispatcher
  override def receive: Receive = { case CalculateValue =>
    val future = Future { "get the string" }
    // sender() is evaluated here, not inside a callback
    future.pipeTo(sender())
  }
}

implicit val timeout = Timeout(3, SECONDS)
implicit val system = ActorSystem("adasd")
import system.dispatcher
val myActor = system.actorOf(Props[MyActor], "myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.foreach { str =>
  println(s"got $str")
}
This prints: got get the string
As a final note, I'd advise against using the ask pattern within actors. The basic functionality of ask can easily be achieved with just tell and forward. The code is also shorter and is not cluttered with the constant need for an implicit val timeout.
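When an intermediate actor needs to transform the result before replying, forward alone is not enough, but tell can still replace ask. The sketch below is one possible arrangement, not the only one: the Calculate and Calculated messages and the Worker and Manager classes are hypothetical names introduced for illustration, and the original requester is carried inside the messages so that no timeout or per-request state is needed.

```scala
import akka.actor.{Actor, ActorRef, Props}

case object GetValue
// Hypothetical messages, not part of the original code: they carry the
// original requester so the manager knows where to send the final reply.
final case class Calculate(replyTo: ActorRef)
final case class Calculated(value: String, replyTo: ActorRef)

class Worker extends Actor {
  override def receive: Receive = {
    case Calculate(replyTo) =>
      // Reply to the manager, echoing back the original requester.
      sender() ! Calculated("get the string", replyTo)
  }
}

class Manager extends Actor {
  private val worker: ActorRef = context.actorOf(Props(new Worker), "worker")

  override def receive: Receive = {
    case GetValue =>
      // Plain tell: remember the requester inside the message itself.
      worker ! Calculate(sender())
    case Calculated(value, replyTo) =>
      // Post-process the result, then answer the original requester.
      replyTo ! value.toUpperCase
  }
}
```

The trade-off is that a lost or unanswered request is no longer reported through a timeout, so failure handling has to be designed separately if it matters.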