我已经注册了回拨服务,现在我希望将其公开为Flowable
,并具有某些要求/限制:
不应阻止线程接收回调(应将工作移交给观察者指定的其他线程/调度程序)
不会因为消费者的流失而引发任何异常
多个消费者可以彼此独立订阅
消费者可以选择缓冲所有项目,以便不会丢失任何内容,但是不应将其缓冲在“生产者”类中
以下是我目前所拥有的
class MyBroadcaster {
private PublishProcessor<Packet> packets = PublishProcessor.create();
private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();
public MyBroadcaster() {
//this is actually different to my exact use but same conceptually
registerCallback(packets::onNext);
}
public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
return backpressuredPackets.observeOn(scheduler);
}
}
我不确定这是否真的符合我的要求。关于
onBackpressureLatest
的observeOn
javadoc上有一条我不理解的注释:请注意,由于背压请求如何通过subscribeOn / observeOn进行传播的性质,从下游请求多个请求并不能保证onNext事件的连续传递
我还有其他问题:
onBackpressureLatest
调用是否使它不再多播?如何测试我的要求?
奖励:如果我有多个这样的发布者(在同一个班级或其他地方),则使相同模式可重用的最佳方法是什么。使用委派/额外方法创建自己的Flowable吗?
最佳答案
我不确定这是否真的符合我的要求。
它不是。分别应用onBackpressureLatest
或onBackpressureBuffer
,然后分别在observeOn
和observeSomePacketsOn
中应用observeAllPacketsOn
。
是否通过onBackpressureLatest调用使其不再多播?
多播由PublishProcessor
完成,并且不同的订户将独立地建立到该通道的通道,其中onBackpressureXXX
和observeOn
运算符将在单个订户的基础上生效。
如何测试我的要求?
通过具有Flowable
(TestSubscriber
)的有损或无损Flowable.test()
进行订阅,将一组已知的数据包馈入packets
,并查看它们是否全部通过TestSubscriber.assertValueCount()
或TestSubscriber.values()
到达。有损之一应为1 .. N,无损之一应在宽限期后具有N值。
奖励:如果我有多个这样的发布者(在同一个班级或其他地方),则使相同模式可重用的最佳方法是什么。使用委派/额外方法创建自己的Flowable吗?
您可以将observeAllPacketsOn
转换为FlowableTransformer
,而不是在MyBroadcaster上调用方法,而是使用compose
,例如:
class MyTransformers {
public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
return f -> f.onBackpressureLatest().observeOn(s);
}
}
new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);
关于java - 使用RxJava2正确处理背压和并发,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50477057/