简而言之,是否有任何解决方案可以解决RxJava中的背压而不求助于删除项目,序列化操作或无限制的缓冲?

考虑以下任务作为何时可能有用的示例。


从磁盘读取数据到内存
压缩数据
通过网络传输压缩数据


直接的方法是在单个后台线程上依次执行所有任务,如下所示:

observeBlocksOfFileContents(file).
    .subscribeOn(backgroundScheduler)
    .map(compressBlock)
    .subscribe(transmitBlock);


尽管这没有问题,但是从性能角度来看,它不是最佳的,因为运行时是所有三个操作的总和,而不是并行运行时它们的最大值:

observeBlocksOfFileContents(file).
    .subscribeOn(diskScheduler)
    .observeOn(cpuScheduler)
    .map(compressBlock)
    .observeOn(networkScheduler)
    .subscribe(transmitBlock);


但是,如果从磁盘读取数据的速度快于压缩和传输数据的速度,则可能由于背压而失败。通常的背压解决方案是不受欢迎的,原因如下:


删除项:文件必须完整传输,且不得丢失
在单线程上序列化:流水线的性能改进已失去
调用堆栈阻塞:not supported in RxJava
增加observeOn缓冲区:内存消耗可能变成文件大小的几倍
在没有MissingBackpressureException的情况下重新实现observeOn:大量工作并中断了流畅的API


还有其他解决方案吗?还是这根本不适合ReactiveX可观察模型?

最佳答案

6)实现observeBlocksOfFileContents,使其支持背压。

文件系统已经是基于拉的(InputStream.read()发生在您想要的时候,而不是抛出它),因此请考虑一个合理的块大小,并在每个请求中读取它:

Observable.create(SyncOnSubscribe.createStateful(
    () -> new FileInputStream("file.dat")
    (in, out) -> {
        byte[] buf = new byte[4096];
        int r = in.read(buf);
        if (r < 0) {
            out.onCompleted();
        } else {
            if (r == buf.length) {
                out.onNext(buf);
            } else {
                byte[] buf2 = new byte[r];
                System.arraycopy(buf, 0, buf2, 0, r);
                out.onNext(buf2);
            }
        }

    },
    in -> in.close()
));


(为简便起见,省略了try-catching。)

10-07 16:32