来自C#,当我使用RX并且存在背压时,项目将不断添加到内部队列中,直到应用程序内存不足为止(据我所知)。
在ReactiveX(RXJava)中,当背压开始建立时,通过抛出异常,他们似乎采取了不同的立场。
这意味着我必须使用诸如onBackpressureBuffer()
之类的东西,并且在对subscribe()
的调用中传递一个Subscriber<? super T>
,这使得请求上升到流中以释放压力。
也许是因为我使用了RX.NET方法,但这对我来说似乎是明智的。
首先,我是否正确理解了这一点?
其次,无论如何,我是否可以“禁用”此功能,以便它的行为与RX.NET相同,因为我不想通过检查是否已经实现了这些反压运算符之一来使subscribe()
调用复杂化看看我是否必须调用request()
。
最佳答案
在scala中(我不知道Java语法,但是方法调用是相同的),您只需要打开fastHotObservable.subscribe(next => slowFunction(next))
进入fastHotObservable.onBackpressureBuffer.subscribe(next => slowFunction(next))
那应该做。当然,在运行它时,必须要有一段时间不 Activity ,因此该过程有时需要一些时间来追赶和处理缓冲的元素。
我认为这并不介意,我觉得很不错,您可以选择自己应对 Unresolved 背压的策略,而不必强制自己为自己选择一种方法。我也更喜欢必须明确地指定它。
实际上,RX.net使用的策略并不总是最好的。最近,我一直在使用一些onBackpressureDrop
调用,只是忘记了我没有时间处理的鼠标移动,而我很高兴可以避免让它们如此轻松地进行缓冲。