我正在运行一个简单的测试,将消息从4个线程发布到TopicProcessor,然后在订阅服务器中将消息添加到集合中。代码如下:
@Test
public void testProcessingMessages() throws Exception {
int numberOfMessages = 1000;
TopicProcessor<Integer> processor = TopicProcessor.create();
ExecutorService executorService = Executors.newFixedThreadPool(4);
Queue<Integer> messages = new ConcurrentLinkedQueue<>();
processor.subscribe(messages::add);
AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < numberOfMessages; i++) {
executorService.submit(() -> {
processor.onNext(counter.incrementAndGet());
});
}
Thread.sleep(10000);
assertEquals(numberOfMessages, messages.size());
}
但是最后的断言通常以大约980-990条实际消息而不是预期的1000条失败。
我想念什么吗?
最佳答案
问题在于TopicProcessor.create
创建了一个处理器,该处理器希望从单个线程进行发布。从多个线程进行生产时,应使用TopicProcessor.share
。