我正在使用DataFlowEx,想知道如果引发异常,如何避免关闭整个DataFlow。

我有一个系统,其中的任务将随机出现,并且我希望网络记录故障,放弃该特定任务并继续执行其他任务。

在阅读有关TPL和DataFlowEx的文档时,特别是诸如


  它[故障块]应该拒绝任何进一步的传入消息。 Here
  
  DataflowEx对异常处理采取了快速失败的方法,就像
  TPL数据流。引发异常时,低级块结束于
  首先是故障状态。然后是父级的Dataflow实例
  失败块的通知。它将立即传播
  致命错误:通知其他孩子立即关闭。后
  它的所有子项都已完成/已完成,父数据流也可以
  它的完成,原始的例外包裹在
  状态也为Fault的CompletionTask。 Here


似乎不希望从故障中继续前进的障碍...

我的流程中包含很多文件IO,我希望偶尔会发生异常(网络卷在读/写,连接失败,权限问题期间脱机)。

我不希望整个管道消失。

这是我正在使用的代码的示例:

using Gridsum.DataflowEx;
using System;
using System.IO;
using System.Threading.Tasks.Dataflow;

namespace DataManagementSystem.Data.Pipeline.Actions
{
    class CopyFlow : Dataflow<FileInfo, FileInfo>
    {
        private TransformBlock<FileInfo, FileInfo> Copier;
        private string destination;

        public CopyFlow(string destination) : base(DataflowOptions.Default)
        {
            this.destination = destination;

            Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f));

            RegisterChild(Copier);
        }

        public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } }

        public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } }

        protected virtual FileInfo Copy(FileInfo file)
        {
            try
            {
                return file.CopyTo(Path.Combine(destination, file.Name));
            }
            catch(Exception ex)
            {
                //Log the exception
                //Abandon this unit of work
                //resume processing subsequent units of work
            }

        }
    }
}


这是我将工作发送到管道的方式:

var result = pipeline.ProcessAsync(new[] { file1, file2 }).Result;

最佳答案

如果抛出Exception,则该块将成为故障。如果您不希望管道失败,则不能传播完成或处理Exception。处理异常可以采取多种形式,但是听起来您需要的只是一次简单的重试。您可以使用try/catch并实现自己的重试循环或使用类似Polly的方法。一个简单的例子如下所示。

public BuildPipeline() {
    var waitTime = TimeSpan.FromSeconds(1);
    var retryPolicy = Policy.Handle<IOException>()
                            .WaitAndRetryAsync(3, i => waitTime);
    var fileIOBlock = new ActionBlock<string>(async fileName => await retryPolicy.ExecuteAsync(async () => await FileIOAsync(fileName)));
}


注意:此代码尚未经过测试,但可以正确引导您。

编辑

您几乎拥有所需的一切。一旦捕获到异常并将其记录下来,您就可以返回null或一些其他标记,您可以从管道中过滤掉这些标记到NullTarget。此代码确保NullTarget过滤链接是Copier上的第一个链接,因此任何null都不会使其到达您的实际目的地。

class CopyFlow : Dataflow<FileInfo, FileInfo> {
    private TransformBlock<FileInfo, FileInfo> Copier;
    private string destination;

    public CopyFlow(string destination) : base(DataflowOptions.Default) {
        this.destination = destination;

        Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f));
        Copier.LinkTo(DataflowBlock.NullTarget<FileInfo>(), info => info == null);

        RegisterChild(Copier);
    }

    public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } }

    public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } }

    protected virtual FileInfo Copy(FileInfo file) {
        try {
            return file.CopyTo(Path.Combine(destination, file.Name));
        } catch(Exception ex) {
            //Log the exception
            //Abandon this unit of work
            //resume processing subsequent units of work
            return null;
        }

    }
}

关于c# - 避免在一个块发生故障时关闭整个数据流网络,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47147341/

10-14 01:54