本文介绍了如何窗口/缓冲IObservable< T>根据Func< T>分成大块.的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

上课:

class Foo { DateTime Timestamp {get; set;} }

...和一个IObservable<Foo>,并且保证单调递增 Timestamp s,我如何才能基于这些Timestamp生成一个IObservable<IList<Foo>>分块到列表中?

...and an IObservable<Foo>, with guaranteed monotonically increasing Timestamps, how can I generate an IObservable<IList<Foo>> chunked into Lists based on those Timestamps?

即每个IList<Foo>应该有五秒钟的事件,或任何其他事件.我知道可以将BufferTimeSpan重载一起使用,但是我需要从事件本身中抽出时间,而不是挂钟. (除非这里有巧妙的方法提供IScheduler并将IObservable本身用作.Now的来源?)

I.e. each IList<Foo> should have five seconds of events, or whatever. I know I can use Buffer with a TimeSpan overload, but I need to take the time from the events themselves, not the wall clock. (Unless there a clever way of providing an IScheduler here which uses the IObservable itself as the source of .Now?)

如果我尝试像这样使用Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)重载:

If I try to use the Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries) overload like so:

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
    x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();

pub.Buffer(windows).Subscribe(x => t.Dump()));  // linqpad
pub.Connect();

...然后IList实例包含导致窗口关闭的项目,但是我真的希望该项目进入下一个窗口/缓冲区.

...then the IList instances contain the item that causes the window to be closed, but I really want this item to go into the next window/buffer.

例如带有时间戳[0, 1, 10, 11, 15],您将获得[[0], [1, 10], [11, 15]]的块,而不是[[0, 1], [10, 11], [15]]

E.g. with timestamps [0, 1, 10, 11, 15] you will get blocks of [[0], [1, 10], [11, 15]] instead of [[0, 1], [10, 11], [15]]

推荐答案

这是个主意.组键条件是窗口号",我使用GroupByUntil.这样,您便可以在示例中获得所需的输出(并且我已经像该示例一样使用了int流-但您可以替换需要为窗口编号的任何内容).

Here's an idea. The group key condition is the "window number" and I use GroupByUntil. This gives you the desired output in your example (and I've used an int stream just like that example - but you can substitute whatever you need to number your windows).

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));

        xs.Publish(ps =>                                // (1)
            ps.GroupByUntil(
                p => p / 5,                             // (2)
                grp => ps.Where(p => p / 5 != grp.Key)) // (3)
            .SelectMany(x => x.ToList()))               // (4)
        .Subscribe(Console.WriteLine);

        scheduler.Start();
    }
}

注释

  1. 我们发布源流,因为我们将订阅不止一次.
  2. 这是创建组密钥的功能-使用此密钥可根据您的商品类型生成一个窗口编号.
  3. 这是组终止条件-使用此条件检查源流中另一个窗口中的项目.请注意,这意味着直到窗口外部的元素到达或源流终止,窗口才会关闭.如果您考虑一下,这是显而易见的-窗口结束后,所需的输出需要考虑下一个元素.请注意,如果您的源与实时有任何关系,则可以将其与Observable.Timer+Select合并,该Observable.Timer+Select输出您的术语的null/默认实例,以更早地终止流.
  4. SelectMany将组放入列表中并拉平流.
  1. We publish the source stream because we will subscribe more than once.
  2. This is a function to create a group key - use this to generate a window number from your item type.
  3. This is the group termination condition - use this to inspect the source stream for an item in another window. Note that means a window won't close until an element outside of it arrives, or the source stream terminates. This is obvious if you think about it - your desired output requires consideration of next element after a window ends. Note if your source bears any relation to real time, you could merge this with an Observable.Timer+Select that outputs a null/default instance of your term to terminate the stream earlier.
  4. SelectMany puts the groups into lists and flattens the stream.

如果您包含nuget包rx-testing,则此示例将在LINQPad中很好地运行.新建一个Tests实例,然后运行Test()方法.

This example will run in LINQPad quite nicely if you include nuget package rx-testing. New up a Tests instance and just run the Test() method.

这篇关于如何窗口/缓冲IObservable&lt; T&gt;根据Func&lt; T&gt;分成大块.的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 08:34