在我试图写一个股票交易者 IObserver 的过程中,我遇到了三个错误,这些错误主要是从Reactive Extensions库中引发的。

我有以下CompanyInfo类:

public class CompanyInfo
{
    public string Name { get; set; }

    public double Value { get; set; }
}

还有一个称为IObservable<CompanyInfo>StockMarket:
public class StockMarket : IObservable<CompanyInfo>

我的Observer如下所示:
public class StockTrader : IObserver<CompanyInfo>
{
    public void OnCompleted()
    {
        Console.WriteLine("Market Closed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error);
    }

    public void OnNext(CompanyInfo value)
    {
        WriteStock(value);
    }

    private void WriteStock(CompanyInfo value) { ... }
}

我运行以下代码:
StockMarket market = GetStockMarket();
StockTrader trader = new StockTrader();

IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
    .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
    .SelectMany(x => x                  //4, 8, 2, 3
        .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
        .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
        .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
        {
            Name = x.Key,
            Value = (y[1].Value - y[0].Value) / y[0].Value
        })                                      //[F, +100%]; [S, -20%]
    );

using (IDisposable subscription = differential.Subscribe(trader))
{
    Observable.Wait(market);
}

发生以下三个错误之一:
  • ArgumentException 内部抛出以下Reactive Extensions::

  • 以下IndexOutOfRangeException:

  • Console的文本偶尔进行调整(颜色应一致):

  • c# - 引发可观察到的LINQ不一致异常-LMLPHP

    是什么导致那些怪异的症状?

    最佳答案

    关于Reactive Extensions概念的最伟大的事情之一就是能够订阅发生在“某处”的“出现”( IObservable )并在该“出现”上应用面向对象的概念-无需知道那个“某处”在哪里的能力。 。

    这样,Reactive Extensions简化了面向event的编程,并简化了 producer-consumer problems 的工作。



    在不知道观察到的数据源的情况下订阅 IObservable 的能力迫使订户认为通知是不可预测的。换句话说,当观察 IObservable 时,您应该假定可以同时发送通知

    由于Reactive Externsions的行为契约,因此IObservables一次应产生一项。通常,这就是发生的情况,但有时外部实现不遵循该契约(Contract)。

    让我们看一下三个问题中的每一个:

    GroupBy 不是线程安全的

    GroupBy 通过返回IObservable<IGroupedObservable<T>>来工作,其OnNext方法使用与当前通知匹配的IObservable调用外部OnNextIGroupedObservable<T>。这样做是通过为IGroupedObservable<T>中的每个键保留一个Subject<T>(更准确地说是一个Dictionary)来完成的,这并不奇怪-不是ConcurrentDictionary 。这意味着两个最近的通知可能会导致重复插入

    Select 并不孤单

    Select 的线程安全性由其提供的委托(delegate)确定。在上述情况下,提供给 Select 方法的委托(delegate)取决于Buffer(2, 1)将提供大小为2的列表。 Buffer包含Queue,它不是并发的,因此从多个线程进行迭代时为-Buffer'的Queue可以为我们提供一些意想不到的结果

    如果将Exception提供给NullReferenceException,则可以出于相同原因抛出的另一个y null ,或者可以在迭代时修改InvalidOperationException Queue

    即使是基本观察也不安全

    最后但并非最不重要的一点是,即使仅进行基本观察,StockTraderOnNext方法也正在以非atomic operation修改控制台,这会导致文本布局困惑。

    所以,你可以做什么?

    c# - 引发可观察到的LINQ不一致异常-LMLPHP

    存在 Synchronize 方法使您能够验证您是否正在订阅线性 IObservable<T> ,这意味着不能同时进行一次OnNext方法的调用。

    由于即使 GroupBy 扩展方法也不是线程安全的,因此需要在链的开头调用 Synchronize 方法:

    IObservable<CompanyInfo> differential = market  //[F, 1], [S, 5], [S, 4], [F, 2]
        .Synchronize()
        .GroupBy(x => x.Name)                       //[F, 1], [F, 2]; [S, 5], [S, 4]
        .SelectMany(x => x                  //4, 8, 2, 3
            .Buffer(2, 1)                   //(4, 8), (8, 2), (2, 3), (3)
            .SkipLast(1)                    //(4, 8), (8, 2), (2, 3)
            .Select(y => new CompanyInfo    //(+100%), (-75%), (+50%)
            {
                Name = x.Key,
                Value = (y[1].Value - y[0].Value) / y[0].Value
            })
        );                                          //[F, +100%]; [S, -20%]
    

    请注意,Synchronize向您的查询添加了另一个代理Observable,因此会使查询变慢一点,因此在不需要使用时应避免使用它。

    关于c# - 引发可观察到的LINQ不一致异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35141376/

    10-10 16:22