假设我有一组股票代码:

StockTicker stockTicker =
  new StockTicker("MSFT", "APPL", "YHOO", "GOOG");
stockTicker.OnPriceChanged += (sender, args ) =>
{
  Console.WriteLine(
    "{0}: Price - {1}  Volume - {2}",
    args.StockSymbol, args.Price, args.Volume);
};


我可以订阅事件并以hot IObservable的形式获取:

IObservable<PriceChangedEventArgs> priceChangedObservable =
  Observable.FromEventPattern<PriceChangedEventArgs>(
    eventHandler => stockTicker.OnPriceChanged += eventHandler,
    eventHandler => stockTicker.OnPriceChanged -= eventHandler )
      .Select( eventPattern => eventPattern.EventArgs );

priceChangedObservable.Subscribe(args => Console.WriteLine(
  "{0}: Price - {1}  Volume - {2}",
    args.StockSymbol, args.Price, args.Volume ) );


随着新引号的出现,这将打印每个符号的引号:依次每个事件是单个qute YHOO 25.33, MSFT 127, AAPL 175, GOOG 1126等。

如何修改上面的代码,使每个“引号”都是每个引号的所有当前引号的组合? (YHOO 25.33, MSFT 127, GOOG 1126, AAPL 175)。现在,“引号”是从所有引号中看到的最后一个引号的状态,用作一个“引号”。

我看到Rx具有Zip运算符,但其语义似乎需要n IObservables进行压缩。这里有多个引号通过同一个IObservable吗?

因此,Console.WriteLine会将所有订阅的n个引号打印为一个单事件(并且仅在所有它们都有一个值时才打印)

最佳答案

如果这些随机事件以任何顺序到达并且您仅在任何时间都对最新值感兴趣,那么您可能希望使用Scan运算符对字典进行更新:

source
    /* keep track of the latest value for each symbol */
    .Scan(new Dictionary<string, decimal>(), (a, b) => a[b.StockSymbol] = b.Price)

    /* example logic to wait until they all have values */
    .Where(dict => stockTicker.StockSymbols.All(dict.ContainsKey))

    .Subscribe(dict => { ... });




如果要缓冲的事件数保持不变,并且您知道源在事件之间循环,则Buffer运算符可能对您有用:

source
    /* burst values periodically */
    .Buffer(count)

    .Subscribe(list => { ... })

09-26 17:33
查看更多