我正在尝试创建一个Flowable,它发出有关背压的事件以避免内存问题,同时并行运行转换的每个阶段以提高效率。我创建了一个简单的测试程序,以推断程序不同步骤的行为以及发出事件与等待事件的时间。

我的程序如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
      .stream().map(i -> {
        System.out.println("emitting:" + i);
        return i;
      });

  Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
  System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));

  Long count = flowable.onBackpressureBuffer(10)
      .buffer(10)
      .flatMap(buf -> {
        System.out.println("Sleeping 500 for batch");
        Thread.sleep(500);
        System.out.println("Got batch of events");
        return Flowable.fromIterable(buf);
      }, 1)
      .map(x -> x + 1)
      .doOnNext(i -> {
        System.out.println(String.format("Sleeping : %d", i));
        Thread.sleep(100);
        System.out.println(i);
      })
      .count()
      .blockingGet();

  System.out.println("count: " + count);
}

当我运行此命令时,我得到的输出符合预期的背压,其中将一系列事件发射到buffer中的大小,然后将它们映射在一起,最后采取一些措施,一张一张地打印它们:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
Sleeping 500 for batch
Got batch of events
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
10
emitting:10
emitting:11
emitting:12
emitting:13
emitting:14
emitting:15
emitting:16
emitting:17
emitting:18
emitting:19
Sleeping 500 for batch
Got batch of events
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13

但是,如果我尝试通过向.observeOn(Schedulers.computation())添加一些调用来并行化操作的不同阶段,则似乎我的程序不再尊重背压。我的代码现在看起来像:
public static void main(String[] args) throws ExecutionException, InterruptedException {
  Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
      .stream().map(i -> {
        System.out.println("emitting:" + i);
        return i;
      });

  Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
  System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));

  Long count = flowable.onBackpressureBuffer(10)
      .buffer(10)
      .observeOn(Schedulers.computation())
      .flatMap(buf -> {
        System.out.println("Sleeping 500 for batch");
        Thread.sleep(500);
        System.out.println("Got batch of events");
        return Flowable.fromIterable(buf);
      }, 1)
      .map(x -> x + 1)
      .observeOn(Schedulers.computation())
      .doOnNext(i -> {
        System.out.println(String.format("Sleeping : %d", i));
        Thread.sleep(100);
        System.out.println(i);
      })
      .observeOn(Schedulers.computation())
      .count()
      .blockingGet();

  System.out.println("count: " + count);
}

我的输出是以下内容,其中所有事件都是预先发出的,而不是考虑各个执行阶段指定的背压和缓冲区:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
emitting:10
Sleeping 500 for batch
emitting:11
emitting:12
... everything else is emitted here ...
emitting:998
emitting:999
Got batch of events
Sleeping 500 for batch
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
Got batch of events
Sleeping 500 for batch
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
Got batch of events
Sleeping 500 for batch
10
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
13
Sleeping : 14
14
Sleeping : 15
Got batch of events
Sleeping 500 for batch
15
Sleeping : 16
16
Sleeping : 17
17
Sleeping : 18
18
Sleeping : 19
19
Sleeping : 20
Got batch of events
Sleeping 500 for batch
20
Sleeping : 21
21
Sleeping : 22
22
Sleeping : 23
23
Sleeping : 24
24
Sleeping : 25
Got batch of events
Sleeping 500 for batch
25

假装我的批处理阶段正在调用外部服务,但是由于延迟,我希望它们并行运行。我还希望在给定时间控制内存中的项目数,因为最初发出的项目数可能会高度可变,并且批量运行的阶段要比事件的初始发出要慢得多。

我如何让Flowable尊重Scheduler上的背压?为什么在我加入observeOn调用时似乎只不尊重背压?

最佳答案



实际上,应用onBackpressureBuffer会使它上方的源与下游施加的任何背压断开连接,因为它是不受限制的运算符。您不需要它,因为Flowable.fromIterable(并且顺便说一句,RxJava具有range运算符)支持并遵守背压。



在第一个示例中,发生了自然的背压,称为调用堆栈阻塞。 RxJava默认情况下是同步的,并且大多数运算符不会引入异步,就像在第一个示例中一样。
observeOn引入了异步边界,因此从理论上讲,各个阶段可以彼此并行运行。它具有默认的128个元素的预取缓冲区,可以通过其重载之一对其进行调整。但是,在您的情况下,buffer(10)实际上会将预取量放大为1280,这仍然可能导致一次性消耗1000个元素长的源。

关于java - 为什么在使用observeOn时,我的RxJava Flowable不尊重背压?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45176982/

10-10 19:55