本文介绍了转换IEnumerable< Task< T>>到10b可观察到的T.的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用反应性扩展(Rx)来缓冲完成的任务枚举.有人知道这样做是否有一种干净的内置方法吗? ToObservable 扩展方法只会使 IObservable< Task< T>> ,这不是我想要的,我想要一个 IObservable< T> ,这样我就可以在 Buffer 上使用了.

I'm trying to use the Reactive Extensions (Rx) to buffer an enumeration of Tasks as they complete. Does anyone know if there is a clean built-in way of doing this? The ToObservable extension method will just make an IObservable<Task<T>>, which is not what I want, I want an IObservable<T>, that I can then use Buffer on.

人为的例子:

//Method designed to be awaitable
public static Task<int> makeInt()
{
     return Task.Run(() => 5);
}

//In practice, however, I don't want to await each individual task
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer
public static void Main()
{
    //Make a bunch of tasks
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt());

    //Is there a built in way to turn this into an Observable that I can then buffer?
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //????

    buffered.Subscribe(ints => {
        Console.WriteLine(ints.Count()); //Should be 15
    });
}

推荐答案

您可以使用以下事实:可以使用的另一个重载.

You can use the fact that Task can be converted to observable using another overload of ToObservable().

当您具有(单个项目)可观察项的集合时,可以使用.

When you have a collection of (single-item) observables, you can create a single observable that contains the items as they complete using Merge().

因此,您的代码应如下所示:

So, your code could look like this:

futureInts.Select(t => t.ToObservable())
          .Merge()
          .Buffer(15)
          .Subscribe(ints => Console.WriteLine(ints.Count));

这篇关于转换IEnumerable&lt; Task&lt; T&gt;&gt;到10b可观察到的T.的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-15 01:28