场景是这样的:在Parallel.For内部使用非并行数组。数组的所有元素都将被覆盖,因此从技术上讲,没有必要分配和初始化它(就我在C#教程中得出的结论而言,这总是在构造时发生):
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0,16384,x =>
{
int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func(); // each element in histogram[] is written anew
}
result[x] = do_something_with(histogram);
});
顺序代码中的解决方案很简单:将数组拉到外部for循环的前面:
float[] result = new float[16384];
int[] histogram = new int[32768]; // allocation and initialization with
for(x = 0; x < 16384; x++)
{
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func();
}
restult[x] = do_something_with(histogram);
}
外循环中现在既没有分配也没有无效的0-ing。
在并行版本中,这肯定是一个不好的举动,要么并行进程相互破坏直方图结果,要么C#足够聪明以锁定
histogram
从而关闭任何并行性。分配histogram[16384,32768]
同样浪费。我现在正在尝试的是public static ParallelLoopResult For<TLocal>(
int fromInclusive,
int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
库构造(函数?),但是由于这是我第一次尝试使用C#进行并行编程,因此我充满了疑问。从顺序情况来看,以下翻译是否正确?
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For<short[]>(0, 16384,
() => new short[32768],
(x, loopState, histogram) =>
{
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func();
}
result[x] = do_something_with(histogram);
return histogram;
}, (histogram) => { });
最佳答案
我不能完全确定您的要求,但让我们看一个起点:
public void Original()
{
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0, 16384, x =>
{
int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func(); // each element in histogram[] is written anew
}
result[x] = do_something_with(histogram);
});
}
内部循环生成一个
histogram
,而外部循环则使用一个histogram
,并使用它在Results
中生成单个值。一种易于操作的解决方案是进行TPL-Dataflow处理,这是TPL之上的一种抽象。要进行设置,我们将需要一些DTO通过数据流管道。
public class HistogramWithIndex
{
public HistogramWithIndex(IEnumerable<int> histogram, int index)
{
Histogram = histogram;
Index = index;
}
public IEnumerable<int> Histogram { get; }
public int Index { get; }
}
public class IndexWithHistogramSize
{
public IndexWithHistogramSize(int index, int histogramSize)
{
Index = index;
HistogramSize = histogramSize;
}
public int Index { get; }
public int HistogramSize { get; }
}
这些类表示您在处理的各个阶段的数据。现在让我们来看一下管道。
public async Task Dataflow()
{
//Build our pipeline
var options = new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
//This is default but I want to point it out
EnsureOrdered = true
};
var buildHistorgramBlock = new TransformBlock<IndexWithHistogramSize, HistogramWithIndex>(inputData =>
{
var histogram = Enumerable.Range(0, inputData.HistogramSize).Select(_ => some_func());
return new HistogramWithIndex(histogram, inputData.Index);
}, options);
var doSomethingBlock = new TransformBlock<HistogramWithIndex, int>(x => do_something_with(x.Histogram.ToArray()), options);
var resultBlock1 = new ActionBlock<int>(x => Results1.Add(x), options);
//var resultBlock2 = new ActionBlock<int>(x => //insert into list with index, options);
//link the blocks
buildHistorgramBlock.LinkTo(doSomethingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
doSomethingBlock.LinkTo(resultBlock1, new DataflowLinkOptions() { PropagateCompletion = true });
//Post data
var histogramSize = 32768;
foreach (var index in Enumerable.Range(0, 16384))
{
await buildHistorgramBlock.SendAsync(new IndexWithHistogramSize(index, histogramSize));
}
buildHistorgramBlock.Complete();
await resultBlock1.Completion;
}
由两个
TransformBLocks
和ActionBlock
组成的块形成了链接的管道。此处的优点是,更改并行度,每个块引入反压的有限容量等变得非常容易。需要特别注意的是:
TransformBlocks
,如果使用了并行性,即MDOP> 1,则它们将按照接收顺序输出项目。这意味着,如果他们按顺序来,他们就按顺序离开。您也可以使用阻止选项Ensure Ordering
关闭订购。如果您希望项目在特定索引中不带/不按特定顺序排列,则此功能起作用。这似乎有些矫kill过正,并且可能适合您的项目。但是我发现这可以非常灵活并且易于维护。尤其是当您开始向处理链中添加步骤时,添加一个块比将另一个for循环包装在所有内容周围要干净得多。
这是c&p的其余样板代码
private ConcurrentBag<int> Results1 = new ConcurrentBag<int>();
private int some_func() => 1;
private int do_something_with(int[] i) => i.First();