本文介绍了无法使用自定义用户监护人在 ActorSystem 上从外部创建顶级演员 [clusterSingletonManager]的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一项使用 Akka Persistence 进行事件溯源的服务.到目前为止,我们已经成功地将事件存储在 Cassandra 日志中.现在我们想通过利用 Akka Persistence Query 来实现 CQRS.作为第一种方法,我们尝试遵循集群单例模式,让参与者按标签流式传输存储的事件.现在我们将这个相当简单的 actor 包装成一个单例:

I'm working on a service which uses Akka Persistence for event sourcing. So far we've been successfully storing the events in a Cassandra journal. Now we want to implement CQRS by leveraging Akka Persistence Query. As a first approach we are trying to follow the cluster singleton pattern to have an actor streaming the stored events by tag. For now we have this rather simple actor to be wrapped as a singleton:

public class EventProcessor extends AbstractLoggingActor {
  private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class);

  private final CassandraReadJournal journal;

  public EventProcessor(ActorSystem system) {
    journal =
        PersistenceQuery.get(system)
            .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

    journal
        .eventsByTag(OnBoardingBehavior.ENTITY_TYPE_KEY.name(), Offset.noOffset())
        .map(EventEnvelope::persistenceId)
        .to(Sink.foreach(this::logMessage))
        .run(system);
  }

  private void logMessage(String id) {
    LOG.info(String.format("########## Received persistenceId %s", id));
  }

  @Override
  public Receive createReceive() {
    return null;
  }
}

这就是我们将演员包裹在监护人中的方式:

And this is how we wrap the actor inside the guardian:

    akka.actor.ActorSystem classicSystem = context.getSystem().classicSystem();

    ClusterSingletonManagerSettings settings =
        ClusterSingletonManagerSettings.create(classicSystem);

    Props clusterSingletonManagerProps =
        ClusterSingletonManager.props(
            Props.create(EventProcessor.class, classicSystem),
            PoisonPill.getInstance(),
            settings);

    classicSystem.actorOf(clusterSingletonManagerProps, "clusterSingletonManager");

当我们运行服务时,我们得到以下异常(在 actorOf 行上):

When we run the service, we get the following exception (on the actorOf line):

java.lang.UnsupportedOperationException: cannot create top-level actor [clusterSingletonManager] from the outside on ActorSystem with custom user guardian
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:900)
at r.service.onboarding.actor.Guardian.initializeEventProcessor(Guardian.java:95)
at r.service.onboarding.actor.Guardian.<init>(Guardian.java:56)
at r.service.onboarding.actor.Guardian.lambda$create$745d95f3$1(Guardian.java:66)
at akka.actor.typed.javadsl.Behaviors$.$anonfun$setup$1(Behaviors.scala:47)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at a.a.t.i.InterceptorImpl$$anon$1.start(InterceptorImpl.scala:48)
at akka.actor.typed.BehaviorInterceptor.aroundStart(BehaviorInterceptor.scala:55)
at a.a.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:71)
at a.a.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:28)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:275)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at a.a.t.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
at a.a.t.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
... 5 frames excluded

我想提一下,我们可以从同一个监护人中生成其他演员,例如:

I'd like to mention that we are able to spawn other actor from within the same guardian like:

ActorRef<Command> actorRef =
        context.spawn(OnBoardingBehavior.create(uuid), "OnBoardingBehavior-" + uuid);

我对 Akka 完全陌生,因此非常感谢您的帮助!

I'm completely new to Akka so any help would be much appreciated!

推荐答案

终于想通了.显然,在有类型的系统中运行时,我不能将旧的无类型方法与 ClusterSingletonManager.props 一起使用.我在这里找到了在类型系统上的正确方法:https://doc.akka.io/docs/akka/current/typed/cluster-singleton.html

Finally figured it out. Apparently when running in a typed system, I cannot use the old untyped approach with ClusterSingletonManager.props. I found here the proper way on a typed system:https://doc.akka.io/docs/akka/current/typed/cluster-singleton.html

所以我的演员现在是一个抽象行为:

So my actor is now an AbstractBehavior:

public class EventProcessor extends AbstractBehavior<Void>

这就是我将它包装为单例的方式:

And this is the way I wrap it as a singleton:

    ClusterSingleton singleton = ClusterSingleton.get(context.getSystem());

    singleton.init(
        SingletonActor.of(EventProcessor.create(identityRequestAdapter), "eventProcessor"));

这篇关于无法使用自定义用户监护人在 ActorSystem 上从外部创建顶级演员 [clusterSingletonManager]的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 22:38