Question
I want to set up a TransformBlock that processes its items in parallel, so I'm setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism to a value greater than 1. I don't care about the order of the messages, but the documentation says:
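When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.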
Does "correctly ordered" mean that if there is one message in the queue that needs a long processing time, further messages are not output until that message has been processed?
And if so, how can I specify an execution block (for example a TransformBlock) that does not care about the ordering? Or do I have to specify at the consuming end that I don't care about ordering?
Recommended answer
There is no such block in the library, but you can easily create one yourself by combining an ActionBlock and a BufferBlock. Something like:
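using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TInput, TOutput>
    CreateUnorderedTransformBlock<TInput, TOutput>(
        Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
{
    // The buffer holds finished results in the order they complete.
    var buffer = new BufferBlock<TOutput>(options);
    // The action block does the (possibly parallel) work and forwards each result.
    var action = new ActionBlock<TInput>(
        async input =>
        {
            var output = func(input);
            await buffer.SendAsync(output);
        }, options);

    // Propagate completion and faults from the action block to the buffer.
    action.Completion.ContinueWith(
        t =>
        {
            IDataflowBlock castedBuffer = buffer;
            if (t.IsFaulted)
            {
                castedBuffer.Fault(t.Exception);
            }
            else if (t.IsCanceled)
            {
                // do nothing: both blocks share options,
                // which means they also share CancellationToken
            }
            else
            {
                castedBuffer.Complete();
            }
        });

    // Expose the pair as one block: input goes to action, output comes from buffer.
    return DataflowBlock.Encapsulate(action, buffer);
}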
This way, once an item is processed by the ActionBlock, it's immediately moved to the BufferBlock, which means ordering is not maintained.
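To see the effect, here is a minimal sketch that posts a slow item followed by a fast one. The class name UnorderedDemo, the Thread.Sleep-based workload, and the delay values are illustrative assumptions, not part of the original answer; the helper is repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedDemo
{
    // Same helper as in the answer, repeated so this sketch is self-contained.
    public static IPropagatorBlock<TInput, TOutput>
        CreateUnorderedTransformBlock<TInput, TOutput>(
            Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
    {
        var buffer = new BufferBlock<TOutput>(options);
        var action = new ActionBlock<TInput>(
            async input =>
            {
                var output = func(input);
                await buffer.SendAsync(output);
            }, options);
        action.Completion.ContinueWith(t =>
        {
            IDataflowBlock castedBuffer = buffer;
            if (t.IsFaulted) castedBuffer.Fault(t.Exception);
            else if (!t.IsCanceled) castedBuffer.Complete();
        });
        return DataflowBlock.Encapsulate(action, buffer);
    }

    public static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

        // Hypothetical workload: each item sleeps for its own value in milliseconds.
        var block = CreateUnorderedTransformBlock<int, int>(
            delayMs => { Thread.Sleep(delayMs); return delayMs; }, options);

        block.Post(2000); // slow item, posted first
        block.Post(100);  // fast item, posted second
        block.Complete();

        // Results arrive in completion order rather than posting order.
        while (await block.OutputAvailableAsync())
        {
            Console.WriteLine(block.Receive());
        }
    }
}
```

With both items processed in parallel, the fast item should be printed before the slow one. A plain TransformBlock with default options would hold the fast result back until the slow one had been output.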
One issue with this code is that it doesn't observe the configured BoundedCapacity well: in effect, the capacity of this block is twice the capacity set in options (because each of the two blocks has a separate capacity).
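As a possible alternative that avoids the doubled capacity: later releases of System.Threading.Tasks.Dataflow add an EnsureOrdered property to DataflowBlockOptions, which the original answer does not mention. Assuming your version of the library has it, a single TransformBlock can be told not to preserve order, so only one BoundedCapacity applies. The class name, workload, and values below are illustrative assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EnsureOrderedDemo
{
    public static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 10,   // applies to the single block as a whole
            EnsureOrdered = false   // assumption: available in your library version
        };

        // Hypothetical workload: each item sleeps for its own value in milliseconds.
        var block = new TransformBlock<int, int>(
            delayMs => { Thread.Sleep(delayMs); return delayMs; }, options);

        // SendAsync waits for room when the block is bounded and full.
        await block.SendAsync(2000); // slow item, sent first
        await block.SendAsync(100);  // fast item, sent second
        block.Complete();

        // With EnsureOrdered = false, results are output as they become available.
        while (await block.OutputAvailableAsync())
        {
            Console.WriteLine(block.Receive());
        }
    }
}
```

If EnsureOrdered is not available in the version you target, the ActionBlock plus BufferBlock combination above remains the way to go, with the capacity caveat in mind.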