我正在尝试将Executor与SubscribeOn和obsereOn一起使用,以使异步任务完成后回到主线程。
我最终得到了这段代码,但它不起作用
@Test
public void testBackToMainThread() throws InterruptedException {
processValue(1);
processValue(2);
processValue(3);
processValue(4);
processValue(5);
// while (tasks.size() != 0) {
// tasks.take().run();
// }
System.out.println("done");
}
private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
private void processValue(int value) throws InterruptedException {
Observable.just(value)
.subscribeOn(Schedulers.io())
.doOnNext(number -> processExecution())
.observeOn(Schedulers.from(command -> tasks.add(command)))
.subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
tasks.take().run();
}
private void processExecution() {
System.out.println("Execution in " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
知道如何完成我想要的吗?
我跑步时只打印
Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done
问候
最佳答案
您的方法的问题在于,您不知道在给定的时间应该执行多少个任务,也无法在解除阻塞主线程后等待应该发生的任务时死锁。
我知道1.x的任何扩展都不支持返回Java主线程。对于2.x,扩展项目中的BlockingScheduler允许您执行以下操作:
public static void main(String[] args) {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> {
Flowable.range(1,10)
.subscribeOn(Schedulers.io())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
});
System.out.println("BlockingScheduler finished");
}
请注意对
scheduler.shutdown()
的调用,最终将被调用以释放主线程,否则您的程序可能永远不会终止。关于java - 主线程中的SubscribeOn和observeOn,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45648686/