Problem description
I am trying to create a dataflow using TPL Dataflow with the following form:
                   -> LoadDataBlock1 -> ProcessDataBlock1 ->
GetInputPathsBlock -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                   -> LoadDataBlock3 -> ProcessDataBlock3 ->
                   ...
                   -> LoadDataBlockN -> ProcessDataBlockN ->
The idea is that GetInputPathsBlock is a block which finds the paths to the input data that is to be loaded, and then sends one path to each LoadDataBlock. The LoadDataBlocks are all identical (except that each has received a unique inputPath string from GetInputPathsBlock). The loaded data is then sent to the ProcessDataBlock, which does some simple processing. Then the data from each ProcessDataBlock is sent to MergeDataBlock, which merges it and sends it to SaveDataBlock, which then saves it to a file.
Think of it as a dataflow that needs to run for each month. First, the path to each day's data is found. Each day's data is loaded and processed, and then the results for the entire month are merged together and saved. Months can run in parallel; within a month, each day's data can be loaded in parallel and processed in parallel (once that day's data has been loaded); and once everything for the month has been loaded and processed, it can be merged and saved.
What I tried
As far as I can tell, TransformManyBlock<TInput,string> can be used to do the splitting (GetInputPathsBlock), and can be linked to a normal TransformBlock<string,InputData> (LoadDataBlock), and from there to another TransformBlock<InputData,ProcessedData> (ProcessDataBlock), but I don't know how to then merge it back into a single block.
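When a run handles only one month, the linear chain described above can already be merged without a special block: link the blocks with PropagateCompletion, complete the head, and drain the last block until it completes. A minimal sketch, assuming hypothetical stand-ins (a path is a string, loaded data is an int[], processing is a sum, and the merge is a total); it does not handle several months flowing through the same blocks, because their days would be interleaved.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

// Split (GetInputPathsBlock stand-in): one month in, one hypothetical path per day out.
var getInputPathsBlock = new TransformManyBlock<int, string>(month =>
    Enumerable.Range(1, DateTime.DaysInMonth(2020, month))
              .Select(day => $"data/2020-{month:D2}-{day:D2}.csv"));

// LoadDataBlock stand-in: simulated I/O, runs up to 4 paths at once.
var loadDataBlock = new TransformBlock<string, int[]>(async path =>
{
    await Task.Delay(10);             // Simulate reading the file
    return new[] { path.Length, 1 };  // Fake "loaded data"
}, parallel);

// ProcessDataBlock stand-in: processes each day's data in parallel.
var processDataBlock = new TransformBlock<int[], int>(data => data.Sum(), parallel);

getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
loadDataBlock.LinkTo(processDataBlock, linkOptions);

getInputPathsBlock.Post(2); // February 2020 has 29 days
getInputPathsBlock.Complete();

// Merge: because this run covers a single month, draining the last block
// until it completes yields every processed day.
var dailyResults = new List<int>();
while (await processDataBlock.OutputAvailableAsync())
    while (processDataBlock.TryReceive(out var item))
        dailyResults.Add(item);
await processDataBlock.Completion; // Propagate possible exception

int merged = dailyResults.Sum(); // MergeDataBlock stand-in
Console.WriteLine($"{dailyResults.Count} days merged into {merged}");
```

The drain loop is the same pattern as the ToListAsync helper in the answer.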
What I looked at
I found this answer, which uses TransformManyBlock to go from an IEnumerable<item> to item, but I don't fully understand it, and I can't link a TransformBlock<InputData,ProcessedData> (ProcessDataBlock) to a TransformBlock<IEnumerable<ProcessedData>,ProcessedData>, so I don't know how to use it.
I have also seen answers like this one, which suggest using JoinBlock, but the number of input files N varies, and the files are all loaded in the same way anyway.
There is also this answer, which seems to do what I want, but I don't fully understand it, and I don't know how to transfer its dictionary-based setup to my case.
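For several months flowing through one shared pipeline, the dictionary idea can be applied by tagging every item with its group key and the group's expected size, and letting a single-threaded merge step buffer items until a group is full. A minimal sketch under those assumptions; the tuple fields, the multiply-by-10 "processing" and the month totals are hypothetical stand-ins, and this is an illustration of the technique, not the code from the linked answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// Split: every item carries its month and the number of days expected for it.
var splitBlock = new TransformManyBlock<int, (int Month, int Days, int Day)>(month =>
{
    int days = DateTime.DaysInMonth(2020, month);
    return Enumerable.Range(1, days).Select(day => (month, days, day));
});

// Load + process in parallel (simulated); the key fields pass through unchanged.
var processBlock = new TransformBlock<(int Month, int Days, int Day), (int Month, int Days, int Value)>(
    async x => { await Task.Delay(5); return (x.Month, x.Days, x.Day * 10); },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

// Merge: keeps the default MaxDegreeOfParallelism of 1, so the dictionary is
// never touched concurrently. Emits nothing until a month's group is complete.
var pending = new Dictionary<int, List<int>>();
var mergeBlock = new TransformManyBlock<(int Month, int Days, int Value), (int Month, int Total)>(x =>
{
    if (!pending.TryGetValue(x.Month, out var values))
        pending[x.Month] = values = new List<int>();
    values.Add(x.Value);
    if (values.Count < x.Days)
        return Array.Empty<(int, int)>();
    pending.Remove(x.Month);
    return new[] { (x.Month, values.Sum()) };
});

// SaveDataBlock stand-in: ActionBlock handles one item at a time by default.
var saved = new List<(int Month, int Total)>();
var saveBlock = new ActionBlock<(int Month, int Total)>(r => saved.Add(r));

splitBlock.LinkTo(processBlock, linkOptions);
processBlock.LinkTo(mergeBlock, linkOptions);
mergeBlock.LinkTo(saveBlock, linkOptions);

foreach (var m in new[] { 1, 2 })
    splitBlock.Post(m);
splitBlock.Complete();
await saveBlock.Completion;
```

If a day is dropped or its processing faults, its month never reaches the expected count, so a real version would need handling for incomplete groups.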
How do I split and merge my dataflow?
- Is there a block type I am missing?
- Can I somehow use TransformManyBlock twice?
- Does TPL Dataflow make sense for the split/merge, or is there a simpler async/await way?
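On the last question: the same split/merge can be expressed with plain async/await, where Task.WhenAll acts as the merge and a SemaphoreSlim caps how many days are in flight. A minimal sketch that swaps TPL Dataflow for plain tasks; LoadAndProcessAsync, ProcessDayAsync and ProcessMonthAsync are hypothetical names, the day-level work is simulated, and the limit of 4 is arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// At most 4 days are loaded/processed at once, across all months.
var throttle = new SemaphoreSlim(4);

// Hypothetical stand-in for LoadDataBlock + ProcessDataBlock.
static async Task<int> LoadAndProcessAsync(int month, int day)
{
    await Task.Delay(5);      // Simulate loading the day's file
    return month * 100 + day; // Simulate processing
}

async Task<int> ProcessDayAsync(int month, int day)
{
    await throttle.WaitAsync();
    try { return await LoadAndProcessAsync(month, day); }
    finally { throttle.Release(); }
}

async Task<(int Month, int Total)> ProcessMonthAsync(int month)
{
    // Split: one task per day. Merge: Task.WhenAll waits for all of them
    // and returns the results in the same order as the days.
    var days = Enumerable.Range(1, DateTime.DaysInMonth(2020, month));
    int[] dailyResults = await Task.WhenAll(days.Select(day => ProcessDayAsync(month, day)));
    return (month, dailyResults.Sum()); // Merge stand-in; saving would go here
}

// Months run concurrently; the semaphore still caps the day-level work.
var monthly = await Task.WhenAll(Enumerable.Range(1, 12).Select(m => ProcessMonthAsync(m)));
foreach (var r in monthly)
    Console.WriteLine($"2020-{r.Month:D2}: {r.Total}");
```

This version creates one task per day up front and relies on the semaphore for throttling, whereas dataflow blocks also offer options such as BoundedCapacity to limit buffering.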
Answer
I would use a nested block to avoid splitting the monthly data and then having to merge it again. Here is an example of two nested TransformBlocks that process all the days of the year 2020:
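using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // From the System.Threading.Tasks.Dataflow package

var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
    // Inner block: processes the days of one month, up to 4 at a time
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });
    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();
    // Merge: wait until every day of this month is done and collect the results
    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();
// Consume the monthly results; this also waits for the whole flow to finish
var monthlyResults = await monthlyBlock.ToListAsync();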
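Mapped onto the stages in the question, the inner block can play LoadDataBlock and ProcessDataBlock, the outer lambda's return value can play MergeDataBlock, and a linked ActionBlock can play SaveDataBlock. A minimal sketch under those assumptions: the day-level work is simulated, CollectAsync is a hypothetical local-function copy of the ToListAsync helper so that the snippet runs on its own, and MaxDegreeOfParallelism = 2 on the outer block (instead of 1) is an arbitrary choice that lets two months overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical copy of ToListAsync: drains a block, then rethrows any fault.
static async Task<List<T>> CollectAsync<T>(IReceivableSourceBlock<T> block)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync())
        while (block.TryReceive(out var item))
            list.Add(item);
    await block.Completion; // Propagate possible exception
    return list;
}

var saved = new List<string>();

// Outer block: one item per month; up to 2 months run at the same time.
var monthlyBlock = new TransformBlock<int, string>(async month =>
{
    // Inner block: load + process each day (simulated stand-ins).
    var dailyBlock = new TransformBlock<int, int>(async day =>
    {
        await Task.Delay(5); // Simulate LoadDataBlock
        return day * 2;      // Simulate ProcessDataBlock
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    // MergeDataBlock stand-in: all days of this month are available here.
    List<int> dailyResults = await CollectAsync(dailyBlock);
    return $"2020-{month:D2}: {dailyResults.Count} days, total {dailyResults.Sum()}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

// SaveDataBlock stand-in; ActionBlock processes one item at a time by default.
var saveBlock = new ActionBlock<string>(line => saved.Add(line));
monthlyBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();
await saveBlock.Completion; // Waits until every month is merged and saved
```

Because the outer block is linked to the save block, its output is consumed as it is produced, so awaiting the save block's Completion is enough to wait for the whole year.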
To collect the daily results of the inner block, I used the extension method ToListAsync shown below:
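using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions // Arbitrary name; extension methods must live in a static class
{
    public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
        CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (block.TryReceive(out var item))
            {
                list.Add(item);
            }
        }
        await block.Completion.ConfigureAwait(false); // Propagate possible exception
        return list;
    }
}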
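On System.Threading.Tasks.Dataflow 5.0 or later (.NET 5+), DataflowBlock.ReceiveAllAsync returns an IAsyncEnumerable<T> that yields items until the block completes, so the two loops in ToListAsync can be written as a single await foreach. A sketch assuming that library version; ReceiveAllToListAsync is a hypothetical name, written as a local function so the snippet runs as top-level statements (in a library it would be an extension method like ToListAsync).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Same contract as ToListAsync, built on ReceiveAllAsync.
static async Task<List<T>> ReceiveAllToListAsync<T>(IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    await foreach (var item in block.ReceiveAllAsync(cancellationToken).ConfigureAwait(false))
        list.Add(item);
    await block.Completion.ConfigureAwait(false); // Propagate possible exception, as in ToListAsync
    return list;
}

// Usage: square three numbers and collect them in order.
var squareBlock = new TransformBlock<int, int>(x => x * x);
foreach (var n in new[] { 1, 2, 3 })
    squareBlock.Post(n);
squareBlock.Complete();

List<int> squares = await ReceiveAllToListAsync(squareBlock);
Console.WriteLine(string.Join(", ", squares)); // 1, 4, 9
```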