在我试图写一个股票交易者 IObserver
的过程中,我遇到了三个错误,这些错误主要是从Reactive Extensions
库中引发的。
我有以下CompanyInfo
类:
public class CompanyInfo
{
public string Name { get; set; }
public double Value { get; set; }
}
还有一个称为
IObservable<CompanyInfo>
的StockMarket
:public class StockMarket : IObservable<CompanyInfo>
我的
Observer
如下所示:public class StockTrader : IObserver<CompanyInfo>
{
public void OnCompleted()
{
Console.WriteLine("Market Closed");
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(CompanyInfo value)
{
WriteStock(value);
}
private void WriteStock(CompanyInfo value) { ... }
}
我运行以下代码:
StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
}) //[F, +100%]; [S, -20%]
);
using (IDisposable subscription = differential.Subscribe(trader))
{
Observable.Wait(market);
}
发生以下三个错误之一:
ArgumentException
内部抛出以下Reactive Extensions
::IndexOutOfRangeException
:Console
的文本偶尔进行调整(颜色应一致):是什么导致那些怪异的症状?
最佳答案
关于Reactive Extensions
概念的最伟大的事情之一就是能够订阅发生在“某处”的“出现”( IObservable
)并在该“出现”上应用面向对象的概念-无需知道那个“某处”在哪里的能力。 。
这样,Reactive Extensions
简化了面向event
的编程,并简化了 producer-consumer
problems 的工作。
在不知道观察到的数据源的情况下订阅 IObservable
的能力迫使订户认为通知是不可预测的。换句话说,当观察 IObservable
时,您应该假定可以同时发送通知。
由于Reactive Externsions
的行为契约,因此IObservables
一次应产生一项。通常,这就是发生的情况,但有时外部实现不遵循该契约(Contract)。
让我们看一下三个问题中的每一个:
GroupBy
不是线程安全的
GroupBy
通过返回IObservable<IGroupedObservable<T>>
来工作,其OnNext
方法使用与当前通知匹配的IObservable
调用外部OnNext
的IGroupedObservable<T>
。这样做是通过为IGroupedObservable<T>
中的每个键保留一个Subject<T>
(更准确地说是一个Dictionary
)来完成的,这并不奇怪-不是ConcurrentDictionary
。这意味着两个最近的通知可能会导致重复插入。
Select
并不孤单
Select
的线程安全性由其提供的委托(delegate)确定。在上述情况下,提供给 Select
方法的委托(delegate)取决于Buffer(2, 1)
将提供大小为2的列表。 Buffer
包含Queue
,它不是并发的,因此从多个线程进行迭代时为-Buffer
'的Queue
可以为我们提供一些意想不到的结果。
如果将Exception
提供给NullReferenceException
,则可以出于相同原因抛出的另一个y
是 null
,或者可以在迭代时修改InvalidOperationException
的 Queue
。
即使是基本观察也不安全
最后但并非最不重要的一点是,即使仅进行基本观察,StockTrader
的OnNext
方法也正在以非atomic operation修改控制台,这会导致文本布局困惑。
所以,你可以做什么?
存在 Synchronize
方法使您能够验证您是否正在订阅线性 IObservable<T>
,这意味着不能同时进行一次OnNext
方法的调用。
由于即使 GroupBy
扩展方法也不是线程安全的,因此需要在链的开头调用 Synchronize
方法:
IObservable<CompanyInfo> differential = market //[F, 1], [S, 5], [S, 4], [F, 2]
.Synchronize()
.GroupBy(x => x.Name) //[F, 1], [F, 2]; [S, 5], [S, 4]
.SelectMany(x => x //4, 8, 2, 3
.Buffer(2, 1) //(4, 8), (8, 2), (2, 3), (3)
.SkipLast(1) //(4, 8), (8, 2), (2, 3)
.Select(y => new CompanyInfo //(+100%), (-75%), (+50%)
{
Name = x.Key,
Value = (y[1].Value - y[0].Value) / y[0].Value
})
); //[F, +100%]; [S, -20%]
请注意,
Synchronize
向您的查询添加了另一个代理Observable
,因此会使查询变慢一点,因此在不需要使用时应避免使用它。关于c# - 引发可观察到的LINQ不一致异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35141376/