请参阅以下问题以获取背景信息:

How do Tasks in the Task Parallel Library affect ActivityID?

该问题询问Tasks如何影响Trace.CorrelationManager.ActivityId。 @Greg Samson用一个测试程序回答了他自己的问题,该程序表明ActivityId在Tasks上下文中是可靠的。测试程序在Task委托(delegate)的开头设置一个ActivityId,休眠以模拟工作,然后在末尾检查ActivityId以确保它具有相同的值(即,尚未被另一个线程修改)。该程序成功运行。

在研究线程,任务和并行操作的其他“上下文”选项(最终为日志提供更好的上下文)时,我遇到了Trace.CorrelationManager.LogicalOperationStack的一个奇怪问题(无论如何我还是很奇怪)。我已将我的“答案”复制到下面的他的问题中。

我认为它充分描述了我遇到的问题(在Parallel.For上下文中使用时,Trace.CorrelationManager.LogicalOperationStack显然已损坏-或某些东西,但前提是Parallel.For本身包含在逻辑操作中) 。

这是我的问题:

  • Trace.CorrelationManager.LogicalOperationStack是否可以与Parallel.For一起使用?如果是这样,如果Parallel.For已启动逻辑操作,是否应该有所不同?
  • 是否存在将LogicalOperationStack与Parallel.For一起使用的“正确”方法?我可以对该示例程序进行不同的编码,以使其“正常运行”吗? “工作”是指LogicalOperationStack始终具有期望的条目数,而条目本身就是期望的条目。

  • 我已经使用Threads和ThreadPool线程进行了一些其他测试,但是我将不得不返回并重试这些测试,以查看是否遇到类似的问题。

    我要说的是,任务/并行线程和ThreadPool线程确实从父线程“继承”了Trace.CorrelationManager.ActivityId和Trace.CorrelationManager.LogicalOperationStack值。这是预期的,因为这些值是由CorrelationManager使用CallContext的LogicalSetData方法(与SetData相反)存储的。

    再次,请返回该问题以获取我在下面发布的“答案”的原始上下文:

    How do Tasks in the Task Parallel Library affect ActivityID?

    另请参阅Microsoft的Parallel Extensions论坛上的类似问题(到目前为止尚未回答):

    http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

    [BEGIN PASTE]

    请原谅我将其发布为答案,因为它实际上并不是您的问题的答案,但是,它与您的问题有关,因为它涉及CorrelationManager行为和线程/任务/等。我一直在研究使用CorrelationManager的LogicalOperationStack(和StartLogicalOperation/StopLogicalOperation方法)在多线程方案中提供其他上下文。

    我以您的示例为例,对其进行了少许修改,以增加使用Parallel.For并行执行工作的功能。另外,我使用StartLogicalOperation/StopLogicalOperation括起(内部)DoLongRunningWork。从概念上讲,DoLongRunningWork每次执行时都会执行以下操作:
    DoLongRunningWork
      StartLogicalOperation
      Thread.Sleep(3000)
      StopLogicalOperation
    

    我发现,如果我将这些逻辑操作添加到您的代码中(或多或少按原样),则所有逻辑操作均保持同步(始终在堆栈上的预期操作数和堆栈上的操作值始终保持不变)预期的)。

    在我自己的一些测试中,我发现情况并非总是如此。逻辑操作堆栈已“损坏”。我能想到的最好的解释是,当“子”线程退出时,CallContext信息“合并”回“父”线程上下文中导致“旧”子线程上下文信息(逻辑运算)为“由另一个同级子线程继承”。

    该问题可能还与以下事实有关:Parallel.For显然使用主线程(至少在示例代码中已编写)作为“工作线程”之一(或在并行域中应调用的任何线程)。每当执行DoLongRunningWork时,都会(在开始时)启动新逻辑操作,并在(结束时)停止新逻辑操作(即,将其插入LogicalOperationStack并从中弹出)。如果主线程已具有有效的逻辑操作,并且DoLongRunningWork在主线程上执行,则将启动新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作。 DoLongRunningWork的任何后续执行(只要DoLongRunningWork的“迭代”在主线程上执行)将(显然)继承主线程的LogicalOperationStack(现在对其具有两个操作,而不仅仅是一个预期的操作)。

    我花了很长时间才弄清楚为什么我的示例中的LogicalOperationStack的行为与我的示例的修改版中的行为有所不同。最终,我发现在我的代码中,我将整个程序括在一个逻辑操作中,而在我的测试程序的修改版中却没有。这意味着在我的测试程序中,每次执行“工作”(类似于DoLongRunningWork)时,已经有一个逻辑操作生效。在我的测试程序的修改版本中,我没有将整个程序放在一个逻辑操作中。

    因此,当我修改测试程序以将整个程序放在一个逻辑操作中并且如果我使用Parallel.For时,我遇到了完全相同的问题。

    使用上面的概念模型,它将成功运行:
    Parallel.For
      DoLongRunningWork
        StartLogicalOperation
        Sleep(3000)
        StopLogicalOperation
    

    尽管由于逻辑LogicalOperationStack显然不同步,这最终会断言:
    StartLogicalOperation
    Parallel.For
      DoLongRunningWork
        StartLogicalOperation
        Sleep(3000)
        StopLogicalOperation
    StopLogicalOperation
    

    这是我的示例程序。它与您的相似之处在于它具有可操纵ActivityId和LogicalOperationStack的DoLongRunningWork方法。我还有两种DoLongRunningWork踢法。一种使用任务,一种使用Parallel.For。还可以执行每种类型,以便将整个并行化操作包含在逻辑操作中或不包含在逻辑操作中。因此,共有四种执行并行操作的方式。要尝试每种方法,只需取消注释所需的“Use ...”方法,重新编译并运行即可。 UseTasksUseTasks(true)UseParallelFor应该全部运行完成。 UseParallelFor(true)将在某个时候断言,因为LogicalOperationStack没有预期的条目数。
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace CorrelationManagerParallelTest
    {
      class Program
      {
        static void Main(string[] args)
        {
          //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
          //number of entries, all others will run to completion.
    
          UseTasks(); //Equivalent to original test program with only the parallelized
                          //operation bracketed in logical operation.
          ////UseTasks(true); //Bracket entire UseTasks method in logical operation
          ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                                 //rather than Tasks.  Bracket only the parallelized
                                 //operation in logical operation.
          ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
        }
    
        private static List<int> threadIds = new List<int>();
        private static object locker = new object();
    
        private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
    
        private static int mainThreadUsedInDelegate = 0;
    
        // baseCount is the expected number of entries in the LogicalOperationStack
        // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
        // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
        // it will be 0.
        private static void DoLongRunningWork(int baseCount)
        {
          lock (locker)
          {
            //Keep a record of the managed thread used.
            if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
              threadIds.Add(Thread.CurrentThread.ManagedThreadId);
    
            if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
            {
              mainThreadUsedInDelegate++;
            }
          }
    
          Guid lo1 = Guid.NewGuid();
          Trace.CorrelationManager.StartLogicalOperation(lo1);
    
          Guid g1 = Guid.NewGuid();
          Trace.CorrelationManager.ActivityId = g1;
    
          Thread.Sleep(3000);
    
          Guid g2 = Trace.CorrelationManager.ActivityId;
          Debug.Assert(g1.Equals(g2));
    
          //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
          //in effect when the Parallel.For operation was started.
          Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
          Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
    
          Trace.CorrelationManager.StopLogicalOperation();
        }
    
        private static void UseTasks(bool encloseInLogicalOperation = false)
        {
          int totalThreads = 100;
          TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
          Task task = null;
          Stopwatch stopwatch = new Stopwatch();
          stopwatch.Start();
    
          if (encloseInLogicalOperation)
          {
            Trace.CorrelationManager.StartLogicalOperation();
          }
    
          Task[] allTasks = new Task[totalThreads];
          for (int i = 0; i < totalThreads; i++)
          {
            task = Task.Factory.StartNew(() =>
            {
              DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
            }, taskCreationOpt);
            allTasks[i] = task;
          }
          Task.WaitAll(allTasks);
    
          if (encloseInLogicalOperation)
          {
            Trace.CorrelationManager.StopLogicalOperation();
          }
    
          stopwatch.Stop();
          Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
          Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
          Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
    
          Console.ReadKey();
        }
    
        private static void UseParallelFor(bool encloseInLogicalOperation = false)
        {
          int totalThreads = 100;
          Stopwatch stopwatch = new Stopwatch();
          stopwatch.Start();
    
          if (encloseInLogicalOperation)
          {
            Trace.CorrelationManager.StartLogicalOperation();
          }
    
          Parallel.For(0, totalThreads, i =>
          {
            DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
          });
    
          if (encloseInLogicalOperation)
          {
            Trace.CorrelationManager.StopLogicalOperation();
          }
    
          stopwatch.Stop();
          Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
          Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
          Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
    
          Console.ReadKey();
        }
    
      }
    }
    

    LogicalOperationStack是否可以与Parallel.For(和/或其他线程/任务构造)一起使用或如何使用的整个问题可能有其自身的问题。也许我会发一个问题。同时,我想知道您对此是否有任何想法(或者,我想知道您是否考虑过使用LogicalOperationStack,因为ActivityId看起来很安全)。

    [END PASTE]

    有人对这个问题有什么想法吗?

    最佳答案

    [开始更新]

    我还在Microsoft's Parallel Extensions for .Net support forum上问了这个问题,并最终收到了answer from Stephen Toub。事实证明,有一个bug in the LogicalCallContext导致LogicalOperationStack损坏。还有一个很好的描述(在斯蒂芬对我对他的回答的答复中),简要介绍了Parallel.For如何分发任务,以及使Parallel.For容易受到该错误影响的原因。

    在下面的回答中,我推测LogicalOperationStack与Parallel.For不兼容,因为Parallel.For使用主线程作为“ worker ”线程之一。根据斯蒂芬的解释,我的猜测是不正确的。 Parallel.For确实将主线程用作“ worker ”线程之一,但并非简单地按原样使用它。第一个Task在主线程上运行,但其运行方式就像在新线程上运行一样。阅读Stephen的描述以获取更多信息。

    [结束更新]

    据我所知,答案如下:

    ActivityId和LogicalOperationStack都通过CallContext.LogicalSetData存储。这意味着这些值将“流”到任何“子”线程。这样做非常酷,例如,您可以在多线程服务器的入口点设置ActivityId(例如,服务调用),并且最终从该入口点启动的所有线程都可以属于同一“事件”。同样,逻辑操作(通过LogicalOperationStack)也流到子线程。

    关于Trace.CorrelationManager.ActivityId:

    ActivityId似乎与我测试过的所有线程模型兼容:直接使用线程,使用ThreadPool,使用任务,使用Parallel。*。在所有情况下,ActivityId均具有预期值。

    关于Trace.CorrelationManager.LogicalOperationStack:

    LogicalOperationStack似乎与大多数线程模型兼容,但与Parallel。*不兼容。直接使用线程,ThreadPool和Tasks,LogicalOperationStack(在我的问题中提供的示例代码中进行了操作)可以保持其完整性。在任何时候,LogicalOperationStack的内容都是预期的。

    LogicalOperationStack与Parallel.For不兼容。如果逻辑操作“有效”,即在启动Parallel。*操作之前调用了CorrelationManager.StartLogicalOperation,然后在Paralle。*的上下文中(即在委托(delegate)中)启动了新的逻辑操作。 ,则LogicalOperationStack将被破坏。 (我应该说它可能会损坏。Parallel。*可能不会创建任何其他线程,这意味着LogicalOperationStack是安全的)。

    问题源于以下事实:Parallel。*使用主线程(或更正确地说,是开始并行操作的线程)作为其“工作”线程之一。这意味着,当在与“主”线程相同的“ worker ”线程中启动和停止“逻辑操作”时,将修改“主”线程的LogicalOperationStack。即使调用代码(即委托(delegate))正确维护了堆栈(确保每个StartLogicalOperation被相应的StopLogicalOperation“停止”),“主”线程堆栈也会被修改。最终(无论如何,对我而言)看来,“主”线程的LogicalOperationStack实际上是由两个不同的“逻辑”线程修改的:“主”线程和“ worker ”线程,两者恰好是相同的线。

    我不知道为什么它不起作用的内在细节(至少如我所期望的那样)。我最好的猜测是,每次在线程(与主线程不同)上执行委托(delegate)时,该线程都会“继承”主线程LogicalOperationStack的当前状态。如果委托(delegate)当前正在主线程上执行(被重用为工作线程),并且已开始逻辑操作,则另一个(或多个)并行化委托(delegate)将“继承”主线程的LogicalOperationStack,该对象现在具有一个(或多个)新的逻辑操作生效!

    FWIW,我实现了(主要用于测试,目前我实际上并未在使用它)以下“逻辑堆栈”来模仿LogicalOperationStack,但是这样做的方式使其可以与Parallel一起使用。*随意尝试它和/或使用它。要进行测试,请替换对

    Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation
    

    在我的原始问题的示例代码中,调用了
    LogicalOperation.OperationStack.Push()/Pop().
    
    
    //OperationStack.cs
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    
    using System.Runtime.Remoting.Messaging;
    
    namespace LogicalOperation
    {
      public static class OperationStack
      {
        private const string OperationStackSlot = "OperationStackSlot";
    
        public static IDisposable Push(string operation)
        {
          OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
          OperationStackItem op = new OperationStackItem(parent, operation);
          CallContext.LogicalSetData(OperationStackSlot, op);
          return op;
        }
    
        public static object Pop()
        {
          OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
    
          if (current != null)
          {
            CallContext.LogicalSetData(OperationStackSlot, current.Parent);
            return current.Operation;
          }
          else
          {
            CallContext.FreeNamedDataSlot(OperationStackSlot);
          }
          return null;
        }
    
        public static object Peek()
        {
          OperationStackItem top = Top();
          return top != null ? top.Operation : null;
        }
    
        internal static OperationStackItem Top()
        {
          OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
          return top;
        }
    
        public static IEnumerable<object> Operations()
        {
          OperationStackItem current = Top();
          while (current != null)
          {
            yield return current.Operation;
            current = current.Parent;
          }
        }
    
        public static int Count
        {
          get
          {
            OperationStackItem top = Top();
            return top == null ? 0 : top.Depth;
          }
        }
    
        public static IEnumerable<string> OperationStrings()
        {
          foreach (object o in Operations())
          {
            yield return o.ToString();
          }
        }
      }
    }
    
    
    //OperationStackItem.cs
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    
    namespace LogicalOperation
    {
      public class OperationStackItem : IDisposable
      {
        private OperationStackItem parent = null;
        private object operation;
        private int depth;
        private bool disposed = false;
    
        internal OperationStackItem(OperationStackItem parentOperation, object operation)
        {
          parent = parentOperation;
          this.operation = operation;
          depth = parent == null ? 1 : parent.Depth + 1;
        }
    
        internal object Operation { get { return operation; } }
        internal int Depth { get { return depth; } }
    
        internal OperationStackItem Parent { get { return parent; } }
    
        public override string ToString()
        {
          return operation != null ? operation.ToString() : "";
        }
    
        #region IDisposable Members
    
        public void Dispose()
        {
          if (disposed) return;
    
          OperationStack.Pop();
    
          disposed = true;
        }
    
        #endregion
      }
    }
    

    这是受Brent VanderMeide在此处描述的范围对象的启发:http://www.dnrtv.com/default.aspx?showNum=114

    您可以像这样使用此类:
    public void MyFunc()
    {
      using (LogicalOperation.OperationStack.Push("MyFunc"))
      {
        MyOtherFunc();
      }
    }
    
    public void MyOtherFunc()
    {
      using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
      {
        MyFinalFunc();
      }
    }
    
    public void MyFinalFunc()
    {
      using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
      {
        Console.WriteLine("Hello");
      }
    }
    

    10-06 02:50