BroadcastCopyBlock for TPL Dataflow with guaranteed delivery

Problem description

I would be glad for some input on the following implementation of a BroadcastCopyBlock in TPL Dataflow. It copies each received message to all consumers that registered with the BroadcastCopyBlock, and it guarantees delivery to every consumer that is linked to the block at the time the message is received. (This is unlike the BroadcastBlock, which does not guarantee delivery of a message if the next one arrives before the former message has been delivered to all consumers.)

My main concern is the reserving of messages and the releasing of reservations. What would happen if a receiving block decides not to handle the message? My understanding is that this would create a memory leak, since the message would be kept indefinitely. I think I should somehow mark the message as unused, but I am not sure how. I was thinking about some artificial message sink (an ActionBlock with no action), or can I just mark a message as discarded?

Further input on the implementation is also appreciated.

This is probably almost a duplicate of the following question, but I would prefer to use my own class instead of a method that creates the block. Or would that be considered bad style?
"BroadcastBlock with Guaranteed Delivery in TPL Dataflow"

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target, that subscribed to this block
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();


    public BrodcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));

        In.Completion.ContinueWith(task =>
                                   {
                                       if (task.Exception == null)
                                           Complete();
                                       else
                                           Fault(task.Exception);
                                   }
          );
    }

    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target"></param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;

        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }

    private void Process(T message)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }

    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }

    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }

    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));

    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }

    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }

    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}
Solution

TL/DR
Your implementation uses the Post method inside the ActionBlock, which will still lose data if a target rejects the message; switch to SendAsync. Also, you probably do not need to implement all of these methods: implementing the ITargetBlock<in TInput> interface is enough.
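
To make the difference between Post and SendAsync concrete, here is a minimal sketch. The class name, the gate, and the single-slot target are assumptions made up for illustration; they are not part of the question's code. The target is bounded to one item and kept busy, so Post is declined at once, while SendAsync is postponed and accepted as soon as capacity frees up.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendAsync
{
    // Reports whether Post and SendAsync succeed against a target that is full.
    public static async Task<(bool Posted, bool Sent)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>();

        // Hypothetical slow consumer: it holds its only slot until the gate opens.
        var target = new ActionBlock<int>(
            async _ => { await gate.Task; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        target.Post(0);                           // accepted, occupies the only slot
        bool posted = target.Post(1);             // declined immediately, message 1 is lost
        Task<bool> pending = target.SendAsync(2); // postponed until capacity is available

        gate.SetResult(true);                     // let the consumer drain
        bool sent = await pending;                // completes once the target accepts message 2

        target.Complete();
        await target.Completion;
        return (posted, sent);
    }
}
```

Under these assumptions the Post call returns false and the awaited SendAsync returns true, which is the behavior the advice above relies on.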


I want to clarify something before coming back to your main question. I think you are confused by some options of the TPL Dataflow library, so I will explain them a bit here. The behavior you describe ("the first consumer that receives the message deletes it from the queue") is not about the BroadcastBlock; it applies to multiple consumers linked to another ISourceBlock, such as a BufferBlock:

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// This message goes to only one consumer (consumer1), so nothing is printed to the console
buffer.Post(1);

What the BroadcastBlock does is exactly what you are asking for. Consider this code:

private static void UnboundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Unbounded Block: {i}");
        });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be

FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2

However, this works only if the incoming data arrives more slowly than it is processed; otherwise the growing buffers will quickly exhaust your memory, as you stated in your question. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the input buffer of the slow block:

private static void BoundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Bounded Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be

FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1

As you can see, our slow block lost the last message, which is not what we are looking for. The reason is that the BroadcastBlock delivers messages the way the Post method does: a target that cannot accept a message immediately simply misses it. According to the official introduction document:

So the SendAsync method could help us in our mission. Let's introduce wrapper ActionBlocks that do exactly what we want: they SendAsync the data to our real processors:

private static void BoundedWrapperInfiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));

    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be

FAST Wrapper Block: 0
FAST Wrapper Block: 1
FAST Wrapper Block: 2
SLOW Wrapper Block: 0
SLOW Wrapper Block: 1
SLOW Wrapper Block: 2

But this wait will never end: our basic wrapper does not propagate completion to the block it forwards to, and an ActionBlock cannot be linked to anything. We can try to wait for the wrapper's completion instead:

private static void BoundedWrapperFiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW finite Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowActionWrapper.Completion.Wait();
}

The output will be

FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0

This is definitely not what we wanted: the wrapper ActionBlock finished all of its work, and the delivery of the last message was not awaited. Moreover, we do not even see the second message, because we exit the method before the Sleep call ends. So you definitely need your own implementation for this.
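
One way to close the gap shown by the two wrapper experiments is to let the wrapper pass its completion on to the real processor. The sketch below assumes a hypothetical helper, CreateForwarder, which is not part of the library or of the question's code. It awaits SendAsync for every item and then completes (or faults) the wrapped target, so waiting on the real processor terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ForwardingWrapper
{
    // Hypothetical helper: an unbounded ActionBlock that forwards each item with
    // SendAsync and hands its own completion (or fault) on to the wrapped target.
    public static ITargetBlock<T> CreateForwarder<T>(ITargetBlock<T> target)
    {
        var wrapper = new ActionBlock<T>(async item => { await target.SendAsync(item); });
        wrapper.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) target.Fault(t.Exception);
            else target.Complete();
        });
        return wrapper;
    }

    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();
        // Bounded slow consumer, as in the experiments above (with a shorter delay).
        var slow = new ActionBlock<int>(async i =>
        {
            await Task.Delay(50);
            seen.Add(i);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        var broadcast = new BroadcastBlock<int>(i => i);
        broadcast.LinkTo(CreateForwarder(slow), new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < 3; ++i) await broadcast.SendAsync(i);
        broadcast.Complete();

        await slow.Completion; // ends only after the forwarder has delivered everything
        return seen;
    }
}
```

The forwarder is unbounded, so it trades the message loss for a buffer that can grow while the slow consumer catches up.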

Now, at last, some thoughts about your code:

  1. You do not need to implement such a large number of methods. Your wrapper will be used as an ITargetBlock<in TInput>, so implement only that interface.
  2. Your implementation uses the Post method inside the ActionBlock, which, as we saw, can lead to data loss if there are problems on the consumer's side. Consider the SendAsync method instead.
  3. After that change, you should measure the performance of your dataflow. If many asynchronous waits for data delivery pile up, you will probably see performance and/or memory problems. This should be fixed with some advanced settings that are discussed in the linked documentation.
  4. Your implementation of the Completion task actually reverses the order of your dataflow: you are waiting for the targets to complete, which I think is not good practice. You should probably create an ending block for your dataflow and wait for that block to complete. (This could even be a NullTarget block, which simply drops incoming messages synchronously, although its Completion task is not expected to finish, so an ActionBlock with an empty action is the safer block to wait on.)
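
Points 1, 2 and 4 can be combined into a rough sketch such as the following. The class name GuaranteedBroadcastTarget and the demo are assumptions for illustration, not the asker's or the library's implementation. The class implements only ITargetBlock<T>, forwards every message to all targets with SendAsync, and exposes the completion of its own input block rather than that of its targets.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical class: a target-only block that forwards each message to all targets.
public sealed class GuaranteedBroadcastTarget<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> _input;
    private readonly IReadOnlyList<ITargetBlock<T>> _targets;

    public GuaranteedBroadcastTarget(IEnumerable<ITargetBlock<T>> targets)
    {
        _targets = targets.ToList();

        // The next message is handled only after every target has answered this one.
        _input = new ActionBlock<T>(async item =>
        {
            await Task.WhenAll(_targets.Select(t => t.SendAsync(item)));
        });

        // Pass completion or a fault on to the targets once all input is forwarded.
        _input.Completion.ContinueWith(task =>
        {
            foreach (var target in _targets)
            {
                if (task.IsFaulted) target.Fault(task.Exception);
                else target.Complete();
            }
        });
    }

    public Task Completion => _input.Completion;
    public void Complete() => _input.Complete();
    public void Fault(Exception exception) => _input.Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
        => _input.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}

public static class GuaranteedBroadcastDemo
{
    // Counts how many messages the fast and the bounded slow consumer received.
    public static async Task<(int Fast, int Slow)> RunAsync()
    {
        int fast = 0, slow = 0;
        var fastAction = new ActionBlock<int>(_ => { fast++; });
        var slowAction = new ActionBlock<int>(async _ => { await Task.Delay(50); slow++; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        var block = new GuaranteedBroadcastTarget<int>(new[] { fastAction, slowAction });
        for (var i = 0; i < 3; ++i) await block.SendAsync(i);
        block.Complete();

        // Wait on the ending blocks of the flow, not on the broadcast block itself.
        await Task.WhenAll(fastAction.Completion, slowAction.Completion);
        return (fast, slow);
    }
}
```

The input block is left unbounded here for brevity. Setting BoundedCapacity on it would push back-pressure to the producer, at the cost that the producer must then use SendAsync as well.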


08-21 07:43