我想生成一个可观察的文件,以便可以随时取消对文件名的发现。出于本示例的原因,取消将在1秒钟内自动发生。
这是我当前的代码:
class Program
{
static void Main()
{
try
{
RunAsync(@"\\abc\xyz").GetAwaiter().GetResult();
}
catch (Exception exc)
{
Console.Error.WriteLine(exc);
}
Console.Write("Press Enter to exit");
Console.ReadLine();
}
private static async Task RunAsync(string path)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await GetFileSource(path, cts);
}
private static IObservable<string> GetFileSource(string path, CancellationTokenSource cts)
{
return Observable.Create<string>(obs => Task.Run(async () =>
{
Console.WriteLine("Inside Before");
foreach (var file in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).Take(50))
{
cts.Token.ThrowIfCancellationRequested();
obs.OnNext(file);
await Task.Delay(100);
}
Console.WriteLine("Inside After");
obs.OnCompleted();
return Disposable.Empty;
}, cts.Token))
.Do(Console.WriteLine);
}
}
我不喜欢实现的两个方面(如果还有更多方面,请随时指出):
我有一个枚举的文件,但是我手动遍历每个文件。我可以使用
ToObservable
扩展名吗?我不知道如何利用传递给
cts.Token
的Task.Run
。必须使用从外部上下文(cts
参数)捕获的GetFileSource
。在我看来很难看。这是应该怎么做?一定是更好的方法。
最佳答案
我仍然不确定这确实是一个Reactive问题,您是在要求生产者施加反压力,这实际上与Reactive应该如何工作无关。
话虽这么说,如果您要以这种方式进行操作,您应该意识到非常细粒度的时间操作几乎应该总是委派给Scheduler
,而不是尝试与Tasks
和CancellationTokens
进行协调。因此,我将重构为如下所示:
public static IObservable<string> GetFileSource(string path, Func<string, Task<string>> processor, IScheduler scheduler = null) {
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<string>(obs =>
{
//Grab the enumerator as our iteration state.
var enumerator = Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories)
.GetEnumerator();
return scheduler.Schedule(enumerator, async (e, recurse) =>
{
if (!e.MoveNext())
{
obs.OnCompleted();
return;
}
//Wait here until processing is done before moving on
obs.OnNext(await processor(e.Current));
//Recursively schedule
recurse(e);
});
});
}
然后,使用
TakeUntil
而不是传递取消令牌:var source = GetFileSource(path, x => {/*Do some async task here*/; return x; })
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1));
您还可以看到async
Generate
method实现的更高级的示例。关于c# - Rx.NET产生可取消的可观察文件名的方法是什么?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31522809/