简而言之,是否有任何解决方案可以解决RxJava中的背压而不求助于删除项目,序列化操作或无限制的缓冲?
考虑以下任务作为何时可能有用的示例。
从磁盘读取数据到内存
压缩数据
通过网络传输压缩数据
直接的方法是在单个后台线程上依次执行所有任务,如下所示:
observeBlocksOfFileContents(file).
.subscribeOn(backgroundScheduler)
.map(compressBlock)
.subscribe(transmitBlock);
尽管这没有问题,但是从性能角度来看,它不是最佳的,因为运行时是所有三个操作的总和,而不是并行运行时它们的最大值:
observeBlocksOfFileContents(file).
.subscribeOn(diskScheduler)
.observeOn(cpuScheduler)
.map(compressBlock)
.observeOn(networkScheduler)
.subscribe(transmitBlock);
但是,如果从磁盘读取数据的速度快于压缩和传输数据的速度,则可能由于背压而失败。通常的背压解决方案是不受欢迎的,原因如下:
删除项:文件必须完整传输,且不得丢失
在单线程上序列化:流水线的性能改进已失去
调用堆栈阻塞:not supported in RxJava
增加observeOn缓冲区:内存消耗可能变成文件大小的几倍
在没有MissingBackpressureException的情况下重新实现observeOn:大量工作并中断了流畅的API
还有其他解决方案吗?还是这根本不适合ReactiveX可观察模型?
最佳答案
6)实现observeBlocksOfFileContents,使其支持背压。
文件系统已经是基于拉的(InputStream.read()发生在您想要的时候,而不是抛出它),因此请考虑一个合理的块大小,并在每个请求中读取它:
Observable.create(SyncOnSubscribe.createStateful(
() -> new FileInputStream("file.dat")
(in, out) -> {
byte[] buf = new byte[4096];
int r = in.read(buf);
if (r < 0) {
out.onCompleted();
} else {
if (r == buf.length) {
out.onNext(buf);
} else {
byte[] buf2 = new byte[r];
System.arraycopy(buf, 0, buf2, 0, r);
out.onNext(buf2);
}
}
},
in -> in.close()
));
(为简便起见,省略了try-catching。)