我有这个密码:
var data = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
var action = new ActionBlock<int>(async id =>
{
Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);
await Task.Delay(1000);
Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 1,
MaxDegreeOfParallelism = -1
});
data.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });
for (var id = 1; id <= 3; id++)
{
Console.WriteLine("[{0:T}] Sending {1}", DateTime.Now, id);
data.SendAsync(id).Wait();
Console.WriteLine("[{0:T}] Sending {1} complete", DateTime.Now, id);
}
data.Complete();
Task.WhenAll(data.Completion, action.Completion).Wait();
这段代码给了我这个输出:
[22:31:22] Sending 1
[22:31:22] Sending 1 complete
[22:31:22] Sending 2
[22:31:22] #1: Start
[22:31:22] Sending 2 complete
[22:31:22] Sending 3
[22:31:23] #1: End
[22:31:23] #2: Start
[22:31:23] Sending 3 complete
[22:31:24] #2: End
[22:31:24] #3: Start
[22:31:25] #3: End
为什么
ActionBlock
不并行工作,即使它有一个无限的dop? 最佳答案
您的ActionBlock
的并行度似乎有限,原因是它的BoundedCapacity
为1。BoundedCapacity
(与InputCount
不同)包括当前正在处理的项。这很容易证明:
var block = new ActionBlock<int>(_ => Task.Delay(-1), new ExecutionDataflowBlockOptions
{
BoundedCapacity = 1,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
await block.SendAsync(4); // Adds a new item
await block.SendAsync(4); // Blocks forever
这意味着,当设置
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
时,块不能同时接受多个项,因此实际上限制了并行度。您可以设置一个更大的
BoundedCapacity
,来解决这个问题:var action = new ActionBlock<int>(async id =>
{
Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);
await Task.Delay(1000);
Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});