我的项目中有一个环形缓冲区,许多发布者都将在其中发布事件(例如500个发布者),并且我有3个EventProcessor应该顺序处理事件。所有事件都应通过这种方式:

{很多发行商}-> {UpStreamProcessor}-> {DownStreamProcessor}-> {logProcessor}

问题是我在发布和UpStreamProcessor的开始之间以及UpStreamProcessor的结束到DownStreamProcessor的开始之间传递事件上浪费了很多时间。

例如,当我有500个发布者时,在UpStreamProcessor和DownStreamProcessor中进行处理的平均时间为1毫秒,而在UpStreamProcessor完成时间与DownStreamProcessor开始时间之间的持续时间为400ms。

这是用于构造环形缓冲区和处理器的代码:

SequenceBarrier sequenceBarrier;

receiveBuffer = new RingBuffer<>(
    MessageContext.FACTORY,
    new MultiThreadedLowContentionClaimStrategy(inputBufferSize),
    new YieldingWaitStrategy()
);

upStreamAgentProcessor = new BatchEventProcessor<>(
    receiveBuffer,
    receiveBuffer.newBarrier(),
    new UpStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
    upStreamAgentProcessor.getSequence()
);

downStreamAgentProcessor = new BatchEventProcessor<MessageContext>(
    receiveBuffer,
    sequenceBarrier,
    new DownStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
    downStreamAgentProcessor.getSequence()
);

logMapAgentProcessor = new BatchEventProcessor<MessageContext>(
    receiveBuffer,
    sequenceBarrier,
    LogMap.getInstance()
);


receiveBuffer.setGatingSequences(logMapAgentProcessor.getSequence());

operationalExecutor.submit(upStreamAgentProcessor);
operationalExecutor.submit(downStreamAgentProcessor);
operationalExecutor.submit(logMapAgentProcessor);

最佳答案

Disruptor设计用于处理耗时0.0001 ms的消息,如果1 ms甚至0.1 ms的延迟不困扰您,我将使用普通的ExecutorService。如果看到延迟或超过0.001毫秒,则不太可能成为干扰者,并且正在执行的任务太长。

这是有关协调遗漏的很好的介绍。 http://www.infoq.com/presentations/latency-pitfalls坏消息是,如果您有一个瓶颈,如您所看到的那样减慢了生产者的速度,则延迟可能比您要衡量的要差得多。

09-04 02:51