我有一个简单的程序,可以迭代实现为反馈枚举器的无尽枚举。我已经在TPL和PLINQ中实现了这一点。两个示例均在可预测的迭代次数后锁定:PLINQ为8,TPL为3。如果不使用TPL / PLINQ即可执行代码,则运行良好。我已经以非线程安全的方式以及线程安全的方式实现了枚举数。如果并行度限制为一个(如示例中的情况),则可以使用前者。非线程安全的枚举器非常简单,并且不依赖于任何“花哨的” .NET库类。如果我增加并行度,则死锁之前执行的迭代次数会增加,例如,对于PLINQ,迭代数为8 *并行度。

这是迭代器:
枚举器(非线程安全的)

public class SimpleEnumerable<T>: IEnumerable<T>
{
    private T _value;
    private readonly AutoResetEvent _releaseValueEvent = new AutoResetEvent(false);

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        while(true)
        {
            _releaseValueEvent.WaitOne();
            yield return _value;
        }
    }

    public void OnNext(T value)
    {
        _value = value;
        _releaseValueEvent.Set();
    }
}


枚举器(线程安全)

public class SimpleEnumerable<T>: IEnumerable<T>
{
    private readonly BlockingCollection<T> _blockingCollection = new BlockingCollection<T>();

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        while(true)
        {
            yield return _blockingCollection.Take();
        }
    }

    public void OnNext(T value)
    {
        _blockingCollection.Add(value);
    }
}


PLINQ示例:

public static void Main(string[] args)
{
    var enumerable = new SimpleEnumerable<int>();
    enumerable.OnNext(0);

    enumerable
        .Do(i => Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}"))
        .AsParallel()
        .WithDegreeOfParallelism(1)
        .ForEach
        (
            i =>
            {
                Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
                enumerable.OnNext(i+1);
            }
        );
}


TPL示例:

public static void Main(string[] args)
{
    var enumerable = new SimpleEnumerable<int>();
    enumerable.OnNext(0);

    Parallel.ForEach
    (
        enumerable,
        new ParallelOptions { MaxDegreeOfParallelism = 1},
        i =>
        {
            Debug.WriteLine($"{i} {Thread.CurrentThread.ManagedThreadId}");
            enumerable.OnNext(i+1);
        }
    );
}


根据我对调用栈的分析,似乎在PLINQ和TPL中与分区程序相关的方法中都出现了死锁,但是我不确定如何解释这一问题。

通过反复试验,我发现将PLINQ enumerable包裹在Partitioner.Create(enumerable, EnumerablePartitionerOptions.NoBuffering)中可以解决此问题,但是我不确定为什么会发生死锁。

我将非常有兴趣找出该错误的根本原因。

请注意,这是一个人为的示例。我不是在批评代码,而是为什么会发生僵局。具体来说,在PLINQ示例中,如果.AsParallel().WithDegreeOfParallelism(1)行被注释掉,则代码可以正常工作。

最佳答案

实际上,您实际上没有逻辑上的值序列,因此,首先尝试创建IEnumerable根本没有任何意义。此外,几乎可以肯定,您不应该尝试创建可由多个线程使用的IEnumerator。之所以存在疯狂,仅仅是因为IEnumerator公开的界面并没有真正公开您想要实现的目标。您可以潜在地创建IEnumerator,该线程只能由单个线程使用,而该线程将根据多个线程使用的基础数据源来计算要返回的数据,因为这非常不同。

如果只想创建在不同线程中运行的生产者和使用者,则不要在BlockingCollection周围创建自己的“包装器”,*只需使用BlockingCollection。让生产者添加它,而消费者从中读取。如果消费者只是想在拿走这些物品时对它们进行迭代(通常要做的操作),则可以使用GetConsumingEnumerable

关于c# - 循环枚举器的PLINQ迭代导致死锁,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40873020/

10-13 00:08