我正在构建一个必须处理大量数据的控制台应用程序。

基本上,应用程序从数据库获取引用。对于每个引用,请分析文件的内容并进行一些更改。这些文件是HTML文件,此过程使用RegEx替换进行了大量工作(查找引用并将其转换为链接)。然后将结果存储在文件系统中并发送到外部系统。

如果我继续该过程,请按顺序进行:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}

我的程序运行正常,但是速度很慢。这就是为什么我要并行化该过程。

现在,我添加了AsParallel进行了简单的Parallelization:
var refs = GetReferencesFromDB().AsParallel();
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref);
    var html = File.ReadAllText(filePath);
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath);
    SendToWs(ref, convertedHtml);
});

这个简单的更改减少了过程的持续时间(减少了25%的时间)。但是,我对并行化的理解是,如果对依赖I/O的资源进行并行化,将不会有太多好处(或更糟的是,更少的好处),因为I/O不会神奇地翻倍。

这就是为什么我认为我应该更改方法而不是使整个过程并行化,而要创建相关的链式排队任务。

即,我应该创建一个像这样的流程:



但是,我不知道如何实现这种想法。

我觉得它将以一组消费者/生产者队列结束,但是我没有找到正确的样本。

而且,我不确定是否会有好处。

感谢您的建议

[编辑] 实际上,如果是rtm,我是使用C#4.5的理想人选:)

[编辑2] 让我觉得并行化不正确的另一件事是,在资源监视器中,我看到CPU,网络I/O和磁盘I/O的图形不稳定。当一个人高时,其他人则从低到中

最佳答案

您没有在任何代码中利用任何异步I/O API。您正在做的所有事情都受CPU限制,并且您所有的I/O操作都将浪费CPU资源。 AsParallel用于计算绑定(bind)任务,如果您想利用异步I/O,则需要在BeginXXX/EndXXX方法并在可用时加以利用来实现的。

阅读此文章以了解入门者:TPL TaskFactory.FromAsync vs Tasks with blocking methods

接下来,无论如何您都不希望在这种情况下使用AsParallelAsParallel启用流传输,这将导致立即为每个项目安排新的任务,但是您在这里不需要/不希望这样做。使用Parallel::ForEach对工作进行分区会更好地为您服务。

让我们看看如何在特定情况下使用此知识来实现​​最大并发性:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath,
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

现在,这里有一些注意事项:
  • 这是示例代码,因此我正在使用1MB的缓冲区读取/写入文件。这对于HTML文件来说是多余的,并且浪费了系统资源。您可以降低它以满足您的最大需求,也可以将链接的读/写实现到StringBuilder中,这是我的专职工作,因为我要编写约500行以上的代码来执行异步的链式读/写。 :P
  • 您会注意到,在继续执行读/写任务时,我有TaskContinuationOptions.AttachedToParent。这非常重要,因为它将阻止Parallel::ForEach启动工作的工作线程完成,直到所有基础异步调用都完成。如果不在这里,那么您将同时启动所有5000个项目的工作,这将污染TPL子系统并产生数千个预定任务,并且根本无法正确扩展。
  • 我同时调用SendToWs来将文件写入此处的文件共享。我不知道SendToWs实现的基础是什么,但这听起来也很适合进行异步处理。现在假设它是纯计算工作,因此将在执行时刻录CPU线程。我将作为练习给您,以找出如何最好地利用我向您展示的内容来提高吞吐量。
  • 这都是自由输入形式,我的大脑是这里唯一的编译器,因此,我使用语法高亮来确保语法良好。因此,请原谅任何语法错误,并让我知道如果我将任何事情搞砸了,以至于您无法做出正面或反面的结论,我将继续进行。
  • 关于c# - 如何充分依靠I/O适当并行化作业,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/8505815/

    10-10 01:06