我是第一次尝试RX,但有几个问题。

1)有没有更好的方法来完成我的收藏的异步?

2)我需要在线程上阻塞,直到所有异步任务完成为止,该怎么办?

class Program
{

    internal class MyClass
    {
        private readonly List<int> _myData = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        private readonly Random random = new Random();

        public int DoSomething(int j)
        {
            int i = random.Next(j * 1000) - (j * 200);
            i = i < 0 ? 1000 : i;
            Thread.Sleep(i);
            Console.WriteLine(j);
            return j;
        }

        public IObservable<int> DoSomethingAsync(int j)
        {
            return Observable.CreateWithDisposable<int>(
                o => Observable.ToAsync<int, int>(DoSomething)(j).Subscribe(o)
                );
        }

        public void CreateTasks()
        {
            _myData.ToObservable(Scheduler.NewThread).Subscribe(
            onNext: (i) => DoSomethingAsync(i).Subscribe(),
            onCompleted: () => Console.WriteLine("Completed")
                );
        }
    }

    static void Main(string[] args)
    {
        MyClass test = new MyClass();

        test.CreateTasks();

        Console.ReadKey();
    }
}


(注意:我知道我可以将Observable.Range用于我的Int列表,但在实际程序中该列表不是Int类型)。

最佳答案

我可能会尝试

public void CreateTasks()
{
    _myData.ToObservable(Scheduler.NewThread)
        .SelectMany(i => Observable.Start(() => DoSomething(i)))
        .Subscribe(j => Console.WriteLine("j is {0}", j),
                  () => Console.WriteLine("Completed"));
}


因此,首先我更改了DoSomethingAsync,使其使用Observable.Start。 Observable.Start将异步运行DoSomething方法,并在该方法完成时通过IObservable.OnNext返回值。

然后,CreateTasks方法像以前一样在集合中的每个项目上运行,但是将每个值馈入SelectMany,并继续调用DoSomethingAsync方法。结果是,您将为每个完成的DoSomethingAsync调用接收一个OnNext,并在它们全部完成后获得一个OnComplete

关于c# - 如何使用List <>阻止ToObservable?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/5058531/

10-13 07:32