问题描述
如果在 main()
方法中,我执行这个
If, in a main()
method, I execute this
Flux.just(1,2)
.log()
.subscribe();
我在控制台中得到了这个:
I get this in the console:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onComplete()
如果不是 just()
我使用 interval()
方法:
If instead of just()
I use the interval()
method:
Flux.interval(Duration.ofMillis(100))
.take(2)
.log()
.subscribe();
元素不会被记录,除非我添加 Thread.sleep()
这给了我:
the elements are not logged, unless I add Thread.sleep()
which gives me:
[ INFO] (main) onSubscribe(FluxTake.TakeSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onComplete()
问题是:为什么我需要暂停一个线程才能真正触发订阅?
The question is: why do I require to pause a thread to actually trigger the subscription ?
推荐答案
您需要等待主线程并让执行完成.您的主线程在生成下一个元素之前终止.您的第一个元素是在 100 ms
之后生成的,因此您需要等待/阻塞主线程.试试这个:
You need to wait on main thread and let the execution complete. Your main thread terminates before next element is generated. Your first element is generated after 100 ms
so you need to wait/block the main thread. Try this:
CountDownLatch latch = new CountDownLatch(1);
Flux.interval(Duration.ofMillis(100))
.take(2)
.doOnComplete(latch::countDown)
.log()
.subscribe();
latch.await(); // wait for doOnComplete
一种同步辅助,允许一个或多个线程等待其他线程中正在执行的一组操作完成.
这篇关于为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!