For与ThreadPool和异步

For与ThreadPool和异步

本文介绍了Parallel.For与ThreadPool和异步/等待的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试以异步方式处理5000个文件,而又不限制线程池的增长.但是,Parallel.For循环并没有给我一个一致的正确答案(计数很快就出现了),而Task.Run是.

I'm trying to process 5000 files in an async manner without growing the Threadpool unrestricted. The Parallel.For loop however, is not giving me a consistent correct answer (count comes up short), while the Task.Run is.

导致这些错误答案的Parallel.For循环中我在做什么?

What am I doing wrong in the Parallel.For loop that is causing these incorrect answers?

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static volatile int count = 0;
    static volatile int total = 0;
    static void Main(string[] args)
    {
        Parallel.For(0, 5000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
            async (index) =>
            {
                string filePath = $"C:\\temp\\files\\out{index}.txt";
                var bytes = await ReadFileAsync(filePath);
                Interlocked.Add(ref total, bytes.Length);
                Interlocked.Increment(ref count);
            });
        Console.WriteLine(count);
        Console.WriteLine(total);

        count = 0;
        total = 0;
        List<Task> tasks = new List<Task>();
        foreach (int index in Enumerable.Range(0, 5000))
        {
            tasks.Add(Task.Run(async () =>
            {
                string filePath = $"C:\\temp\\files\\out{index}.txt";
                var bytes = await ReadFileAsync(filePath);
                Interlocked.Add(ref total, bytes.Length);
                Interlocked.Increment(ref count);
            }));
        }
        Task.WhenAll(tasks).Wait();
        Console.WriteLine(count);
        Console.WriteLine(total);
    }
    public static async Task<byte[]> ReadFileAsync(string filePath)
    {
        byte[] bytes = new byte[4096];
        using (var sourceStream = new FileStream(filePath,
                FileMode.Open, FileAccess.Read, FileShare.Read,
                bufferSize: 4096, useAsync: true))
        {
            await sourceStream.ReadAsync(bytes, 0, 4096);
        };
        return bytes;
    }
}

推荐答案

Parallel.For不了解async.

因此,Parallel.For的运行不符合您的预期.由于异步lambda生成的任务没有等待,因此所有迭代将在创建而不是完成任务所需的时间内完成.

As such, the Parallel.For is not performing as you expect. Because the task generated by the async lambda is not waited for, all of the iterations will complete in the time it takes to create the tasks, not complete them.

Parallel.For之后,许多迭代仍将具有尚未完成的待处理任务,因此,您对counttotal的添加尚未完成.

After your Parallel.For, a number of iterations will still have a pending task that is not yet complete, and therefore, your additions to count and total have not yet completed.

Stephen Toub已经实现了Parallel.ForEach的异步版本. ( ForEachAsync )实现如下:

Stephen Toub has implemented an async version of Parallel.ForEach. (ForEachAsync) The implementation is as follows:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}

所以您可以重写循环:

Enumerable.Range(0, 5000).ForEachAsync(10, async (index)=>{
   //$$$
});

这篇关于Parallel.For与ThreadPool和异步/等待的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 10:04