我想生成一个可观察的文件,以便可以随时取消对文件名的发现。出于本示例的原因,取消将在1秒钟内自动发生。

这是我当前的代码:

class Program
{
    static void Main()
    {
        try
        {
            RunAsync(@"\\abc\xyz").GetAwaiter().GetResult();
        }
        catch (Exception exc)
        {
            Console.Error.WriteLine(exc);
        }
        Console.Write("Press Enter to exit");
        Console.ReadLine();
    }

    private static async Task RunAsync(string path)
    {
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
        await GetFileSource(path, cts);
    }

    private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
    {
        return Observable.Create<string>(obs => Task.Run(async () =>
        {
            Console.WriteLine("Inside Before");
            foreach (var file in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).Take(50))
            {
                cts.Token.ThrowIfCancellationRequested();
                obs.OnNext(file);
                await Task.Delay(100);
            }
            Console.WriteLine("Inside After");
            obs.OnCompleted();
            return Disposable.Empty;
        }, cts.Token))
        .Do(Console.WriteLine);
    }
}


我不喜欢实现的两个方面(如果还有更多方面,请随时指出):


我有一个枚举的文件,但是我手动遍历每个文件。我可以使用ToObservable扩展名吗?
我不知道如何利用传递给cts.TokenTask.Run。必须使用从外部上下文(cts参数)捕获的GetFileSource。在我看来很难看。


这是应该怎么做?一定是更好的方法。

最佳答案

我仍然不确定这确实是一个Reactive问题,您是在要求生产者施加反压力,这实际上与Reactive应该如何工作无关。

话虽这么说,如果您要以这种方式进行操作,您应该意识到非常细粒度的时间操作几乎应该总是委派给Scheduler,而不是尝试与TasksCancellationTokens进行协调。因此,我将重构为如下所示:

public static IObservable<string> GetFileSource(string path, Func<string, Task<string>> processor, IScheduler scheduler = null) {

  scheduler = scheduler ?? Scheduler.Default;

  return Observable.Create<string>(obs =>
  {
    //Grab the enumerator as our iteration state.
    var enumerator = Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories)
                              .GetEnumerator();
    return scheduler.Schedule(enumerator, async (e, recurse) =>
    {
      if (!e.MoveNext())
      {
         obs.OnCompleted();
         return;
      }

      //Wait here until processing is done before moving on
      obs.OnNext(await processor(e.Current));

      //Recursively schedule
      recurse(e);
    });
  });

}


然后,使用TakeUntil而不是传递取消令牌:

var source = GetFileSource(path, x => {/*Do some async task here*/; return x; })
 .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1));


您还可以看到async Generate method实现的更高级的示例。

关于c# - Rx.NET产生可取消的可观察文件名的方法是什么?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31522809/

10-12 00:42
查看更多