本文介绍了备用数据流来与BroadcastBlock保证交付的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要有某种形式的对象,它就像一个BroadcastBlock,但同时保证交付。所以我用从一个答案this问题。但我真的不清楚地了解这里的执行流程。我有一个控制台应用程序。这里是我的code:

I need to have some kind of object that acts like a BroadcastBlock, but with guaranteed delivery. So i used an answer from this question. But i don't really clearly understand the execution flow here. I have a console app. Here is my code:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num =>
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
        }, execopt));

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

每个号码必须按顺序来处理,所以我不能在广播块中使用MaxDegreeOfParallelism。但接收号码的所有actionblocks可以并行运行。

Each number must be handled sequentially, so i can't use MaxDegreeOfParallelism in broadcaster block. But all actionblocks that receive the number can run in parallel.

因此​​,这里的问题是:

So here is the question:

在输出我可以看到不同的线程ID。我是否理解正确的工作方式如下:

In the output i can see different thread ids. Do i understand it correctly that works as follows:

执行命中等待block.SendAsync(NUM)。
如果当前块是没有准备好接受的数目,执行退出广播并在Task.WaitAll挂起。
当块接受的数目,foreach语句在广播,其余的线程池被执行。
而同样的,直到结束。
的foreach的每次迭代是在一个线程池执行。但实际上它发生的顺序。

Execution hits await block.SendAsync(num); in a broadcaster.If current block is not ready to accept the number, execution exits broadcaster and hangs at the Task.WaitAll.When block accepts the number, the rest of foreach statement in broadcaster is executed in a threadpool.And the same till the end.Each iteration of foreach is executed in a threadpool. But actually it happens sequentially.

我说得对还是错在我的理解?
我怎样才能改变这种code发送异步所有块数?

Am i right or wrong in my understanding?How can i change this code to send the number to all blocks asynchronously?

要确保,如果一个程序段不准备在瞬间收到的号码,我不会等待它和准备将收到的号码所有其他人。而所有的块可以并行运行。并保证交货。

To make sure that if one of blocks is not ready to receive the number at the moment, i won't wait for it and all others that are ready will receive the number. And that all blocks can run in parallel. And guarantee delivery.

推荐答案

假设你想通过广播同时使目标块接收时间来处理一个项目该项目同时需要修改广播提供给所有块的数量在同一时间,然后异步等待所有的一起来移动到下一个数字之前接受它:

Assuming you want to handle one item at a time by the broadcaster while enabling the target blocks to receive that item concurrently you need to change the broadcaster to offer the number to all blocks at the same time and then asynchronously wait for all of them together to accept it before moving on to the next number:

var broadcaster = new ActionBlock<int>(async num =>
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);

现在,在这种情况下,你不必下班后等着你可以稍微优化,同时还返回一个awaitable任务:

Now, in this case where you don't have work after the await you can slightly optimize while still returning an awaitable task:

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);

这篇关于备用数据流来与BroadcastBlock保证交付的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-28 03:44
查看更多