问题描述
我一直在寻找新的rx java 2,我不太确定我理解 backpressure
的想法......
I have been looking at new rx java 2 and I'm not quite sure I understand the idea of backpressure
anymore...
我知道我们有 Observable
没有 backpressure
支持和 Flowable
有它。
I'm aware that we have Observable
that does not have backpressure
support and Flowable
that has it.
所以根据例子,假设我有可流动
with interval
:
So based on example, lets say I have flowable
with interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
这会在大约128个值之后崩溃,这很明显我消耗的速度比获取物品慢。
This is going to crash after around 128 values, and thats pretty obvious I am consuming slower than getting items.
但是我们和 Observable相同
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
这根本不会崩溃,即使我延迟消耗它仍然有效。要使 Flowable
工作,我可以说我把 onBackpressureDrop
运算符,崩溃消失了,但并不是所有的值都被发出。
This will not crash at all, even when I put some delay on consuming it still works. To make Flowable
work lets say I put onBackpressureDrop
operator, crash is gone but not all values are emitted either.
所以我目前无法找到答案的基本问题是为什么我可以使用 backpressure
plain Observable
仍然接收所有值而不管理缓冲区
?或者从另一方面来看, backpressure
有什么好处让我支持管理和处理消费?
So the base question I can not find answer currently in my head is why should I care about backpressure
when I can use plain Observable
still receive all values without managing the buffer
? Or maybe from the other side, what advantages do backpressure
give me in favour of managing and handling the consuming?
推荐答案
在实践中有什么背压表明是有界缓冲区, Flowable.observeOn
有一个128个元素的缓冲区,它可以像下游那样快速耗尽接受。您可以单独增加此缓冲区大小以处理突发源,并且所有背压管理实践仍适用于1.x. Observable.observeOn
有一个无限制的缓冲区,不断收集元素,你的应用可能会耗尽内存。
What backpressure manifests in practice is bounded buffers, Flowable.observeOn
has a buffer of 128 elements that gets drained as fast as the dowstream can take it. You can increase this buffer size individually to handle bursty source and all the backpressure-management practices still apply from 1.x. Observable.observeOn
has an unbounded buffer that keeps collecting the elements and your app may run out of memory.
你可以使用 Observable
例如:
- 处理GUI事件
- 使用短序列(总共少于1000个元素)
您可以使用 Flowable
例如:
- 冷和非定时来源
- 生成器之类来源
- 网络和数据库访问者
这篇关于Observable vs Flowable rxJava2的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!