我已经使用TPL Dataflow了很多,但是却为我无法解决的问题绊绊:
我有以下架构:BroadCastBlock<List<object1>>
-> 2个不同的TransformBlock<List<Object1>, Tuple<int, List<Object1>>>
->都链接到TransformManyBlock<Tuple<int, List<Object1>>, Object2>
我在链的末尾更改了TransformManyBlock中的lambda表达式:(a)对流元组执行操作的代码,(b)完全没有代码。
在TransformBlocks中,我测量从第一项到达开始到TransformBlock.Completion指示块已完成为止的时间(broadCastBlock链接到transportCompletion设置为true的fromfrom块)。
我无法调和的是为什么在(b)情况下transformBlocks完成的速度比(a)快5-6倍。这完全违背了整个TDF设计意图。转换块中的项目已传递到transfromManyBlock,因此,transformManyBlock对转换块完成时会产生影响的项目执行的操作完全无关紧要。我看不出有什么单一原因可以解释transfromManyBlock中发生的任何事情都可能与前面的TransformBlocks有关。
任何人都可以调和这个奇怪的发现吗?
这是一些代码来显示差异。运行代码时,请确保将以下两行更改为:
tfb1.transformBlock.LinkTo(transformManyBlock);
tfb2.transformBlock.LinkTo(transformManyBlock);
至:
tfb1.transformBlock.LinkTo(transformManyBlockEmpty);
tfb2.transformBlock.LinkTo(transformManyBlockEmpty);
为了观察前面的transformBlocks在运行时的差异。
class Program
{
static void Main(string[] args)
{
Test test = new Test();
test.Start();
}
}
class Test
{
private const int numberTransformBlocks = 2;
private int currentGridPointer;
private Dictionary<int, List<Tuple<int, List<Object1>>>> grid;
private BroadcastBlock<List<Object1>> broadCastBlock;
private TransformBlockClass tfb1;
private TransformBlockClass tfb2;
private TransformManyBlock<Tuple<int, List<Object1>>, Object2>
transformManyBlock;
private TransformManyBlock<Tuple<int, List<Object1>>, Object2>
transformManyBlockEmpty;
private ActionBlock<Object2> actionBlock;
public Test()
{
grid = new Dictionary<int, List<Tuple<int, List<Object1>>>>();
broadCastBlock = new BroadcastBlock<List<Object1>>(list => list);
tfb1 = new TransformBlockClass();
tfb2 = new TransformBlockClass();
transformManyBlock = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>
(newTuple =>
{
for (int counter = 1; counter <= 10000000; counter++)
{
double result = Math.Sqrt(counter + 1.0);
}
return new Object2[0];
});
transformManyBlockEmpty
= new TransformManyBlock<Tuple<int, List<Object1>>, Object2>(
tuple =>
{
return new Object2[0];
});
actionBlock = new ActionBlock<Object2>(list =>
{
int tester = 1;
//flush transformManyBlock
});
//linking
broadCastBlock.LinkTo(tfb1.transformBlock
, new DataflowLinkOptions
{ PropagateCompletion = true }
);
broadCastBlock.LinkTo(tfb2.transformBlock
, new DataflowLinkOptions
{ PropagateCompletion = true }
);
//link either to ->transformManyBlock or -> transformManyBlockEmpty
tfb1.transformBlock.LinkTo(transformManyBlock);
tfb2.transformBlock.LinkTo(transformManyBlock);
transformManyBlock.LinkTo(actionBlock
, new DataflowLinkOptions
{ PropagateCompletion = true }
);
transformManyBlockEmpty.LinkTo(actionBlock
, new DataflowLinkOptions
{ PropagateCompletion = true }
);
//completion
Task.WhenAll(tfb1.transformBlock.Completion
, tfb2.transformBlock.Completion)
.ContinueWith(_ =>
{
transformManyBlockEmpty.Complete();
transformManyBlock.Complete();
});
transformManyBlock.Completion.ContinueWith(_ =>
{
Console.WriteLine("TransformManyBlock (with code) completed");
});
transformManyBlockEmpty.Completion.ContinueWith(_ =>
{
Console.WriteLine("TransformManyBlock (empty) completed");
});
}
public void Start()
{
const int numberBlocks = 100;
const int collectionSize = 300000;
//send collection numberBlock-times
for (int i = 0; i < numberBlocks; i++)
{
List<Object1> list = new List<Object1>();
for (int j = 0; j < collectionSize; j++)
{
list.Add(new Object1(j));
}
broadCastBlock.Post(list);
}
//mark broadCastBlock complete
broadCastBlock.Complete();
Console.WriteLine("Core routine finished");
Console.ReadLine();
}
}
class TransformBlockClass
{
private Stopwatch watch;
private bool isStarted;
private int currentIndex;
public TransformBlock<List<Object1>, Tuple<int, List<Object1>>> transformBlock;
public TransformBlockClass()
{
isStarted = false;
watch = new Stopwatch();
transformBlock = new TransformBlock<List<Object1>, Tuple<int, List<Object1>>>
(list =>
{
if (!isStarted)
{
StartUp();
isStarted = true;
}
return new Tuple<int, List<Object1>>(currentIndex++, list);
});
transformBlock.Completion.ContinueWith(_ =>
{
ShutDown();
});
}
private void StartUp()
{
watch.Start();
}
private void ShutDown()
{
watch.Stop();
Console.WriteLine("TransformBlock : Time elapsed in ms: "
+ watch.ElapsedMilliseconds);
}
}
class Object1
{
public int val { get; private set; }
public Object1(int val)
{
this.val = val;
}
}
class Object2
{
public int value { get; private set; }
public List<Object1> collection { get; private set; }
public Object2(int value, List<Object1> collection)
{
this.value = value;
this.collection = collection;
}
}
*编辑:我发布了另一个代码段,这次使用值类型的集合,但是我无法重现上面代码中观察到的问题。难道是传递引用类型并同时对其进行操作(即使在不同的数据流块中)也会阻塞并引起争用吗? *
class Program
{
static void Main(string[] args)
{
Test test = new Test();
test.Start();
}
}
class Test
{
private BroadcastBlock<List<int>> broadCastBlock;
private TransformBlock<List<int>, List<int>> tfb11;
private TransformBlock<List<int>, List<int>> tfb12;
private TransformBlock<List<int>, List<int>> tfb21;
private TransformBlock<List<int>, List<int>> tfb22;
private TransformManyBlock<List<int>, List<int>> transformManyBlock1;
private TransformManyBlock<List<int>, List<int>> transformManyBlock2;
private ActionBlock<List<int>> actionBlock1;
private ActionBlock<List<int>> actionBlock2;
public Test()
{
broadCastBlock = new BroadcastBlock<List<int>>(item => item);
tfb11 = new TransformBlock<List<int>, List<int>>(item =>
{
return item;
});
tfb12 = new TransformBlock<List<int>, List<int>>(item =>
{
return item;
});
tfb21 = new TransformBlock<List<int>, List<int>>(item =>
{
return item;
});
tfb22 = new TransformBlock<List<int>, List<int>>(item =>
{
return item;
});
transformManyBlock1 = new TransformManyBlock<List<int>, List<int>>(item =>
{
Thread.Sleep(100);
//or you can replace the Thread.Sleep(100) with actual work,
//no difference in results. This shows that the issue at hand is
//unrelated to starvation of threads.
return new List<int>[1] { item };
});
transformManyBlock2 = new TransformManyBlock<List<int>, List<int>>(item =>
{
return new List<int>[1] { item };
});
actionBlock1 = new ActionBlock<List<int>>(item =>
{
//flush transformManyBlock
});
actionBlock2 = new ActionBlock<List<int>>(item =>
{
//flush transformManyBlock
});
//linking
broadCastBlock.LinkTo(tfb11, new DataflowLinkOptions
{ PropagateCompletion = true });
broadCastBlock.LinkTo(tfb12, new DataflowLinkOptions
{ PropagateCompletion = true });
broadCastBlock.LinkTo(tfb21, new DataflowLinkOptions
{ PropagateCompletion = true });
broadCastBlock.LinkTo(tfb22, new DataflowLinkOptions
{ PropagateCompletion = true });
tfb11.LinkTo(transformManyBlock1);
tfb12.LinkTo(transformManyBlock1);
tfb21.LinkTo(transformManyBlock2);
tfb22.LinkTo(transformManyBlock2);
transformManyBlock1.LinkTo(actionBlock1
, new DataflowLinkOptions
{ PropagateCompletion = true }
);
transformManyBlock2.LinkTo(actionBlock2
, new DataflowLinkOptions
{ PropagateCompletion = true }
);
//completion
Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
{
Console.WriteLine("TransformBlocks 11 and 12 completed");
transformManyBlock1.Complete();
});
Task.WhenAll(tfb21.Completion, tfb22.Completion).ContinueWith(_ =>
{
Console.WriteLine("TransformBlocks 21 and 22 completed");
transformManyBlock2.Complete();
});
transformManyBlock1.Completion.ContinueWith(_ =>
{
Console.WriteLine
("TransformManyBlock (from tfb11 and tfb12) finished");
});
transformManyBlock2.Completion.ContinueWith(_ =>
{
Console.WriteLine
("TransformManyBlock (from tfb21 and tfb22) finished");
});
}
public void Start()
{
const int numberBlocks = 100;
const int collectionSize = 300000;
//send collection numberBlock-times
for (int i = 0; i < numberBlocks; i++)
{
List<int> list = new List<int>();
for (int j = 0; j < collectionSize; j++)
{
list.Add(j);
}
broadCastBlock.Post(list);
}
//mark broadCastBlock complete
broadCastBlock.Complete();
Console.WriteLine("Core routine finished");
Console.ReadLine();
}
}
最佳答案
好的,最后尝试;-)
概要:
方案1中观察到的时间增量可以通过垃圾收集器的不同行为来完全解释。
当运行方案1链接transformManyBlocks时,运行时行为是在主线程上创建新项(列表)时触发垃圾回收,而在运行方案1并链接transformManyBlockEmptys时情况并非如此。
请注意,创建新的引用类型实例(Object1)会导致在GC堆中分配内存的调用,这又可能触发GC收集运行。创建了许多Object1实例(和列表)后,垃圾收集器还有很多工作要做,以扫描堆中(可能)无法访问的对象。
因此,可以通过以下任一方式将观察到的差异最小化:
将Object1从类转换为结构(从而确保实例的内存未在堆上分配)。
保留对生成列表的引用(从而减少垃圾收集器识别不可达对象所需的时间)。
生成所有项目,然后将其发布到网络。
(注意:我无法解释为什么垃圾收集器在方案1“ transformManyBlock”与方案1“ transformManyBlockEmpty”中的行为不同,但是通过ConcurrencyVisualizer收集的数据清楚地表明了区别。)
结果:
(测试是在Core i7 980X,6核,启用HT的情况下运行的):
我对方案2进行了如下修改:
// Start a stopwatch per tfb
int tfb11Cnt = 0;
Stopwatch sw11 = new Stopwatch();
tfb11 = new TransformBlock<List<int>, List<int>>(item =>
{
if (Interlocked.CompareExchange(ref tfb11Cnt, 1, 0) == 0)
sw11.Start();
return item;
});
// [...]
// completion
Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
{
Console.WriteLine("TransformBlocks 11 and 12 completed. SW11: {0}, SW12: {1}",
sw11.ElapsedMilliseconds, sw12.ElapsedMilliseconds);
transformManyBlock1.Complete();
});
结果:
场景1(如发布,即链接到transformManyBlock):
TransformBlock:以毫秒为单位的经过时间:6826
TransformBlock:以毫秒为单位的经过时间:6826
场景1(链接到transformManyBlockEmpty):
TransformBlock:以毫秒为单位的经过时间:3140
TransformBlock:以毫秒为单位的经过时间:3140
方案1(循环主体中的transformManyBlock,Thread.Sleep(200)):
TransformBlock:以毫秒为单位的经过时间:4949
TransformBlock:以毫秒为单位的经过时间:4950
方案2(已发布但已修改为报告时间):
转换块21和22已完成。 SW21:619毫秒,SW22:669毫秒
转换块11和12已完成。 SW11:669毫秒,SW12:667毫秒
接下来,我更改了场景1和2,以准备将输入数据发布到网络之前:
// Scenario 1
//send collection numberBlock-times
var input = new List<List<Object1>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
var list = new List<Object1>(collectionSize);
for (int j = 0; j < collectionSize; j++)
{
list.Add(new Object1(j));
}
input.Add(list);
}
foreach (var inp in input)
{
broadCastBlock.Post(inp);
Thread.Sleep(10);
}
// Scenario 2
//send collection numberBlock-times
var input = new List<List<int>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
List<int> list = new List<int>(collectionSize);
for (int j = 0; j < collectionSize; j++)
{
list.Add(j);
}
//broadCastBlock.Post(list);
input.Add(list);
}
foreach (var inp in input)
{
broadCastBlock.Post(inp);
Thread.Sleep(10);
}
结果:
方案1(transformManyBlock):
TransformBlock:以毫秒为单位的经过时间:1029
TransformBlock:以毫秒为单位的经过时间:1029
方案1(transformManyBlockEmpty):
TransformBlock:经过的时间(以毫秒为单位):975
TransformBlock:经过的时间(以毫秒为单位):975
方案1(循环主体中的transformManyBlock,Thread.Sleep(200)):
TransformBlock:以毫秒为单位的经过时间:972
TransformBlock:以毫秒为单位的经过时间:972
最后,我将代码改回原始版本,但保留了对
在以下位置创建了列表:
var lists = new List<List<Object1>>();
for (int i = 0; i < numberBlocks; i++)
{
List<Object1> list = new List<Object1>();
for (int j = 0; j < collectionSize; j++)
{
list.Add(new Object1(j));
}
lists.Add(list);
broadCastBlock.Post(list);
}
结果:
方案1(transformManyBlock):
TransformBlock:以毫秒为单位的经过时间:6052
TransformBlock:以毫秒为单位的经过时间:6052
方案1(transformManyBlockEmpty):
TransformBlock:以毫秒为单位的经过时间:5524
TransformBlock:以毫秒为单位的经过时间:5524
方案1(循环主体中的transformManyBlock,Thread.Sleep(200)):
TransformBlock:以毫秒为单位的经过时间:5098
TransformBlock:以毫秒为单位的经过时间:5098
同样,将Object1从类更改为结构会导致两个块在大约相同的时间(大约快10倍)完成。
更新:下面的答案不足以解释观察到的行为。
在一种情况下,TransformMany lambda内部执行了一个紧密循环,这将占用CPU并耗尽其他线程来占用处理器资源。这就是为什么可以观察到完成延续任务执行延迟的原因。在第二种情况下,在TransformMany lambda内部执行Thread.Sleep,使其他线程有机会执行Completion延续任务。在运行时行为中观察到的差异与TPL数据流无关。为了改善观察到的增量,在场景1中,在循环体内引入Thread.Sleep应该足够了:
for (int counter = 1; counter <= 10000000; counter++)
{
double result = Math.Sqrt(counter + 1.0);
// Back off for a little while
Thread.Sleep(200);
}
(下面是我的原始答案。我没有仔细阅读过OP的问题,仅在阅读了他的评论后才了解他的要求。我仍然将其留作参考。)
您确定自己正在衡量正确的事情吗?请注意,当您执行以下操作:
transformBlock.Completion.ContinueWith(_ => ShutDown());
时,时间测量将受到TaskScheduler行为的影响(例如,持续任务开始执行需要多长时间)。尽管我无法观察到您在机器上看到的差异,但是使用专用线程来测量时间时,我得到了更精确的结果(就tfb1和tfb2完成时间之间的差值而言): // Within your Test.Start() method...
Thread timewatch = new Thread(() =>
{
var sw = Stopwatch.StartNew();
tfb1.transformBlock.Completion.Wait();
Console.WriteLine("tfb1.transformBlock completed within {0} ms",
sw.ElapsedMilliseconds);
});
Thread timewatchempty = new Thread(() =>
{
var sw = Stopwatch.StartNew();
tfb2.transformBlock.Completion.Wait();
Console.WriteLine("tfb2.transformBlock completed within {0} ms",
sw.ElapsedMilliseconds);
});
timewatch.Start();
timewatchempty.Start();
//send collection numberBlock-times
for (int i = 0; i < numberBlocks; i++)
{
// ... rest of the code
关于c# - TPL数据流,对核心设计感到困惑,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13834757/