我正在将项目从Rx v1转换为Rx v2,目前正在将某些v1 Observable更改为v2 Flowable的过程中。

(在Android项目中,使用Spock用Groovy编写的单元测试)

通常,我只是使用钩子覆盖计划程序。我仍然可以在v2中通过注册调度程序处理程序来执行此操作。这总是通过使用(new?)Observable来使Schedulers.single()同步。但是,由于背压机制(?),Flowable仍然是异步的。

我尝试使用以下方法解决该问题:

Flowable<LogEntry> flowable = Flowable.create(new FlowableOnSubscribe<LogEntry>() {
    @Override
    void subscribe(FlowableEmitter<LogEntry> emitter) throws Exception {
        for (def log : logs) {
            emitter.onNext(log)
        }

        emitter.onComplete()
    }
}, FlowableEmitter.BackpressureMode.NONE);

但这仍然使它们异步。

我已经像这样重写了调度程序:
RxJavaPlugins.reset()
RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.single()
    }
})

RxAndroidPlugins.reset()
RxAndroidPlugins.setMainThreadSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    Scheduler apply(Scheduler scheduler) throws Exception {
        return Schedulers.from(new Executor() {
            @Override
            void execute(Runnable command) {
                command.run()
            }
        })
    }
})

我似乎无法弄清楚Observable为何如此同步,但是Flowable却没有(在背压机制旁边)

最佳答案

Schedulers.single()是一个单线程异步调度程序。您需要Schedulers.trampoline()保留在同一线程上。

10-04 23:05
查看更多