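I am building a console application that has to process a large amount of data.
Basically, the application fetches references from a database. For each reference, it parses the content of a file and makes some changes. The files are HTML, and the process does heavy work with RegEx replacements (finding references and turning them into links). The results are then stored on the file system and sent to an external system.
Summed up sequentially, the process looks like this: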
var refs = GetReferencesFromDB(); // ~5000 DataRows returned
foreach (var reference in refs) // "ref" is a C# keyword, so the loop variable is named "reference"
{
    var filePath = GetFilePath(reference); // This method looks up the path in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read the HTML locally or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml); // Write the result locally or to a network drive
    SendToWs(reference, convertedHtml);
}
My program works correctly, but it is quite slow. That is why I want to parallelize the process.
So far, I have done a simple parallelization by adding AsParallel:
var refs = GetReferencesFromDB().AsParallel();
refs.ForAll(reference => // "ref" is a C# keyword, so the parameter is named "reference"
{
    var filePath = GetFilePath(reference);
    var html = File.ReadAllText(filePath);
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml);
    SendToWs(reference, convertedHtml);
});
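This simple change reduced the duration of the process by 25%. However, my understanding of parallelization is that there won't be much benefit (or worse, there may be less) in parallelizing over resources that depend on I/O, because I/O capacity won't magically double.
That's why I think I should change my approach: instead of parallelizing the whole process, I should create dependent, chained, queued tasks.
I.e., I should create a flow like this:
But I don't know how to implement this idea.
I feel it will end up as a set of consumer/producer queues, but I haven't found a proper sample.
Moreover, I am not sure there will be any benefit.
Thanks for your advice.
[Edit] In fact, I would be the perfect candidate for C# 4.5, if only it were RTM :)
[Edit 2] Another thing that makes me think the work is not parallelized correctly is that, in the Resource Monitor, the graphs for CPU, network I/O and disk I/O are not stable. When one is high, the others are low to medium.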
Best answer
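You're not leveraging any async I/O APIs anywhere in your code. Everything you're doing ties up a CPU thread, and all of your I/O operations waste CPU resources by blocking. AsParallel is meant for compute-bound tasks. If you want to take advantage of async I/O, you need to look for the BeginXXX/EndXXX methods on the I/O classes you're using and leverage them wherever they are available.
Read this post for starters: TPL TaskFactory.FromAsync vs Tasks with blocking methods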
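As a minimal sketch of the BeginXXX/EndXXX wrapping mentioned above, the following reads a small file through FileStream.BeginRead/EndRead via FromAsync. The class and method names (FromAsyncSketch, ReadTextAsync) and the temp file are made up for illustration, and it assumes the file fits in the buffer and is returned by a single read, which is not guaranteed in general.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

static class FromAsyncSketch
{
    // Hypothetical helper: starts one asynchronous read of up to buffer.Length bytes
    // and returns a task that completes with the bytes decoded as UTF-8 text.
    public static Task<string> ReadTextAsync(string path, byte[] buffer)
    {
        // The last argument (useAsync: true) requests asynchronous I/O from the OS.
        var stream = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read, 8192, true);

        // Wrap the BeginRead/EndRead pair in a Task<int> (the number of bytes read).
        Task<int> read = Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

        // No thread is blocked while the read is in flight; this runs afterwards.
        return read.ContinueWith(antecedent =>
        {
            try
            {
                // Result rethrows any exception raised by EndRead.
                return Encoding.UTF8.GetString(buffer, 0, antecedent.Result);
            }
            finally
            {
                stream.Close();
            }
        });
    }

    static void Main()
    {
        string path = Path.GetTempFileName();
        File.WriteAllText(path, "<p>hello</p>");
        try
        {
            // Blocking on Result is only for the demo; real code would chain
            // further continuations instead.
            Console.WriteLine(ReadTextAsync(path, new byte[4096]).Result);
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```

The point of the pattern is that the thread that starts the read is free immediately, and the continuation only needs a thread once the data has arrived.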
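Next, you don't want to use AsParallel in this case anyway. AsParallel enables streaming, which results in a new Task being scheduled immediately for every item, and you don't need or want that here. You'd be much better served by partitioning the work with Parallel.ForEach.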
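To get a feel for how Parallel.ForEach spreads many items over a limited number of workers, here is a small sketch that counts how many loop bodies ever run at the same time. MeasurePeakConcurrency is a made-up helper, and the explicit MaxDegreeOfParallelism cap is my own addition for the demo; the advice above does not depend on it.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class PartitioningSketch
{
    // Hypothetical helper: runs itemCount dummy work items through Parallel.ForEach
    // and returns the highest number of loop bodies observed running at once.
    public static int MeasurePeakConcurrency(int itemCount, int maxDegreeOfParallelism)
    {
        object gate = new object();
        int current = 0;
        int peak = 0;

        var options = new ParallelOptions
        {
            // Upper bound on how many loop bodies may run concurrently.
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };

        Parallel.ForEach(Enumerable.Range(0, itemCount), options, item =>
        {
            lock (gate)
            {
                current++;
                if (current > peak) peak = current;
            }

            Thread.SpinWait(10000); // stand-in for real CPU-bound work

            lock (gate)
            {
                current--;
            }
        });

        return peak;
    }

    static void Main()
    {
        int peak = MeasurePeakConcurrency(5000, 4);
        Console.WriteLine("5000 items processed, at most {0} at a time", peak);
    }
}
```

All 5000 items are handled, but only a handful are in flight at any moment, which is the behavior you want when each item goes on to start its own I/O.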
Let's see how this knowledge can be applied to achieve maximum concurrency in your specific case:
var refs = GetReferencesFromDB();

// Using Parallel.ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    reference => // "ref" is a C# keyword, so the parameter is named "reference"
    {
        string filePath = GetFilePath(reference);
        byte[] fileDataBuffer = new byte[1048576];

        // Need to use the FileStream API directly so we can enable async I/O
        FileStream sourceFileStream = new FileStream(
            filePath,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            8192,
            true);

        // Use FromAsync to read the data from the file
        Task<int> readSourceFileStreamTask = Task<int>.Factory.FromAsync(
            sourceFileStream.BeginRead,
            sourceFileStream.EndRead,
            fileDataBuffer,
            0,
            fileDataBuffer.Length,
            null);

        // Add a continuation that will fire when the async read is completed
        readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
        {
            int sourceFileStreamBytesRead;

            try
            {
                // Determine exactly how many bytes were read
                // NOTE: this will propagate any potential exception that may have occurred in EndRead
                sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
            }
            finally
            {
                // Always clean up the source stream
                sourceFileStream.Close();
                sourceFileStream = null;
            }

            // This is here to make sure you don't end up trying to read files larger than this sample code can handle
            if (sourceFileStreamBytesRead == fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
            }

            // Convert the file data to a string
            string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

            // Parse the HTML
            string convertedHtml = ParseHtml(html);

            // This is here to make sure you don't end up trying to write files larger than this sample code can handle
            if (Encoding.UTF8.GetByteCount(convertedHtml) > fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
            }

            // Convert the file data back to bytes for writing, remembering how many bytes were produced
            int convertedByteCount = Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

            // Need to use the FileStream API directly so we can enable async I/O
            // FileMode.Create truncates an existing file so no stale bytes are left behind
            FileStream destinationFileStream = new FileStream(
                destinationFilePath,
                FileMode.Create,
                FileAccess.Write,
                FileShare.None,
                8192,
                true);

            // Use FromAsync to write the data to the file (only the bytes that were encoded, not the whole buffer)
            Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                destinationFileStream.BeginWrite,
                destinationFileStream.EndWrite,
                fileDataBuffer,
                0,
                convertedByteCount,
                null);

            // Add a continuation that will fire when the async write is completed
            destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
            {
                try
                {
                    // NOTE: we call Wait here to observe any potential exceptions that might have occurred in EndWrite
                    destinationFileStreamWriteAntecedent.Wait();
                }
                finally
                {
                    // Always close the destination file stream
                    destinationFileStream.Close();
                    destinationFileStream = null;
                }
            },
            TaskContinuationOptions.AttachedToParent);

            // Send to the external system **concurrently** with writing to the destination file system above
            SendToWs(reference, convertedHtml);
        },
        TaskContinuationOptions.AttachedToParent);
    });
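Now, here are a few notes:
TaskContinuationOptions.AttachedToParent. This is extremely important because it prevents the worker thread on which Parallel.ForEach started the work from finishing until all of the underlying async calls have completed. If it weren't there, you would kick off the work for all 5000 items at once, which would flood the TPL subsystem with thousands of scheduled Tasks and would not scale properly at all.
Source: a similar question on Stack Overflow, "c# - How to properly parallelize a job that relies heavily on I/O": https://stackoverflow.com/questions/8505815/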
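To see the effect of AttachedToParent in isolation, here is a minimal sketch. It uses TaskCreationOptions.AttachedToParent with Task.Factory.StartNew rather than the continuation option from the sample above, and the names (AttachedChildSketch, ChildFinishedWhenParentCompletes) are hypothetical. Note that the parent is also created with Task.Factory.StartNew, because Task.Run does not allow children to attach.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class AttachedChildSketch
{
    // Hypothetical helper: starts a parent task that spawns a slow child, waits
    // for the parent, and reports whether the child had already finished.
    public static bool ChildFinishedWhenParentCompletes(TaskCreationOptions childOptions)
    {
        bool childFinished = false;
        Task child = null;

        Task parent = Task.Factory.StartNew(() =>
        {
            child = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(300); // stand-in for a slow asynchronous operation
                childFinished = true;
            },
            childOptions);
        });

        // With an attached child, Wait returns only after the child has completed.
        parent.Wait();
        bool observed = childFinished;

        // Let a detached child finish too, so nothing is left running.
        child.Wait();
        return observed;
    }

    static void Main()
    {
        // Attached: the parent cannot complete before the child, so this prints True.
        Console.WriteLine(ChildFinishedWhenParentCompletes(TaskCreationOptions.AttachedToParent));

        // Detached: the parent usually completes while the child is still sleeping.
        Console.WriteLine(ChildFinishedWhenParentCompletes(TaskCreationOptions.None));
    }
}
```

The same parent/child relationship is what keeps each unit of work in the larger sample "open" until its read, write and web service call are done.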