问题描述
我正在尝试理解 C# 中的任务,但仍有一些问题.我正在尝试创建一个包含视频的应用程序.主要目的是从文件中读取视频(我使用的是 Emgu.CV)并通过 TCP/IP 将其发送到板中进行处理,然后以流(实时)方式返回.首先,我是连续做的.因此,读取Bitmap
,从板子发送-接收,并绘图.但是读取位图并绘制它们需要太多时间.我想要一个传输、接收 FIFO 缓冲区来保存视频帧,以及一个不同的任务来完成发送接收每一帧的工作.所以我想同时进行.我想我应该创建 3 个任务:
I am trying to understand Tasks in C# but still having some problems. I am trying to create an application containing video. The main purpose is to read the video from a file (I am using Emgu.CV) and send it via TCP/IP for process in a board and then back in a stream (real-time) way. Firstly, I did it in serial. So, reading a Bitmap
, sending-receiving from board, and plotting. But reading the bitmaps and plotting them takes too much time. I would like to have a Transmit, Receive FIFO Buffers that save the video frames, and a different task that does the job of sending receiving each frame. So I would like to do it in parallel. I thought I should create 3 Tasks:
tasks.Add(Task.Run(() => Video_load(video_path)));
tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
tasks.Add(Task.Run(() => VideoDisp_hw(32)));
我想运行并行".我应该使用什么类型的对象?并发队列?缓冲块?还是只是一个列表?
Which I would like to run "parallel". What type of object should I use? A concurrent queue? BufferBlock? or just a list?
多谢指教!我想问一件事.我正在尝试使用 2 个 TPL 块创建一个简单的控制台程序.1 Block 将是 Transform 块(获取消息,即开始")并将数据加载到列表中,另一个块将是 ActionBlock(仅从列表中读取数据并打印它们).代码如下:
Thanks for the advices! I would like to ask something. I am trying to create a simple console program with 2 TPL blocks. 1 Block would be Transform block (taking a message i.e. "start" ) and loading data to a List and another block would be ActionBlock (just reading the data from the list and printing them). Here is the code below:
namespace TPL_Dataflow
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
Random randn = new Random();
var loadData = new TransformBlock<string, List<int>>(async sample_string =>
{
List<int> input_data = new List<int>();
int cnt = 0;
if (sample_string == "start")
{
Console.WriteLine("Inside loadData");
while (cnt < 16)
{
input_data.Add(randn.Next(1, 255));
await Task.Delay(1500);
Console.WriteLine("Cnt");
cnt++;
}
}
else
{
Console.WriteLine("Not started yet");
}
return input_data;
});
var PrintData = new ActionBlock<List<int>>(async input_data =>
{
while(input_data.Count > 0)
{
Console.WriteLine("output Data = " + input_data.First());
await Task.Delay(1000);
input_data.RemoveAt(0);
}
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
loadData.LinkTo(PrintData, input_data => input_data.Count() >0 );
//loadData.LinkTo(PrintData, linkOptions);
loadData.SendAsync("start");
loadData.Complete();
PrintData.Completion.Wait();
}
}
}
但它似乎以串行方式工作..我做错了什么?我尝试异步执行 while 循环.我想同时做两件事.当列表中的数据可用时,然后绘制.
But it seems to work in serial way.. What am I doing wrong? I tried to do the while loops async. I would like to do the 2 things in parallel. When data available from the List then plotted.
推荐答案
您可以使用 TransformManyBlock
作为生产者块,ActionBlock
作为生产者块消费者块.TransformManyBlock
将使用接受 Func>
委托的构造函数实例化,并传递一个 迭代器方法(下面示例中的 Produce
方法),它会一一生成值:
You could use a TransformManyBlock<string, int>
as the producer block, and an ActionBlock<int>
as the consumer block. The TransformManyBlock
would be instantiated with the constructor that accepts a Func<string, IEnumerable<int>>
delegate, and passed an iterator method (the Produce
method in the example below) that yields values one by one:
Random random = new Random();
var producer = new TransformManyBlock<string, int>(Produce);
IEnumerable<int> Produce(string message)
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
yield return value;
Thread.Sleep(1500);
cnt++;
}
}
else
{
yield break;
}
}
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
producer.LinkTo(consumer, new() { PropagateCompletion = true });
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
不幸的是,生产者必须在产生每个值之间的空闲期间阻塞工作线程 (Thread.Sleep(1500);
),因为 TransformManyBlock
当前没有有一个接受 Func>
的构造函数.这可能会在 TPL 数据流 库.您可以跟踪this GitHub 问题,以获悉此功能何时发布.
Unfortunately the producer has to block the worker thread during the idle period between yielding each value (Thread.Sleep(1500);
), because the TransformManyBlock
currently does not have a constructor that accepts a Func<string, IAsyncEnumerable<int>>
. This will be probably fixed in the next release of the TPL Dataflow library. You could track this GitHub issue, to be informed about when this feature will be released.
替代解决方案:您可以不将生产者和消费者明确链接起来,而是将它们保持不链接,并手动将生产者产生的值发送给消费者.在这种情况下,两个块都是 ActionBlock
s:
Alternative solution: Instead of linking explicitly the producer and the consumer, you could keep them unlinked, and send manually the values produced by the producer to the consumer. In this case both blocks would be ActionBlock
s:
Random random = new Random();
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
var producer = new ActionBlock<string>(async message =>
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
var accepted = await consumer.SendAsync(value);
if (!accepted) break; // The consumer has failed
await Task.Delay(1500);
cnt++;
}
}
});
PropagateCompletion(producer, consumer);
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (ex != null) target.Fault(ex); else target.Complete();
}
这种方法的主要困难在于如何将生产者的完成传播给消费者,从而最终两个块都完成.显然您不能使用 new DataflowLinkOptions { PropagateCompletion = true }
配置,因为这些块没有明确链接.您也不能手动Complete
消费者,因为在这种情况下,它会过早地停止接受来自生产者的值.解决这个问题的方法就是上面例子中的PropagateCompletion
方法.
The main difficulty with this approach is how to propagate the completion of the producer to the consumer, so that eventually both blocks are completed. Obviously you can't use the new DataflowLinkOptions { PropagateCompletion = true }
configuration, since the blocks are not linked explicitly. You also can't Complete
manually the consumer, because in this case it would stop prematurely accepting values from the producer. The solution to this problem is the PropagateCompletion
method shown in the above example.
这篇关于异步任务,视频缓冲的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!