来自C#,当我使用RX并且存在背压时,项目将不断添加到内部队列中,直到应用程序内存不足为止(据我所知)。

在ReactiveX(RXJava)中,当背压开始建立时,通过抛出异常,他们似乎采取了不同的立场。

这意味着我必须使用诸如onBackpressureBuffer()之类的东西,并且在对subscribe()的调用中传递一个Subscriber<? super T>,这使得请求上升到流中以释放压力。

也许是因为我使用了RX.NET方法,但这对我来说似乎是明智的。

首先,我是否正确理解了这一点?

其次,无论如何,我是否可以“禁用”此功能,以便它的行为与RX.NET相同,因为我不想通过检查是否已经实现了这些反压运算符之一来使subscribe()调用复杂化看看我是否必须调用request()

最佳答案

在scala中(我不知道Java语法,但是方法调用是相同的),您只需要打开
fastHotObservable.subscribe(next => slowFunction(next))
进入
fastHotObservable.onBackpressureBuffer.subscribe(next => slowFunction(next))
那应该做。当然,在运行它时,必须要有一段时间不 Activity ,因此该过程有时需要一些时间来追赶和处理缓冲的元素。

我认为这并不介意,我觉得很不错,您可以选择自己应对 Unresolved 背压的策略,而不必强制自己为自己选择一种方法。我也更喜欢必须明确地指定它。

实际上,RX.net使用的策略并不总是最好的。最近,我一直在使用一些onBackpressureDrop调用,只是忘记了我没有时间处理的鼠标移动,而我很高兴可以避免让它们如此轻松地进行缓冲。

10-07 18:57
查看更多