当创建具有有限容量的批处理块并在发布新项目的同时(与之并行)调用triggerBatch-在触发批处理执行期间,发布新项目将失败。

为了确保在传入数据流暂停或减慢的情况下,数据不会在块中延迟太长时间,将调用触发器批处理(每X次)。

以下代码将输出一些“发布失败”事件。
例如:

    public static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
        var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
        batchBlock.LinkTo(actionBlock);

        var producerTask = Task.Factory.StartNew(() =>
        {
            //Post 10K Items
            for (int i = 0; i < 10000; i++)
            {
                var postResult = batchBlock.Post(i);
                if (!postResult)
                    Console.WriteLine("Failed to Post");
            }
        });

        var triggerBatchTask = Task.Factory.StartNew(() =>
            {
                //Trigger Batch..
                for (int i = 0; i < 1000000; i++)
                    batchBlock.TriggerBatch();
            });

        producerTask.Wait();
        triggerBatchTask.Wait();
    }

    public static void ProcessBatch(int[] batch)
    {
        Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
    }

*请注意,只有在batchBlock为边界时,此方案才可重现。

我是否缺少某些内容?或者batchBlock有问题吗?

最佳答案

BatchBlock并未真正拒绝该项目,而是尝试将其推迟。除非是Post(),否则不可以选择推迟。解决此问题的一种简单方法是使用await batchBlock.SendAsync(i)而不是batchBlock.Post(i)(这也意味着您需要将Task.Factory.StartNew(() =>更改为Task.Run(async () =>)。

为什么会这样?根据the source code的定义,如果BatchBlock是有界的,则TriggerBatch()是异步处理的,并且在处理过程中,不会接受任何新项目。

无论如何,您都不应该期望Post()总是在有界块上返回true,如果该块已满,则Post()也将返回false

09-05 12:49