MaxDegreeOfParallelism

This article looks at why AsyncLocal values come out wrong when used with TPL Dataflow.

Problem description

Consider the following example:

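using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{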
    private static readonly ITargetBlock<string> Mesh = CreateMesh();
    private static readonly AsyncLocal<string> AsyncLocalContext
        = new AsyncLocal<string>();

    static async Task Main(string[] args)
    {
        var tasks = Enumerable.Range(1, 4)
            .Select(ProcessMessage);
        await Task.WhenAll(tasks);

        Mesh.Complete();
        await Mesh.Completion;

        Console.WriteLine();
        Console.WriteLine("Done");
    }

    private static async Task ProcessMessage(int number)
    {
        var param = number.ToString();
        using (SetScopedAsyncLocal(param))
        {
            Console.WriteLine($"Before send {param}");
            await Mesh.SendAsync(param);
            Console.WriteLine($"After send {param}");
        }
    }

    private static IDisposable SetScopedAsyncLocal(string value)
    {
        AsyncLocalContext.Value = value;

        return new Disposer(() => AsyncLocalContext.Value = null);
    }

    private static ITargetBlock<string> CreateMesh()
    {
        var blockOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = DataflowBlockOptions.Unbounded,
            EnsureOrdered = false,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var block1 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block1 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block2 = new TransformBlock<string, string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block2 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");

            return input;
        }, blockOptions);

        var block3 = new ActionBlock<string>(async input =>
        {
            await Task.Yield();
            Console.WriteLine(
                $"   Block3 [thread {Thread.CurrentThread.ManagedThreadId}]" +
                $" Input: {input} - Context: {AsyncLocalContext.Value}.");
        }, blockOptions);

        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

        block1.LinkTo(block2, linkOptions);
        block2.LinkTo(block3, linkOptions);

        return new EncapsulatedActionBlock<string>(block1, block3.Completion);
    }
}

internal class EncapsulatedActionBlock<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> _wrapped;

    public EncapsulatedActionBlock(ITargetBlock<T> wrapped, Task completion)
    {
        _wrapped = wrapped;
        Completion = completion;
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept) =>
        _wrapped.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    public void Complete() => _wrapped.Complete();

    public void Fault(Exception exception) => _wrapped.Fault(exception);

    public Task Completion { get; }
}

internal class Disposer : IDisposable
{
    private readonly Action _disposeAction;

    public Disposer(Action disposeAction)
    {
        _disposeAction = disposeAction
            ?? throw new ArgumentNullException(nameof(disposeAction));
    }

    public void Dispose()
    {
        _disposeAction();
    }
}

The output will be similar to:


Before send 1
After send 1
Before send 2
After send 2
Before send 3
After send 3
Before send 4
After send 4
   Block1 [thread 9] Input: 3 - Context: 3.
   Block1 [thread 10] Input: 2 - Context: 1.
   Block1 [thread 8] Input: 4 - Context: 4.
   Block1 [thread 11] Input: 1 - Context: 2.
   Block2 [thread 9] Input: 2 - Context: 3.
   Block2 [thread 7] Input: 1 - Context: 2.
   Block2 [thread 10] Input: 3 - Context: 3.
   Block2 [thread 8] Input: 4 - Context: 4.
   Block3 [thread 11] Input: 4 - Context: 4.
   Block3 [thread 7] Input: 1 - Context: 2.
   Block3 [thread 9] Input: 3 - Context: 3.
   Block3 [thread 4] Input: 2 - Context: 3.

Done

As you can see, the value passed through the pipeline and the stored AsyncLocal context value are not always the same (in this run, the mismatch appears even in Block1). This behavior breaks the LogContext feature of several logging frameworks.

  1. Is this expected behavior (please explain why)?
  2. Is TPL Dataflow somehow messing up the execution context?

Recommended answer

To understand what's going on, you must understand how Dataflow blocks work. There are no blocked threads inside them waiting for messages to arrive. The processing is done by worker tasks. Let's consider the simple (and default) case of MaxDegreeOfParallelism = 1. Initially there are zero worker tasks. When a message is posted asynchronously with SendAsync, the posting call starts a worker task, which begins processing the message. Like any task, that worker inherits the execution context (and therefore the AsyncLocal values) of the code that started it. If another message is posted while the first is being processed, something else happens: the second message is enqueued in the block's input queue, and the SendAsync call that posted it completes. The second message will be processed by the worker task that processed the first message. As long as there are messages in the queue, the initial worker task picks them up and processes them one by one. If at some moment there are no more buffered messages, the worker task completes, and the block returns to its initial state (zero worker tasks). The next SendAsync starts a new worker task, and so on. With MaxDegreeOfParallelism = 1, only one worker task can exist at any given moment.
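The worker-task idea can be illustrated with a toy model. This is a simplified sketch under stated assumptions, not the real TPL Dataflow implementation: `MiniActionBlock<T>` and `MiniBlockDemo` are hypothetical names, and the model allows only one worker at a time (as with MaxDegreeOfParallelism = 1). It depends on one fact: a task started with `Task.Run` captures the caller's `ExecutionContext`, so the worker sees the AsyncLocal values of whichever caller happened to start it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical toy model of a block with MaxDegreeOfParallelism = 1.
// NOT how TPL Dataflow is implemented; it only mimics the rule
// "start a worker task only when none is running".
public sealed class MiniActionBlock<T>
{
    private readonly Func<T, Task> _action;
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();
    private bool _workerRunning;

    public MiniActionBlock(Func<T, Task> action) => _action = action;

    public void Post(T item)
    {
        lock (_lock)
        {
            _queue.Enqueue(item);
            if (_workerRunning) return; // the running worker will pick it up
            _workerRunning = true;
        }
        // Task.Run captures the caller's ExecutionContext, so this worker
        // carries the AsyncLocal values of whoever posted at this moment.
        _ = Task.Run(() => ProcessLoopAsync());
    }

    private async Task ProcessLoopAsync()
    {
        while (true)
        {
            T item;
            lock (_lock)
            {
                if (_queue.Count == 0) { _workerRunning = false; return; }
                item = _queue.Dequeue();
            }
            await _action(item);
        }
    }
}

public static class MiniBlockDemo
{
    private static readonly AsyncLocal<int> Context = new AsyncLocal<int>();

    // Posts three messages, each under a different ambient value,
    // while the first message is still being processed.
    public static async Task<IReadOnlyList<string>> RunAsync()
    {
        var results = new ConcurrentQueue<string>();
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var allDone = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new MiniActionBlock<int>(async i =>
        {
            await gate.Task; // keep the worker busy until every message is queued
            results.Enqueue($"Processed {i}, Context: {Context.Value}");
            if (results.Count == 3) allDone.SetResult(true);
        });

        for (int i = 1; i <= 3; i++)
        {
            Context.Value = i;
            block.Post(i);
        }

        gate.SetResult(true);
        await allDone.Task;
        return results.ToArray();
    }
}
```

Calling `await MiniBlockDemo.RunAsync()` returns three lines, and every one of them reports `Context: 1`. Messages 2 and 3 were queued while the worker started by the first `Post` was still busy, so that worker, which carries the first poster's context, processed them as well. The gate is there only to make the result independent of timing.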

Let's demonstrate this with an example. Below is an ActionBlock that is fed with a delay of sendDelay milliseconds between messages, and that processes each message with a delay of processDelay milliseconds.

private static void ActionBlockTest(int sendDelay, int processDelay)
{
    Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");
    var asyncLocal = new AsyncLocal<int>();
    var actionBlock = new ActionBlock<int>(async i =>
    {
        await Task.Delay(processDelay);
        Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");
    });
    Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(1, 5))
        {
            asyncLocal.Value = i;
            await actionBlock.SendAsync(i);
            await Task.Delay(sendDelay);
        }
    }).Wait();
    actionBlock.Complete();
    actionBlock.Completion.Wait();
}

Let's see what happens if we send the messages fast and process them slowly:

ActionBlockTest(100, 200); // .NET Core 3.0

The AsyncLocal context remained the same, because the same worker task processed all the messages.

Now let's send the messages slowly and process them fast:

ActionBlockTest(200, 100); // .NET Core 3.0

The AsyncLocal context is different for each message, because each message was processed by a different worker task.

The moral of the story is that SendAsync does not guarantee each message its own asynchronous workflow that follows it all the way to the end of the pipeline. So the AsyncLocal class cannot be used to hold per-message ambient data.
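One common workaround is to carry the ambient value inside the message and copy it back into the AsyncLocal at the start of each block's delegate. The sketch below makes some assumptions: `Envelope<T>`, `EnvelopePipeline` and `Step` are hypothetical names, and the pipeline simply mirrors the question's three blocks. Each delegate is an async lambda, so the value it assigns stays visible across that delegate's own awaits and does not leak back into the block's worker task.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message wrapper: the per-message context travels with the payload.
public sealed class Envelope<T>
{
    public Envelope(T payload, string context)
    {
        Payload = payload;
        Context = context;
    }

    public T Payload { get; }
    public string Context { get; }
}

public static class EnvelopePipeline
{
    private static readonly AsyncLocal<string> AsyncLocalContext = new AsyncLocal<string>();

    // Builds a block delegate that first restores the envelope's context into the
    // AsyncLocal. The async lambda's builder restores the caller's ExecutionContext
    // when the lambda returns, so the worker task is not affected.
    private static Func<Envelope<string>, Task<Envelope<string>>> Step(
        string name, ConcurrentQueue<string> log) => async env =>
    {
        AsyncLocalContext.Value = env.Context;
        await Task.Yield();
        log.Enqueue($"{name} Input: {env.Payload} - Context: {AsyncLocalContext.Value}");
        return env;
    };

    public static async Task<IReadOnlyList<string>> RunAsync()
    {
        var log = new ConcurrentQueue<string>();
        var options = new ExecutionDataflowBlockOptions
        {
            EnsureOrdered = false,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var block1 = new TransformBlock<Envelope<string>, Envelope<string>>(Step("Block1", log), options);
        var block2 = new TransformBlock<Envelope<string>, Envelope<string>>(Step("Block2", log), options);
        // Func<Envelope, Task<Envelope>> converts to Func<Envelope, Task> by delegate covariance.
        var block3 = new ActionBlock<Envelope<string>>(Step("Block3", log), options);

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        block1.LinkTo(block2, linkOptions);
        block2.LinkTo(block3, linkOptions);

        // Four concurrent senders, as in the question. Each one captures its
        // ambient value at send time and stores it in the envelope.
        await Task.WhenAll(Enumerable.Range(1, 4).Select(async n =>
        {
            AsyncLocalContext.Value = n.ToString();
            await block1.SendAsync(new Envelope<string>(n.ToString(), AsyncLocalContext.Value));
        }));

        block1.Complete();
        await block3.Completion;
        return log.ToArray();
    }
}
```

With this approach every log line reports a context equal to its input, regardless of which worker task handled the message. Any code inside a delegate that reads the AsyncLocal, such as a logger's context lookup, should then see that message's own value.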

09-02 00:42