目标

我试图在处理程序之间创建某种程度上是循环的依赖关系,但我还不太清楚如何正确处理它。我要实现的是producer -> [handlers 1-3] -> handler 4的变体。

因此,disruptor.handleEventsWith(h1, h2, h3).then(h4);。但是我还有其他要求


尽管处理程序1-3并行处理消息,但它们都没有一个开始处理下一条消息,直到它们全部完成了上一条消息为止。
在第一个消息之后,处理程序1-3等待处理程序4完成最新的消息,然后再处理下一条消息。


使用单个事件处理程序的等效执行逻辑可以是:

disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
  Arrays.asList(h1, h2, h3).parallelStream()
        .forEach(h -> h.onEvent(event, sequence, endOfBatch));
  h4.onEvent(event, sequence, endOfBatch);
});


语境

设计上下文是处理程序1-3每个都根据消息更新其自己的状态,并且在三个消息中的每一个处理完消息后,它们都处于一致状态。然后,处理程序4根据处理程序1-3更新的状态运行一些逻辑。因此,处理程序4应该只看到1-3维护的数据结构的一致状态,这意味着处理程序1-3在处理程序4完成之前不应该处理下一条消息。

(尽管目标绝对是使用Disruptor来管理并发,而不是java.util.Stream。)

不确定是否重要,但也可能是处理程序4的逻辑可以分为两部分,一个部分要求不更新任何处理程序1-3,而第二个要求处理程序4的第一部分已完成。因此,处理程序1-3可以在处理程序4的第二部分仍在执行时处理消息。

有没有办法做到这一点?还是我的设计有缺陷?我觉得应该有一种方法可以通过SequenceBarrier来做到这一点,但我不太了解如何实现此自定义障碍。对于处理程序1-3,我想我想用逻辑handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence()设置一个障碍,但是我不确定在哪里放置该逻辑。

谢谢!

最佳答案

我会考虑使处理程序是无状态的,并使用它们处理的消息来包含系统的状态。这样,您根本就不需要同步处理程序。

10-06 09:17