场景是这样的:在Parallel.For内部使用非并行数组。数组的所有元素都将被覆盖,因此从技术上讲,没有必要分配和初始化它(就我在C#教程中得出的结论而言,这总是在构造时发生):

float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0,16384,x =>
{
   int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
   for (int i = 0; i < histogram.Length; i++)
   {
      histogram[i] = some_func(); // each element in histogram[] is written anew
   }
   result[x] = do_something_with(histogram);
});


顺序代码中的解决方案很简单:将数组拉到外部for循环的前面:

float[] result = new float[16384];
int[] histogram = new int[32768]; // allocation and initialization with
for(x = 0; x < 16384; x++)
{
   for (int i = 0; i < histogram.Length; i++)
   {
      histogram[i] = some_func();
   }
   restult[x] = do_something_with(histogram);
}


外循环中现在既没有分配也没有无效的0-ing。
在并行版本中,这肯定是一个不好的举动,要么并行进程相互破坏直方图结果,要么C#足够聪明以锁定histogram从而关闭任何并行性。分配histogram[16384,32768]同样浪费。我现在正在尝试的是

public static ParallelLoopResult For<TLocal>(
    int fromInclusive,
    int toExclusive,
    Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally
)


库构造(函数?),但是由于这是我第一次尝试使用C#进行并行编程,因此我充满了疑问。从顺序情况来看,以下翻译是否正确?

float[] result = new float[16384];
System.Threading.Tasks.Parallel.For<short[]>(0, 16384,
                                             () => new short[32768],
                                             (x, loopState, histogram) =>
{
    for (int i = 0; i < histogram.Length; i++)
    {
       histogram[i] = some_func();
    }
    result[x] = do_something_with(histogram);
    return histogram;
}, (histogram) => { });

最佳答案

我不能完全确定您的要求,但让我们看一个起点:

public void Original()
{
    float[] result = new float[16384];
    System.Threading.Tasks.Parallel.For(0, 16384, x =>
    {
        int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
        for (int i = 0; i < histogram.Length; i++)
        {
            histogram[i] = some_func(); // each element in histogram[] is written anew
        }
        result[x] = do_something_with(histogram);
    });
}


内部循环生成一个histogram,而外部循环则使用一个histogram,并使用它在Results中生成单个值。

一种易于操作的解决方案是进行TPL-Dataflow处理,这是TPL之上的一种抽象。要进行设置,我们将需要一些DTO通过数据流管道。

public class HistogramWithIndex
{
    public HistogramWithIndex(IEnumerable<int> histogram, int index)
    {
        Histogram = histogram;
        Index = index;
    }
    public IEnumerable<int> Histogram { get; }
    public int Index { get; }
}

public class IndexWithHistogramSize
{
    public IndexWithHistogramSize(int index, int histogramSize)
    {
        Index = index;
        HistogramSize = histogramSize;
    }
    public int Index { get; }
    public int HistogramSize { get; }
}


这些类表示您在处理的各个阶段的数据。现在让我们来看一下管道。

public async Task Dataflow()
{
    //Build our pipeline
    var options = new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        //This is default but I want to point it out
        EnsureOrdered = true
    };
    var buildHistorgramBlock = new TransformBlock<IndexWithHistogramSize, HistogramWithIndex>(inputData =>
    {
        var histogram = Enumerable.Range(0, inputData.HistogramSize).Select(_ => some_func());
        return new HistogramWithIndex(histogram, inputData.Index);
    }, options);
    var doSomethingBlock = new TransformBlock<HistogramWithIndex, int>(x => do_something_with(x.Histogram.ToArray()), options);

    var resultBlock1 = new ActionBlock<int>(x => Results1.Add(x), options);
    //var resultBlock2 = new ActionBlock<int>(x => //insert into list with index, options);

    //link the blocks
    buildHistorgramBlock.LinkTo(doSomethingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    doSomethingBlock.LinkTo(resultBlock1, new DataflowLinkOptions() { PropagateCompletion = true });

    //Post data
    var histogramSize = 32768;
    foreach (var index in Enumerable.Range(0, 16384))
    {
        await buildHistorgramBlock.SendAsync(new IndexWithHistogramSize(index, histogramSize));
    }

    buildHistorgramBlock.Complete();
    await resultBlock1.Completion;
}


由两个TransformBLocksActionBlock组成的块形成了链接的管道。此处的优点是,更改并行度,每个块引入反压的有限容量等变得非常容易。

需要特别注意的是:TransformBlocks,如果使用了并行性,即MDOP> 1,则它们将按照接收顺序输出项目。这意味着,如果他们按顺序来,他们就按顺序离开。您也可以使用阻止选项Ensure Ordering关闭订购。如果您希望项目在特定索引中不带/不按特定顺序排列,则此功能起作用。

这似乎有些矫kill过正,并且可能适合您的项目。但是我发现这可以非常灵活并且易于维护。尤其是当您开始向处理链中添加步骤时,添加一个块比将另一个for循环包装在所有内容周围要干净得多。

这是c&p的其余样板代码

private ConcurrentBag<int> Results1 = new ConcurrentBag<int>();
private int some_func() => 1;
private int do_something_with(int[] i) => i.First();

10-08 11:37