问题描述
考虑这个例子:
class 程序{私有静态只读 ITargetBlock网格 = 创建网格();private static readonly AsyncLocal;异步本地上下文= new AsyncLocal();静态异步任务 Main(string[] args){var 任务 = Enumerable.Range(1, 4).Select(ProcessMessage);等待 Task.WhenAll(tasks);Mesh.Complete();等待 Mesh.Completion;Console.WriteLine();Console.WriteLine("完成");}私有静态异步任务 ProcessMessage(int number){var param = number.ToString();使用 (SetScopedAsyncLocal(param)){Console.WriteLine($"发送前{param}");等待 Mesh.SendAsync(param);Console.WriteLine($"发送后{param}");}}私有静态 IDisposable SetScopedAsyncLocal(字符串值){AsyncLocalContext.Value = 值;return new Disposer(() => AsyncLocalContext.Value = null);}私有静态 ITargetBlock创建网格(){var blockOptions = 新的 ExecutionDataflowBlockOptions{BoundedCapacity = DataflowBlockOptions.Unbounded,确保有序 = 假,MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded};var block1 = new TransformBlock(异步输入 =>{等待 Task.Yield();Console.WriteLine($" Block1 [线程 {Thread.CurrentThread.ManagedThreadId}]" +$" 输入:{input} - 上下文:{AsyncLocalContext.Value}.");返回输入;}, blockOptions);var block2 = new TransformBlock(异步输入 =>{等待 Task.Yield();Console.WriteLine($" Block2 [线程 {Thread.CurrentThread.ManagedThreadId}]" +$" 输入:{input} - 上下文:{AsyncLocalContext.Value}.");返回输入;}, blockOptions);var block3 = new ActionBlock(异步输入 =>{等待 Task.Yield();Console.WriteLine($" Block3 [线程 {Thread.CurrentThread.ManagedThreadId}]" +$" 输入:{input} - 上下文:{AsyncLocalContext.Value}.");}, blockOptions);var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};block1.LinkTo(block2, linkOptions);block2.LinkTo(block3, linkOptions);return new EncapsulatedActionBlock(block1, block3.Completion);}}内部类 EncapsulatedActionBlock:ITargetBlock<T>{私有只读 ITargetBlock_包裹;public EncapsulatedActionBlock(ITargetBlock<T>wrapped, Task completion){_wrapped = 包裹;完成=完成;}public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,T messageValue, ISourceBlock;源,布尔消耗接受)=>_wrapped.OfferMessage(messageHeader, messageValue, source, consumerToAccept);public void Complete() =>_wrapped.Complete();公共无效故障(异常异常)=>_wrapped.Fault(异常);公共任务完成{得到;}}内部类 Disposer : IDisposable{私有只读操作_disposeAction;公共处置器(行动处置行动){_disposeAction = 处置动作??throw new ArgumentNullException(nameof(disposeAction));}公共无效处置(){_disposeAction();}}
执行的结果将类似于:
发送前 1发送后 1发送前 2发送后 2发送前 3发送后 3发送前 4发送后 4Block1 [线程 9] 输入:3 - 上下文:3.Block1 [线程 10] 输入:2 - 上下文:1.Block1 [线程 8] 输入:4 - 上下文:4.Block1 [线程 11] 输入:1 - 上下文:2.Block2 [线程 9] 输入:2 - 上下文:3.Block2 [线程 7] 输入:1 - 上下文:2.Block2 [线程 10] 输入:3 - 上下文:3.Block2 [线程 8] 输入:4 - 上下文:4.Block3 [线程 11] 输入:4 - 上下文:4.Block3 [线程 7] 输入:1 - 上下文:2.Block3 [线程 9] 输入:3 - 上下文:3.Block3 [线程 4] 输入:2 - 上下文:3.完毕如您所见,在移动到第二个 TDF 块后,传递的上下文值和存储的值并不总是相同的.这种行为会破坏多个 Logging 框架的 LogContext 功能使用.
- 这是预期行为吗(请解释原因)?
- TPL 数据流是否以某种方式弄乱了执行上下文?
要了解发生了什么,您必须了解 Dataflow 块的工作原理.它们内部没有阻塞线程,等待消息到达.处理由工作任务完成.让我们考虑 MaxDegreeOfParallelism = 1
的简单(和默认)情况.最初有零个工作任务.当使用 SendAsync,发布消息的相同任务成为工作任务并开始处理消息.如果在处理第一条消息时发布另一条消息,则会发生其他事情.第二条消息将被排入块的输入队列中,并且发布它的任务将完成.第二条消息将由处理第一条消息的工作任务处理.只要队列中有消息入队,初始工作任务就会挑选它们并一一处理.如果某个时刻没有更多缓冲消息,则工作任务将完成,并且块将以其初始状态返回(零工作任务).下一个 SendAsync
将成为新的工作任务,依此类推.使用 MaxDegreeOfParallelism = 1
,在任何给定时刻只能存在一个工作任务.
让我们用一个例子来证明这一点.下面是一个 ActionBlock
以延迟 X 馈送,并以延迟 Y 处理每条消息.
private static void ActionBlockTest(int sendDelay, int processDelay){Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");var asyncLocal = new AsyncLocal();var actionBlock = new ActionBlock(async i =>{等待 Task.Delay(processDelay);Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");});Task.Run(async() =>{foreach (var i in Enumerable.Range(1, 5)){asyncLocal.Value = i;等待 actionBlock.SendAsync(i);等待 Task.Delay(sendDelay);}}).等待();actionBlock.Complete();actionBlock.Completion.Wait();}
让我们看看如果我们快速发送消息并缓慢处理它们会发生什么:
ActionBlockTest(100, 200);//.NET 核心 3.0
发送延迟:100,处理延迟:200
已处理 1,上下文:1
已处理 2,上下文:1
已处理 3,上下文:1
已处理 4,上下文:1
已处理 5,上下文:1
AsyncLocal
上下文保持不变,因为同一个工作任务处理了所有消息.
现在让我们缓慢发送消息并快速处理它们:
ActionBlockTest(200, 100);//.NET 核心 3.0
发送延迟:200,处理延迟:100
已处理 1,上下文:1
已处理 2,上下文:2
已处理 3,上下文:3
已处理 4,上下文:4
已处理 5,上下文:5
每条消息的 AsyncLocal
上下文都不同,因为每条消息都由不同的工作任务处理.
这个故事的道德教训是,每个 SendAsync
都不能保证创建一个跟随消息的异步工作流,直到它的旅程结束,直到管道结束.所以 AsyncLocal
类不能用于保存每条消息的环境数据.
Consider this example:
class Program
{
private static readonly ITargetBlock<string> Mesh = CreateMesh();
private static readonly AsyncLocal<string> AsyncLocalContext
= new AsyncLocal<string>();
static async Task Main(string[] args)
{
var tasks = Enumerable.Range(1, 4)
.Select(ProcessMessage);
await Task.WhenAll(tasks);
Mesh.Complete();
await Mesh.Completion;
Console.WriteLine();
Console.WriteLine("Done");
}
private static async Task ProcessMessage(int number)
{
var param = number.ToString();
using (SetScopedAsyncLocal(param))
{
Console.WriteLine($"Before send {param}");
await Mesh.SendAsync(param);
Console.WriteLine($"After send {param}");
}
}
private static IDisposable SetScopedAsyncLocal(string value)
{
AsyncLocalContext.Value = value;
return new Disposer(() => AsyncLocalContext.Value = null);
}
private static ITargetBlock<string> CreateMesh()
{
var blockOptions = new ExecutionDataflowBlockOptions
{
BoundedCapacity = DataflowBlockOptions.Unbounded,
EnsureOrdered = false,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
};
var block1 = new TransformBlock<string, string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block1 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");
return input;
}, blockOptions);
var block2 = new TransformBlock<string, string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block2 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");
return input;
}, blockOptions);
var block3 = new ActionBlock<string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block3 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");
}, blockOptions);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
block1.LinkTo(block2, linkOptions);
block2.LinkTo(block3, linkOptions);
return new EncapsulatedActionBlock<string>(block1, block3.Completion);
}
}
internal class EncapsulatedActionBlock<T> : ITargetBlock<T>
{
private readonly ITargetBlock<T> _wrapped;
public EncapsulatedActionBlock(ITargetBlock<T> wrapped, Task completion)
{
_wrapped = wrapped;
Completion = completion;
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept) =>
_wrapped.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
public void Complete() => _wrapped.Complete();
public void Fault(Exception exception) => _wrapped.Fault(exception);
public Task Completion { get; }
}
internal class Disposer : IDisposable
{
private readonly Action _disposeAction;
public Disposer(Action disposeAction)
{
_disposeAction = disposeAction
?? throw new ArgumentNullException(nameof(disposeAction));
}
public void Dispose()
{
_disposeAction();
}
}
The result of the execution will be something like:
Before send 1 After send 1 Before send 2 After send 2 Before send 3 After send 3 Before send 4 After send 4 Block1 [thread 9] Input: 3 - Context: 3. Block1 [thread 10] Input: 2 - Context: 1. Block1 [thread 8] Input: 4 - Context: 4. Block1 [thread 11] Input: 1 - Context: 2. Block2 [thread 9] Input: 2 - Context: 3. Block2 [thread 7] Input: 1 - Context: 2. Block2 [thread 10] Input: 3 - Context: 3. Block2 [thread 8] Input: 4 - Context: 4. Block3 [thread 11] Input: 4 - Context: 4. Block3 [thread 7] Input: 1 - Context: 2. Block3 [thread 9] Input: 3 - Context: 3. Block3 [thread 4] Input: 2 - Context: 3. Done
As you can see the passed context value and stored one's are not always the same after moving to second TDF block. This behavior screws up multiple Logging frameworks' LogContext feature usages.
- Is it an expected behavior (please explain why)?
- Does the TPL Dataflow messes up the execution context somehow?
To understand what's going on you must understand how Dataflow blocks work. There are no blocked threads inside them, waiting for messages to arrive. The processing is done by worker tasks. Lets consider the simple (and default) case of MaxDegreeOfParallelism = 1
. Initially there are zero worker tasks. When a message is posted asynchronously with SendAsync
, the same task that posted the message becomes a worker task and starts processing the message. If another message is posted while the first is processed, something else will happen. The second message will be enqueued in the block's input queue, and the task that posted it will complete. The second message will be processed by the worker-task that processed the first message. As long as there are messages enqueued in the queue, the initial worker task will pick them and process them one by one. If at some moment there are no more buffered messages, the worker task will complete, and the block will return in it's initial state (zero worker tasks). The next SendAsync
will become the new worker task, and so on. With MaxDegreeOfParallelism = 1
, only one worker task can exist at any given moment.
Lets demonstrate this with an example. Below is an ActionBlock
that is feeded with delay X, and processes each message with delay Y.
private static void ActionBlockTest(int sendDelay, int processDelay)
{
Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");
var asyncLocal = new AsyncLocal<int>();
var actionBlock = new ActionBlock<int>(async i =>
{
await Task.Delay(processDelay);
Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");
});
Task.Run(async () =>
{
foreach (var i in Enumerable.Range(1, 5))
{
asyncLocal.Value = i;
await actionBlock.SendAsync(i);
await Task.Delay(sendDelay);
}
}).Wait();
actionBlock.Complete();
actionBlock.Completion.Wait();
}
Lets see what happens if we send the messages fast and process them slowly:
ActionBlockTest(100, 200); // .NET Core 3.0
The AsyncLocal
context remained the same, because the same worker task processed all the messages.
Now lets send the messages slowly and process them fast:
ActionBlockTest(200, 100); // .NET Core 3.0
The AsyncLocal
context is different for each message, because each message was processed by a different worker task.
The moral lesson of this story is that each SendAsync
does not guarantee the creation of a single asynchronous workflow that follows the message until the end of its journey, to the end of the pipeline. So the AsyncLocal
class cannot be used to hold ambient data for each message.
这篇关于AsyncLocal 值与 TPL 数据流不正确的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!