我面临以下问题:
我有一个Foo
对象的数据流,并将这些对象流式传输到几个并发的进程中任务/线程,这些任务/线程又依次处理这些对象并输出FooResult
对象。除其他成员外,每个FooResult
都包含创建Foo
时使用的相同FooResult
。但是,并非每个Foo
都必须创建一个FooResult
。
我的问题是,我想从整个过程中传递一个包装对象,该包装对象包含原始的Foo
,并可能包含所有并发任务中可能已经通过FooResult
创建的Foo
对象(如果有的话)。
注意:我目前使用TPL Dataflow,而每个并发进程都在ActionBlock<Foo>
内发生,该BroadCastBlock<Foo>
是从SendAsync()
链接到的。它使用FooResult
到目标数据流块,以发送可能创建的FooResult
。显然,并发数据流块在无法预测的时间生成FooResult
,这是我目前正在努力解决的问题。我似乎无法弄清楚在所有ActionBlock<Foo>
中总共创建了多少个Foo
,因此我可以将它们与原始Foo
捆绑在一起并作为包装对象传递。
当前在伪代码中,其外观如下:
BroadCastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2;
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);
aBlock1 = new ActionBlock<Foo>(foo =>
{
//do something here. Sometimes create a FooResult. If then
targetBlock.SendAsync(fooResult);
});
//similar for aBlock2
但是,当前代码的问题在于,如果
FooResult
在任何操作块中均未生成单个FooResult
,则targetBlock可能不接收任何内容。另外,可能是targetBlock接收了2个FooResult
对象,因为每个操作块都产生了一个Foo
。我想要的是targetBlock接收一个包含每个
FooResult
的包装对象,如果创建了FooResult
对象,则还包含一个ojit_code的集合。我有什么想法可以使解决方案以上述方式工作?它不必细读TPL数据流,但如果这样做的话,它会很整洁。
更新:以下是我通过svick建议的JoinBlock实现获得的结果。我不会使用它(除非可以在性能方面进行调整),因为它运行起来非常慢,我每秒可以得到大约89000个项目(而且那仅仅是int值类型)。
public class Test
{
private BroadcastBlock<int> broadCastBlock;
private TransformBlock<int, int> transformBlock1;
private TransformBlock<int, int> transformBlock2;
private JoinBlock<int, int, int> joinBlock;
private ActionBlock<Tuple<int, int, int>> processorBlock;
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
joinBlock = new JoinBlock<int, int, int>();
processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
{
//Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions { PropagateCompletion = true });
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
}
更新代码以反射(reflect)建议:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target1);
transformBlock2.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
}
最佳答案
我可以看到两种解决方法:
JoinBlock
。您的广播块以及两个工作块都将发送到连接块的一个目标。如果一个工作程序块没有任何结果,它将改为给它null
(或其他一些特殊值)。您的工作程序块将需要更改为TranformBlock<Foo, FooResult>
,因为使用ActionBlock
不能保证排序(至少在设置MaxDegreeOfParallelism
时不能保证),TransformBlock
可以保证排序。JoinBlock
的结果将是Tuple<Foo, FooResult, FooResult>
,其中FooResult
的任何一个或两个都可以是null
。尽管我不确定我是否喜欢此解决方案在很大程度上取决于正确的项目排序,但对我来说这似乎很脆弱。
NotificationWrapper
。在这种情况下,您可以使用
TaskCompletionSource
和Task.WhenAll()
进行同步。 关于c# - 如何并行处理项目然后合并结果?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13497345/