我通过随机回答问题并毫不羞耻地问愚蠢的新手问题,来教自己进行反应式编程。在弄清楚线程调度如何工作时,我设法陷入困境。虽然我很确定此代码没有逻辑意义,但我也无法理解发生了什么。弄清楚这一点可能会帮助我。这是代码:

var testScheduler = new TestScheduler();
var newThreadScheduler = new NewThreadScheduler();

var emitter = new Subject<string>();
testScheduler.Schedule(TimeSpan.FromSeconds(0.1), () => emitter.OnNext("one"));
testScheduler.Schedule(TimeSpan.FromSeconds(0.2), () => emitter.OnCompleted());

var subscription = emitter.SubscribeOn(newThreadScheduler)
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );

testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);

Console.WriteLine("DONE.");
Console.ReadLine();


我所期望的可能是:

one
DONE.
Complete!


通过可能的交错,我不太确定SubscribeOn()会做什么。相反,我得到的是:

DONE.
Complete!


这里到底发生了什么?为什么在完成之前没有生产出该物品?在这种情况下,ObserveOn()的工作与我预期的一样,并且我理解为什么:它在其他线程上运行委托,并且它们可以与“ DONE”交织。那么,SubscribeOn()到底在做什么?

最佳答案

您在这里拥有的只是一个竞赛条件。

如果我们将所有代码都撤回至

var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();

var subscription = emitter
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );



Console.WriteLine("DONE.");
Console.ReadLine();


我们得到相同的结果。
通过使用Subject<T>,您将不会获得任何缓存行为,但OnCompleted通知除外。

SubscribeOn运算符将安排在提供的IScheduler实例上进行的所有订阅工作。
订阅Subject<T>的情况下,几乎没有工作要做。
这几乎与将回调注册到回调列表一样简单。

将工作安排在NewThreadScheduler上将创建一个新线程,然后创建一个内部事件循环以处理计划的工作。
这非常快,但是确实需要创建一个新线程,一个EventloopScheduler并执行上下文切换到该新线程。

在您的示例中,您将OnNextOnCompleted通知安排在TestScheduler上。
然后,用SubscribeOn NewThreadScheduler
之后,您开始处理TestScheduler实例的所有计划工作。
这些虚拟计划项目的处理只是迭代计划项目,执行委托并推进虚拟时钟。
这是非常快的。

更具体地说,下面的代码类似于您编写的代码

var newThreadScheduler =  new NewThreadScheduler();

var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));

foreach (var callback in callbacks)
{
    callback("one");
}

Console.WriteLine("Done");


在这里,我们仅列出了回调操作(将其称为订阅者或观察者)。
然后,我们在新线程上异步调度添加这些回调之一。
然后立即迭代回调并将字符串“ one”发送给每个回调。
结果是

Done


在主线程可以遍历集合之前,NewThreadScheduler只是没有足够的时间来启动新线程,安排动作,然后执行该动作。

因此,我认为您有两个准则无法遵循:
 1)避开主题;-)
 2)不要混合使用线程测试和单元测试。我认为TestScheduler的存在是因为您正在对此进行测试。但是,您可以使用两个TestScheduler实例,例如背景和前景实例。

为了提供更多帮助,我将提供积极的建议,建议您从测试中删除第二个调度程序。
TestScheduler运算符中使用SubscribeOn实例。

接下来,我建议使用TestScheduler的Observable序列工厂方法(即CreateColdObservable)代替使用subject + scheduling。
最后,我不知道前进到1s的特定时间是否仅通过Start方法获得了任何收益。
我认为这将减少噪音并降低魔术值1s的使用率。

var testScheduler = new TestScheduler();

var source = testScheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
    ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));

var subscription = source.SubscribeOn(testScheduler)
                            .Subscribe(
                                item => Console.WriteLine(item),
                                error => Console.WriteLine(error),
                                () => Console.WriteLine("Complete!")
                            );

testScheduler.Start();

Console.WriteLine("DONE.");
Console.ReadLine();


现在唯一的问题是SubscribeOn调用非常多余。

仅供参考:NewThreadScheduler的代码-
https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs

08-26 09:24