我正试图使背压流动。
我的想法是,直到当前项目之一完成其处理,才会发射可流动的新项目。我正在使用ResourceSubscriber和subscriptionWith()方法来实现这一点。
flowable的每个元素都在单独的线程池上进行异步处理。 (我通过使用flatMap / subscribeOn实现)
我希望第二个元素会在调用该订户的onNext方法之后发出。但是,当我尝试运行此代码时,Flowable不可控制地发出元素。背压不起作用。
有代码可以重现此问题:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class RxTest2 {

    private static final Logger log = LoggerFactory.getLogger(RxTest.class);

    static AtomicInteger integer = new AtomicInteger();

    public static void main(String[] args) {
        Flowable.generate(emitter -> {
            final int i1 = integer.incrementAndGet();
            if (i1 >= 20) {
                Thread.sleep(10000);
                System.exit(0);
            }
            emitter.onNext(i1);
        })
                .doOnNext(i -> log.info("Published: " + i))
                .flatMap(i -> Flowable.defer(() -> {
                    log.info("Starting consuming {}", i);
                    Thread.sleep(100);
                    log.info("Finished consuming {}", i);
                    return Flowable.just(i);
                }).subscribeOn(Schedulers.computation()))
                .doOnNext(i -> log.info("Consuming finished, result: " + i))
                .subscribeWith(new BackpressureSubscriber(2));
    }
}

class BackpressureSubscriber extends ResourceSubscriber<Object> {

    private static final Logger log = LoggerFactory.getLogger(BackpressureSubscriber.class);

    private final long initialRequest;

    public BackpressureSubscriber(final long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    protected void onStart() {
        super.onStart();
        log.info("Starting execution with {} initial requests", initialRequest);
        request(initialRequest);
    }

    @Override
    public void onNext(final Object message) {
        log.info("On next for {}", message);
        request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log.error("Unhandled error: ", throwable);
    }

    @Override
    public void onComplete() {
        log.info("On Complete");
    }
}
预期输出如下:
[main] INFO RxTest - Published: 1
[main] INFO RxTest - Published: 2
[RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
[RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
[RxComputationThreadPool-1] INFO RxTest -  On next for 1
[main] INFO RxTest - Published: 3
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 2
实际输出:
11:30:32.166 [main] INFO BackpressureSubscriber - Starting execution with 2 initial requests
11:30:32.170 [main] INFO RxTest - Published: 1
11:30:32.189 [main] INFO RxTest - Published: 2
11:30:32.189 [RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
11:30:32.189 [RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
11:30:32.189 [main] INFO RxTest - Published: 3
11:30:32.190 [main] INFO RxTest - Published: 4
11:30:32.190 [RxComputationThreadPool-3] INFO RxTest - Starting consuming 3
11:30:32.190 [main] INFO RxTest - Published: 5
11:30:32.190 [RxComputationThreadPool-4] INFO RxTest - Starting consuming 4
11:30:32.190 [main] INFO RxTest - Published: 6
11:30:32.190 [RxComputationThreadPool-5] INFO RxTest - Starting consuming 5
11:30:32.190 [main] INFO RxTest - Published: 7
11:30:32.191 [RxComputationThreadPool-6] INFO RxTest - Starting consuming 6
11:30:32.191 [main] INFO RxTest - Published: 8
11:30:32.191 [RxComputationThreadPool-7] INFO RxTest - Starting consuming 7
11:30:32.191 [main] INFO RxTest - Published: 9
11:30:32.191 [RxComputationThreadPool-8] INFO RxTest - Starting consuming 8
11:30:32.191 [main] INFO RxTest - Published: 10
11:30:32.191 [RxComputationThreadPool-9] INFO RxTest - Starting consuming 9
11:30:32.191 [main] INFO RxTest - Published: 11
11:30:32.191 [RxComputationThreadPool-10] INFO RxTest - Starting consuming 10
11:30:32.192 [main] INFO RxTest - Published: 12
11:30:32.192 [RxComputationThreadPool-11] INFO RxTest - Starting consuming 11
11:30:32.192 [main] INFO RxTest - Published: 13
11:30:32.192 [main] INFO RxTest - Published: 14
11:30:32.192 [RxComputationThreadPool-12] INFO RxTest - Starting consuming 12
11:30:32.192 [main] INFO RxTest - Published: 15
11:30:32.192 [main] INFO RxTest - Published: 16
11:30:32.192 [main] INFO RxTest - Published: 17
11:30:32.192 [main] INFO RxTest - Published: 18
11:30:32.192 [main] INFO RxTest - Published: 19
11:30:32.294 [RxComputationThreadPool-2] INFO RxTest - Finished consuming 2
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Consuming finished, result: 1
11:30:32.294 [RxComputationThreadPool-1] INFO BackpressureSubscriber - On next for 1
经过库版本测试:
2.2.19
2.1.2
据我了解ReactiveX文档,我认为它是RX Bug。但是我可能是错的,如果您指出我将不胜感激

最佳答案

flatMap实际上是从上游批量请求的,并将缓冲项目直到下游请求它们为止。这个事实足以描述您所看到的行为。如果将bufferSize设置为1,则可能会看到您期望的行为。有一个重载,可让您设置bufferSize
另外,flatMap具有maxConcurrent参数,如果您意识到flatMap实际上是map,则易于理解,然后将merge应用于map给定的流。 merge一次只能实际订阅有限数量的源,即maxConcurrentbufferSizemaxConcurrent的默认值为128。
这里要记住,当合并步骤从下游接收到一个请求时,它不知道需要多少个流(要注意这里我们正在处理一个流)来满足请求!前10个流根本不会返回任何值。如果第一个流什么都没有返回并且没有在1小时内完成,并且我们有maxConcurrent = 1,那么即使第2个流和第3个流准备向我们发送东西,我们也不会在第一个小时收到任何事件。由于这些原因,我们必须为bufferSizemaxConcurrent选择通用默认值,并且通常选择这些值可以优化某些基准情况下的性能,并最大程度地减少许多极端情况下的问题。

10-02 23:48