我有一个定期发出项目的生产者和一个有时很慢的消费者。消费者只能使用最近的物品,这一点很重要。我认为onBackpressureLatest()是解决此问题的完美解决方案。因此,我编写了以下测试代码:

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .onBackpressureLatest()
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()))
        .subscribe(i -> {
            System.out.println("Consume: " + i);
            Thread.sleep(100);
        });

for (int i = 0; i < 10; i++) {
    System.out.println("Produce: " + i);
    source.onNext(i);
}

我希望它记录如下内容:
Produce: 0
...
Produce: 9
Consume: 0
Consume: 9

相反,我得到
Produce: 0
...
Produce: 9
Consume: 0
Consume: 1
...
Consume: 9

onBackpressureLatest()和onBackpressureDrop()都不起作用。仅onBackpressureBuffer(i)会导致异常。

我使用rxjava 2.1.9。任何想法是什么问题/我的误解是什么?

最佳答案

observeOn具有内部缓冲区(默认为128个元素),该缓冲区将立即轻松拾取所有源项目,因此onBackpressureLatest始终被完全消耗。

编辑:

您可以创建的最小缓冲区是1,它应该提供所需的模式:

source.onBackpressureLatest()
      .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
      .subscribe(v -> { /* ... */ });

(早期的delay + rebatchRequest组合实际上与此等效)。

10-08 06:59