我的项目中有一个环形缓冲区,许多发布者都将在其中发布事件(例如500个发布者),并且我有3个EventProcessor应该顺序处理事件。所有事件都应通过这种方式:
{很多发行商}-> {UpStreamProcessor}-> {DownStreamProcessor}-> {logProcessor}
问题是我在发布和UpStreamProcessor的开始之间以及UpStreamProcessor的结束到DownStreamProcessor的开始之间传递事件上浪费了很多时间。
例如,当我有500个发布者时,在UpStreamProcessor和DownStreamProcessor中进行处理的平均时间为1毫秒,而在UpStreamProcessor完成时间与DownStreamProcessor开始时间之间的持续时间为400ms。
这是用于构造环形缓冲区和处理器的代码:
SequenceBarrier sequenceBarrier;
receiveBuffer = new RingBuffer<>(
MessageContext.FACTORY,
new MultiThreadedLowContentionClaimStrategy(inputBufferSize),
new YieldingWaitStrategy()
);
upStreamAgentProcessor = new BatchEventProcessor<>(
receiveBuffer,
receiveBuffer.newBarrier(),
new UpStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
upStreamAgentProcessor.getSequence()
);
downStreamAgentProcessor = new BatchEventProcessor<MessageContext>(
receiveBuffer,
sequenceBarrier,
new DownStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
downStreamAgentProcessor.getSequence()
);
logMapAgentProcessor = new BatchEventProcessor<MessageContext>(
receiveBuffer,
sequenceBarrier,
LogMap.getInstance()
);
receiveBuffer.setGatingSequences(logMapAgentProcessor.getSequence());
operationalExecutor.submit(upStreamAgentProcessor);
operationalExecutor.submit(downStreamAgentProcessor);
operationalExecutor.submit(logMapAgentProcessor);
最佳答案
Disruptor设计用于处理耗时0.0001 ms的消息,如果1 ms甚至0.1 ms的延迟不困扰您,我将使用普通的ExecutorService。如果看到延迟或超过0.001毫秒,则不太可能成为干扰者,并且正在执行的任务太长。
这是有关协调遗漏的很好的介绍。 http://www.infoq.com/presentations/latency-pitfalls坏消息是,如果您有一个瓶颈,如您所看到的那样减慢了生产者的速度,则延迟可能比您要衡量的要差得多。