这是一个简短的代码示例,可以快速向您介绍我的问题是什么:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
var secondBlock = new TransformBlock<int,string>(async x =>
{
if (x == 12)
{
await Task.Delay(5000);
return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
}
return $"{DateTime.Now}: Message is {x}";
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
firstBlock.LinkTo(secondBlock);
secondBlock.LinkTo(thirdBlock);
var populateTask = Task.Run(async () =>
{
foreach (var x in Enumerable.Range(1, 15))
{
await firstBlock.SendAsync(x);
}
});
populateTask.Wait();
secondBlock.Completion.Wait();
}
}
}
输出为:
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
为什么是这个订单,我如何改变网络,以获得下面的输出?
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
所以我想知道为什么其他的块(或任务)都要等待延迟的块?
更新
既然你们让我更详细地解释我的问题,我做了这个样本,更接近我正在研究的真实管道。假设应用程序下载一些数据并根据返回的响应计算散列。
using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var firstBlock = new TransformBlock<int, string>(x => x.ToString(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
{
using (var httpClient = new HttpClient())
{
if (x == "4") await Task.Delay(5000);
var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
return new Tuple<string, string>(x, result);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
{
using (var algorithm = SHA256.Create())
{
var bytes = Encoding.UTF8.GetBytes(x.Item2);
var hash = algorithm.ComputeHash(bytes);
return new Tuple<string, byte[]>(x.Item1, hash);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
{
var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
Console.WriteLine(output);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
firstBlock.LinkTo(secondBlock);
secondBlock.LinkTo(thirdBlock);
thirdBlock.LinkTo(fourthBlock);
var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();
fourthBlock.Completion.Wait();
}
private static string GetHashAsString(byte[] bytes)
{
var sb = new StringBuilder();
int i;
for (i = 0; i < bytes.Length; i++)
{
sb.AppendFormat("{0:X2}", bytes[i]);
if (i % 4 == 3) sb.Append(" ");
}
return sb.ToString();
}
}
}
让我们看看请求的顺序:
这绝对有道理。所有的要求都尽快提出。慢的第四个请求在列表的末尾。
现在让我们看看我们有什么产出:
09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
您可以看到,第三个之后的所有散列都是在第四个响应出现之后计算的。
基于这两个事实,我们可以说所有下载的页面都在等待第四个请求的完成。最好不要等到第四个请求,数据下载后立即计算散列。我有办法做到这一点吗?
最佳答案
好吧,通过@sirrufo的引用,我开始考虑实现我自己的TransformBlock
,它将满足我的需求,并处理传入的项目,而不考虑订购。这样就不会破坏网络,在下载的一部分中在块之间建立一个间隙,这将是一个优雅的方法。
所以我开始研究我该怎么做。研究TransformBlock
本身的来源似乎是一个很好的起点,所以我打开Github上的TransformBlock
来源并开始分析它。
从一开始上课我就发现了一件有趣的事:
//如果使用并行,则需要支持对无序完成的消息重新排序。
// However, a developer can override this with EnsureOrdered == false.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
_reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}
看起来正是我们想要的!让我们在Github上的
EnsureOrdered
类中看到这个DataflowBlockOptions
选项:/// <summary>Gets or sets whether ordered processing should be enforced on a block's handling of messages.</summary>
/// <remarks>
/// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
/// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
/// order they were input, even if parallelism is employed by the block and the processing of a message N finishes
/// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
/// ordering prior to making those results available to a consumer). Some blocks may allow this to be relaxed,
/// however. Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
/// it's able to do so. This can be beneficial if the immediacy of a processed result being made available
/// is more important than the input-to-output ordering being maintained.
/// </remarks>
public bool EnsureOrdered
{
get { return _ensureOrdered; }
set { _ensureOrdered = value; }
}
它看起来非常好,所以我立即切换到IDE设置它。不幸的是,没有这样的设置:
我一直在寻找,发现了这个note:
4.5.25-β-23019
包已重命名为system.threading.tasks.dataflow
当我在谷歌上找到这个叫做“CC”的“AA”!因此,我卸载了
System.Threading.Tasks.Dataflow
包并通过发出以下命令安装了Microsoft.Tpl.Dataflow
:Install-Package System.Threading.Tasks.Dataflow
还有
System.Threading.Tasks.Dataflow
选项。我更新了代码,将EnsureOrdered
设置为EnsureOrdered
:using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false };
var firstBlock = new TransformBlock<int, string>(x => x.ToString(), options);
var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
{
using (var httpClient = new HttpClient())
{
if (x == "4") await Task.Delay(5000);
var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
return new Tuple<string, string>(x, result);
}
}, options);
var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
{
using (var algorithm = SHA256.Create())
{
var bytes = Encoding.UTF8.GetBytes(x.Item2);
var hash = algorithm.ComputeHash(bytes);
return new Tuple<string, byte[]>(x.Item1, hash);
}
}, options);
var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
{
var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
Console.WriteLine(output);
}, options);
firstBlock.LinkTo(secondBlock);
secondBlock.LinkTo(thirdBlock);
thirdBlock.LinkTo(fourthBlock);
var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();
fourthBlock.Completion.Wait();
}
private static string GetHashAsString(byte[] bytes)
{
var sb = new StringBuilder();
int i;
for (i = 0; i < bytes.Length; i++)
{
sb.AppendFormat("{0:X2}", bytes[i]);
if (i % 4 == 3) sb.Append(" ");
}
return sb.ToString();
}
}
}
结果就是我想要的:
10.08.2016 11:03:23: Hash for element #3: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #1: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #2: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #10: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #8: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #9: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #5: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #7: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #6: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:27: Hash for element #4: FD25E52B FCD8DE81 BD38E11B 13C20B96 09473283 F25346B2 04593B70 E4357BDA