假设我有3个发布者和1个处理器。发布者以{key: <integer>, value: <object>, publisher_id: <string>}形式发出项目。

发布者进行IO操作,因此:


一方面,我希望发布者在给定的时刻(大约)处理N项目。
另一方面,我希望消费者将项目合并到一个记录中(即{key: <integer>, values: <list>}


实际上,我已经实现了具有内部存储器(FluxProcessor)来保存所有项目的ConcurrentHashMap。每当未达到容量时,它会手动request()新项目。

我想知道是否有内置功能可以通过RxJava(2)/ Spring Reactor API做到这一点?

最佳答案

在RxJava 2中使用merge,rebatchRequests和toMultimap:

Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...

Flowable.merge(
    source1.rebatchRequests(N),
    source2.rebatchRequests(N),
    source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));

10-08 18:36