假设我有3个发布者和1个处理器。发布者以{key: <integer>, value: <object>, publisher_id: <string>}
形式发出项目。
发布者进行IO操作,因此:
一方面,我希望发布者在给定的时刻(大约)处理N
项目。
另一方面,我希望消费者将项目合并到一个记录中(即{key: <integer>, values: <list>}
)
实际上,我已经实现了具有内部存储器(FluxProcessor
)来保存所有项目的ConcurrentHashMap
。每当未达到容量时,它会手动request()
新项目。
我想知道是否有内置功能可以通过RxJava(2)/ Spring Reactor API做到这一点?
最佳答案
在RxJava 2中使用merge,rebatchRequests和toMultimap:
Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...
Flowable.merge(
source1.rebatchRequests(N),
source2.rebatchRequests(N),
source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));