我正在运行一个简单的测试,将消息从4个线程发布到TopicProcessor,然后在订阅服务器中将消息添加到集合中。代码如下:

@Test
public void testProcessingMessages() throws Exception {
    int numberOfMessages = 1000;

    TopicProcessor<Integer> processor = TopicProcessor.create();

    ExecutorService executorService = Executors.newFixedThreadPool(4);

    Queue<Integer> messages = new ConcurrentLinkedQueue<>();

    processor.subscribe(messages::add);

    AtomicInteger counter = new AtomicInteger(0);
    for (int i = 0; i < numberOfMessages; i++) {
        executorService.submit(() -> {
            processor.onNext(counter.incrementAndGet());
        });
    }

    Thread.sleep(10000);

    assertEquals(numberOfMessages, messages.size());
}


但是最后的断言通常以大约980-990条实际消息而不是预期的1000条失败。
我想念什么吗?

最佳答案

问题在于TopicProcessor.create创建了一个处理器,该处理器希望从单个线程进行发布。从多个线程进行生产时,应使用TopicProcessor.share

10-06 14:46