如果观察者正在使用watch_on(rxcpp::observe_on_new_thread()),等待所有观察者on_completed被调用的正确方法是:例如:{ Foo foo; auto generator = [&](rxcpp::subscriber<int> s) { s.on_next(1); // ... s.on_completed(); }; auto values = rxcpp::observable<>::create<int>(generator).publish(); auto s1 = values.observe_on(rxcpp::observe_on_new_thread()) .subscribe([&](int) { slow_function(foo); })); auto lifetime = rxcpp::composite_subscription(); lifetime.add([&](){ wrapper.log("unsubscribe"); }); auto s2 = values.ref_count().as_blocking().subscribe(lifetime); // hope to call something here to wait for the completion of // s1's on_completed function}// the program usually crashes here when foo goes out of scope because// the slow_function(foo) is still working on foo. I also noticed that// s1's on_completed never got called.我的问题是如何等待直到s1的on_completed完成而无需设置和轮询某些变量。使用observe_on()的动机是因为通常在值上有多个观察者,我希望每个观察者可以同时运行。也许有不同的方法可以实现相同的目标,我愿意接受您的所有建议。 最佳答案 合并两者将允许单个阻止订阅等待两者均完成。{ Foo foo; auto generator = [&](rxcpp::subscriber<int> s) { s.on_next(1); s.on_next(2); // ... s.on_completed(); }; auto values = rxcpp::observable<>::create<int>(generator).publish(); auto work = values. observe_on(rxcpp::observe_on_new_thread()). tap([&](int c) { slow_function(foo); }). finally([](){printf("s1 completed\n");}). as_dynamic(); auto start = values. ref_count(). finally([](){printf("s2 completed\n");}). as_dynamic(); // wait for all to finish rxcpp::observable<>::from(work, start). merge(rxcpp::observe_on_new_thread()). as_blocking().subscribe();}几点。该流必须返回相同的类型才能进行合并。如果合并不同类型的流,请改用Combine_latest。observable ::from()中observable的顺序很重要,起始流具有ref_count,因此必须最后调用它,以便以下合并将在启动生成器之前订阅了工作。合并有两个线程调用它。这要求使用线程安全的协调。 rxcpp是按使用付费的。默认情况下,运算符(operator)假定所有调用均来自同一线程。任何从多个线程获得调用的运算符(operator)都必须进行线程安全的协调,该运算符(operator)可以使用该协调来实现线程安全的状态管理和输出调用。如果需要,可以将相同的协调器实例用于两者。 07-27 13:36