我已经注册了回拨服务,现在我希望将其公开为Flowable,并具有某些要求/限制:


不应阻止线程接收回调(应将工作移交给观察者指定的其他线程/调度程序)
不会因为消费者的流失而引发任何异常
多个消费者可以彼此独立订阅
消费者可以选择缓冲所有项目,以便不会丢失任何内容,但是不应将其缓冲在“生产者”类中


以下是我目前所拥有的

class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
       //this is actually different to my exact use but same conceptually
       registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}


我不确定这是否真的符合我的要求。关于onBackpressureLatestobserveOn javadoc上有一条我不理解的注释:


  请注意,由于背压请求如何通过subscribeOn / observeOn进行传播的性质,从下游请求多个请求并不能保证onNext事件的连续传递


我还有其他问题:


onBackpressureLatest调用是否使它不再多播?
如何测试我的要求?
奖励:如果我有多个这样的发布者(在同一个班级或其他地方),则使相同模式可重用的最佳方法是什么。使用委派/额外方法创建自己的Flowable吗?

最佳答案

我不确定这是否真的符合我的要求。


它不是。分别应用onBackpressureLatestonBackpressureBuffer,然后分别在observeOnobserveSomePacketsOn中应用observeAllPacketsOn


  是否通过onBackpressureLatest调用使其不再多播?


多播由PublishProcessor完成,并且不同的订户将独立地建立到该通道的通道,其中onBackpressureXXXobserveOn运算符将在单个订户的基础上生效。


  如何测试我的要求?


通过具有FlowableTestSubscriber)的有损或无损Flowable.test()进行订阅,将一组已知的数据包馈入packets,并查看它们是否全部通过TestSubscriber.assertValueCount()TestSubscriber.values()到达。有损之一应为1 .. N,无损之一应在宽限期后具有N值。


  奖励:如果我有多个这样的发布者(在同一个班级或其他地方),则使相同模式可重用的最佳方法是什么。使用委派/额外方法创建自己的Flowable吗?


您可以将observeAllPacketsOn转换为FlowableTransformer,而不是在MyBroadcaster上调用方法,而是使用compose,例如:

class MyTransformers {
    public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);

关于java - 使用RxJava2正确处理背压和并发,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50477057/

10-10 17:15