问题描述
我需要构建将处理大量消息的TPL数据流管道.因为有很多消息,所以我不能简单地将它们 Post
放入 BufferBlock
的无限队列中,否则我将面临内存问题.所以我想使用 BoundedCapacity = 1
选项禁用队列,并使用 MaxDegreeOfParallelism
进行并行任务处理,因为我的 TransformBlock
可能需要一些时间每条消息.我还使用 PropagateCompletion
完成所有操作,并且无法沿管道传播.
I need to construct TPL dataflow pipeline which will process a lot of messages. Because there are many messages I can not simply Post
them into infinite queue of the BufferBlock
or I will face memory issues. So I want to use BoundedCapacity = 1
option to disable the queue and use MaxDegreeOfParallelism
to use parallel task processing since my TransformBlock
s could take some time for each message. I also use PropagateCompletion
to make all completion and fail to propagate down the pipeline.
但是当第一条消息刚发生错误时,我正面临错误处理的问题:调用 await SendAsync
只是将我的应用切换为无限等待状态.
But I'm facing the issue with error handling when error happened just right after the first message: calling await SendAsync
simply switch my app into infinite waiting.
我已将案例简化为示例控制台应用程序:
I've simplified my case to sample console app:
var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
BoundedCapacity = 1
});
var process_block = new ActionBlock<int>(x =>
{
throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2,
BoundedCapacity = 1
});
data_buffer.LinkTo(process_block,
new DataflowLinkOptions { PropagateCompletion = true });
for (var k = 1; k <= 5; k++)
{
await data_buffer.SendAsync(k);
Console.WriteLine("Send: {0}", k);
}
data_buffer.Complete();
await process_block.Completion;
推荐答案
这是预期的行为.如果出现下游"故障,则该错误不会在网格上向后"传播.网格期望您检测到该故障(例如,通过 process_block.Completion
)并解决该问题.
This is expected behavior. If there's a fault "downstream", the error does not propagate "backwards" up the mesh. The mesh is expecting you to detect that fault (e.g., via process_block.Completion
) and resolve it.
如果您想向后传播错误,则可以在 process_block.Completion
上继续 await
或故障 (如果下游)阻止故障.
If you want to propagate errors backwards, you could have an await
or continuation on process_block.Completion
that faults the upstream block(s) if the downstream block(s) fault.
请注意,这不是唯一可行的解决方案.您可能需要重建网格的该部分或将源链接到替代目标.源块没有故障,因此它们可以继续使用已修复的网格进行处理.
Note that this is not the only possible solution; you may want to rebuild that part of the mesh or link the sources to an alternative target. The source block(s) have not faulted, so they can just continue processing with a repaired mesh.
这篇关于具有限制容量的转换块中的TPL数据流异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!