我有一个用例,我需要:

  • 遍历Xml文档
  • 中的每个Input节点
  • 对每个输入执行耗时的计算,
  • 将结果写入XML文件。

  • 输入看起来像这样:
    <Root>
      <Input>
        <Case>ABC123</Case>
        <State>MA</State>
        <Investor>Goldman</Investor>
      </Input>
      <Input>
        <Case>BCD234</Case>
        <State>CA</State>
        <Investor>Goldman</Investor>
      </Input>
    </Root>
    

    和输出:
    <Results>
      <Output>
        <Case>ABC123</Case>
        <State>MA</State>
        <Investor>Goldman</Investor>
        <Price>75.00</Price>
        <Product>Blah</Product>
      </Output>
      <Output>
        <Case>BCD234</Case>
        <State>CA</State>
        <Investor>Goldman</Investor>
        <Price>55.00</Price>
        <Product>Ack</Product>
      </Output>
    </Results>
    

    我想并行运行这些计算;典型的输入文件可能有50,000个输入节点,而没有线程的总处理时间可能是90分钟。大约90%的处理时间用于步骤#2(计算)。

    我可以很容易地遍历XmlReader in parallel:
    static IEnumerable<XElement> EnumerateAxis(XmlReader reader, string axis)
    {
      reader.MoveToContent();
      while (reader.Read())
      {
        switch (reader.NodeType)
        {
          case XmlNodeType.Element:
            if (reader.Name == axis)
            {
              XElement el = XElement.ReadFrom(reader) as XElement;
              if (el != null)
                yield return el;
            }
            break;
        }
      }
    }
    ...
    Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
    {
      // do calc
      // lock the XmlWriter, write, unlock
    });
    

    我目前倾向于在写入XmlWriter时使用锁,以确保线程安全。

    在这种情况下,有没有更优雅的方式来处理XmlWriter?具体来说,我是否应该让Parallel.ForEach代码将结果传递回原始线程,并让该线程处理XmlWriter,从而避免锁定?如果是这样,我不确定这样做的正确方法。

    最佳答案

    这是我最喜欢的一种问题:可以通过管道解决。

    请注意,根据您的情况,这种方法实际上可能会对性能产生负面影响,但是正如您明确询问如何在专用线程上使用writer一样,下面的代码精确地演示了这一点。

    免责声明:理想情况下,您应该考虑使用TPL Dataflow,但这不是我很精通的东西,因此我将采用熟悉的Task + BlockingCollection<T>路线。

    起初,我将建议一个3阶段的管道(读取,处理,写入),但是后来我意识到您已经将前两个阶段与在读取和读取节点时“流式处理”的方式结合在一起了。将它们提供给Parallel.ForEach(是的,您已经实现了各种管道)。甚至更好-更少的线程同步。

    考虑到这一点,代码现在变为:

    public class Result
    {
        public string Case { get; set; }
        public string State { get; set; }
        public string Investor { get; set; }
        public decimal Price { get; set; }
        public string Product { get; set; }
    }
    

    ...
    using (var reader = CreateXmlReader())
    {
        // I highly doubt that this collection will
        // ever reach its bounded capacity since
        // the processing stage takes so long,
        // but in case it does, Parallel.ForEach
        // will be throttled.
        using (var handover = new BlockingCollection<Result>(boundedCapacity: 100))
        {
            var processStage = Task.Run(() =>
            {
                try
                {
                    Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
                    {
                        // Do calc.
                        Thread.Sleep(1000);
    
                        // Hand over to the writer.
                        // This handover is not blocking (unless our
                        // blocking collection has reached its bounded
                        // capacity, which would indicate that the
                        // writer is running slower than expected).
                        handover.Add(new Result());
                    });
                }
                finally
                {
                    handover.CompleteAdding();
                }
            });
    
            var writeStage = Task.Run(() =>
            {
                using (var writer = CreateXmlReader())
                {
                    foreach (var result in handover.GetConsumingEnumerable())
                    {
                        // Write element.
                    }
                }
            });
    
            // Note: the two stages are now running in parallel.
            // You could technically use Parallel.Invoke to
            // achieve the same result with a bit less code.
            Task.WaitAll(processStage, writeStage);
        }
    }
    

    关于C#并行库,XmlReader,XmlWriter,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/24467373/

    10-09 20:00