


I am trying to create a dataflow using TPL Dataflow with the following form:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    -> LoadDataBlockN -> ProcessDataBlockN ->

The idea is that GetInputPathsBlock is a block that finds the paths to the input data to be loaded, and then sends one path to each LoadDataBlock. The LoadDataBlocks are all identical (except that each has received a unique inputPath string from GetInputPathsBlock). The loaded data is then sent to the ProcessDataBlock, which does some simple processing. Then the data from each ProcessDataBlock is sent to MergeDataBlock, which merges it and sends it to SaveDataBlock, which then saves it to a file.


Think of it as a dataflow that needs to run for each month. First the path is found for the data for each day. Each day's data is loaded and processed, and then merged together for the entire month and saved. Months can run in parallel, and within a month the data for each day can be loaded and processed in parallel (each day is processed once its own data has been loaded). Once everything for the month has been loaded and processed, it can be merged and saved.


As far as I can tell TransformManyBlock<TInput,string> can be used to do the splitting (GetInputPathsBlock), and can be linked to a normal TransformBlock<string,InputData> (LoadDataBlock), and from there to another TransformBlock<InputData,ProcessedData> (ProcessDataBlock), but I don't know how to then merge it back to a single block.



I found this answer, which uses TransformManyBlock to go from an IEnumerable<item> to item, but I don't fully understand it, and I can't link a TransformBlock<InputData,ProcessedData> (ProcessDataBlock) to a TransformBlock<IEnumerable<ProcessedData>,ProcessedData>, so I don't know how to use it.


I have also seen answers like this, which suggest using JoinBlock, but the number of input files N varies, and the files are all loaded in the same way anyway.


There is also this answer, which seems to do what I want, but I don't fully understand it, and I don't know how the setup with the dictionary would be transferred to my case.


  • Is there a block type I am missing?
  • Can I somehow use TransformManyBlock twice?
  • Does TPL Dataflow make sense for the split/merge, or is there a simpler async/await way?



I would use a nested block to avoid splitting my monthly data and then having to merge them again. Here is an example of two nested TransformBlocks that process all days of the year 2020:

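using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
    // A fresh inner block per month processes that month's days in parallel
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete(); // Without this, ToListAsync would never finish

    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete(); // Signal that no more months will be sent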


For collecting the daily results of the inner block I used the extension method ToListAsync that is shown below:

// Declare inside a static class; requires System.Threading.Tasks.Dataflow
public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    // Drain the block until it completes and its output buffer is empty
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        while (block.TryReceive(out var item))
            list.Add(item);
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}
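
To connect this back to the pipeline in the question, the monthly block's output could be linked to a block that plays the role of SaveDataBlock. The following is only a minimal sketch under assumptions: LoadAndProcessDayAsync, saveBlock and the saved list are hypothetical stand-ins (nothing is actually loaded or written to disk), and the receive loop is inlined so the snippet is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for loading and processing one day's data.
static async Task<string> LoadAndProcessDayAsync(int month, int day)
{
    await Task.Delay(10); // Simulate async I/O
    return $"2020-{month:D2}-{day:D2}";
}

var saved = new List<string>(); // Hypothetical stand-in for files written to disk

var monthlyBlock = new TransformBlock<int, List<string>>(async month =>
{
    // Inner block: the days of one month, up to 4 at a time.
    var dailyBlock = new TransformBlock<int, string>(
        day => LoadAndProcessDayAsync(month, day),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    // "Merge" step: collect every daily result into one list for the month.
    var results = new List<string>();
    while (await dailyBlock.OutputAvailableAsync())
        while (dailyBlock.TryReceive(out var item))
            results.Add(item);
    await dailyBlock.Completion; // Propagate possible exception
    return results;
});

// "Save" step: here it only records a summary line per month.
var saveBlock = new ActionBlock<List<string>>(monthData =>
{
    saved.Add($"{monthData.Count} days, first {monthData[0]}");
});

// PropagateCompletion lets saveBlock complete once monthlyBlock is done.
monthlyBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();
await saveBlock.Completion;
```

Because the blocks keep their default ordering and the save block runs one item at a time, the saved list ends up with one entry per month in calendar order. Raising MaxDegreeOfParallelism on monthlyBlock would let several months run concurrently.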

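
On the question of a simpler async/await way: this is not part of the approach above, but for comparison, one possible sketch uses Task.WhenAll with a SemaphoreSlim as a throttle. LoadAndProcessDayAsync and ThrottledAsync are hypothetical helper names, and the limit of 4 concurrent days is an assumption chosen to mirror the MaxDegreeOfParallelism used above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for loading and processing one day's data.
static async Task<string> LoadAndProcessDayAsync(int month, int day)
{
    await Task.Delay(10); // Simulate async I/O
    return $"2020-{month:D2}-{day:D2}";
}

var throttle = new SemaphoreSlim(4); // At most 4 days in flight at once

async Task<string> ThrottledAsync(int month, int day)
{
    await throttle.WaitAsync();
    try { return await LoadAndProcessDayAsync(month, day); }
    finally { throttle.Release(); }
}

var monthlyResults = new List<string[]>();
foreach (var month in Enumerable.Range(1, 12))
{
    var dayTasks = Enumerable.Range(1, DateTime.DaysInMonth(2020, month))
        .Select(day => ThrottledAsync(month, day));
    // Task.WhenAll returns the results in the same order as the input tasks,
    // so this array is the merged data for the month.
    monthlyResults.Add(await Task.WhenAll(dayTasks));
}
```

This version processes the months one after another. It has no buffering or backpressure between stages, which is where the dataflow blocks tend to be the better fit.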

08-20 04:24