我有一个要向其发布消息的BufferBlock:

public class DelimitedFileBlock : ISourceBlock<string>
{
    private ISourceBlock<string> _source;
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });

    //Read a file
    While(!eof)
        row = read one row
    //if consumers are slow, then sleep for a while
    while(!(_source as BufferBlock<string>).Post<string>(row))
    {
        Thread.Sleep(5000);
    }
}

这是一个具有2400万行的5GB文件。

我现在有一个使用ActionBlock的Target块:
public class SolaceTargetBlock : ITargetBlock<string>
       private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

       public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
    {
        //post to another block to publish
        bool success = _publishToSolaceBlock.Post(messageValue);

现在,在控制台应用程序中,我指定:
 SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam",
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam",
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam",
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

 DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

我仅将有限容量作为1进行测试。

现在,我使用LinkTo将这三个使用者链接到我的源:
 delimitedFileBlock.LinkTo(solaceTargetBlock1);
 delimitedFileBlock.LinkTo(solaceTargetBlock2);
 delimitedFileBlock.LinkTo(solaceTargetBlock3);

这将在10003行之后转到Thread.Sleep(5000)语句,而while循环中的Post始终返回false。

我期待有LinkTo,因此solaceTargetBlocks完成后将能够选择下一条消息,但是LinkTo不会清除BufferBlock。因此,我该如何在多个使用者之间进行负载平衡。我是否必须接收并编写一个简单的负载平衡逻辑以在使用者之间分配?

最佳答案

Post method(重点是我的)上 DataflowBlock<T> class的文档中:



这意味着目标可以选择拒绝阻止(这是您所看到的行为)。

进一步,它指出:



这意味着您可能会有更好的结果(取决于目标块),因为您的消息可能会被推迟,但仍会被处理,而不是被完全拒绝。

我想象 BoundedCapacity property和三个 BufferBlock<T> 实例上的 ActionBlock<TInput> 设置与您看到的内容有关:

  • BufferBlock<T>上的最大缓冲区为10000;一旦将10,000个项目放入队列,它将拒绝其余项目(请参阅上面的第二个引号),因为它无法处理它们( SendAsync 在这里也不起作用,因为它无法缓冲要延迟的消息)。
  • ActionBlock<TInput>实例上的最大缓冲区为1,并且其中有三个。

  • 10,000 +(1 * 3)= 10,000 + 3 = 10,003

    为了解决这个问题,您需要做一些事情。

    首先,在创建MaxDegreeOfParallelism实例时,需要为 ExecutionDataFlowBlockOptions property ActionBlock<TInput> 设置一个更合理的值。

    默认情况下,MaxDegreeOfParallelismActionBlock<TInput>设置为1;这保证了调用将被序列化,并且您不必担心线程安全性。如果要让ActionBlock<T>关注线程安全,请保留此设置。

    如果ActionBlock<TInput>是线程安全的,则没有理由限制它,因此应将MaxDegreeOfParallelism设置为 DataflowBlockOptions.Unbounded

    如果您正在访问ActionBlock<TInput>中的某种共享资源(可以在有限的基础上同时访问),则可能是您做错了。

    如果您有某种共享资源,那么您应该将其运行在另一个块中,并在其上设置MaxDegreeOfParallelism

    其次,如果您关注吞吐量并且可以放置项目,则应设置BoundedCapacity属性。

    另请注意,您指出“如果消费者行动缓慢,请睡一会儿”;如果正确地连接了块,则没有理由这样做,您应该让数据流过,并仅将限制放置在需要的地方。您的生产者不应该为限制消费者负责,而应该让消费者为限制消费者负责。

    最后,您的代码看起来不需要自己实现数据流模块接口(interface)。您可以这样构造它:
    // The source, your read lines will be posted here.
    var delimitedFileBlock = new BufferBlock<string>();
    
    // The Action for the action blocks.
    Action<string> action =
        s => { /* Do something with the string here. */ };
    
    // Create the action blocks, assuming that
    // action is thread-safe, no need to have it process one at a time
    // or to bound the capacity.
    var solaceActionBlock1 = new ActionBlock<string>(action,
        new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
        });
    var solaceActionBlock2 = new ActionBlock<string>(action,
        new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
        });
    var solaceActionBlock3 = new ActionBlock<string>(action,
        new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
        });
    
    // Link everything.
    delimitedFileBlock.LinkTo(solaceTargetBlock1);
    delimitedFileBlock.LinkTo(solaceTargetBlock2);
    delimitedFileBlock.LinkTo(solaceTargetBlock3);
    
    // Now read the file, and post to the BufferBlock<T>:
    // Note: This is pseudo-code.
    while (!eof)
    {
        // Read the row.
        string row = ...;
    
        delimitedFileBlock.Post(read);
    }
    

    还要注意,没有三个ActionBlock<TInput>实例是没有必要的,除非您需要将输出过滤到不同的操作(您不在此处进行操作),所以上述内容实际上简化为(假设您的操作是线程安全的,因此您可以无论如何都会将MaxDegreeOfParallelism增加到Unbounded):
    // The source, your read lines will be posted here.
    var delimitedFileBlock = new BufferBlock<string>();
    
    // The Action for the action blocks.
    Action<string> action =
        s => { /* Do something with the string here. */ };
    
    // Create the action blocks, assuming that
    // action is thread-safe, no need to have it process one at a time
    // or to bound the capacity.
    var solaceActionBlock1 = new ActionBlock<string>(action,
        new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
        });
    
    // Link everything.
    delimitedFileBlock.LinkTo(solaceTargetBlock);
    
    // Now read the file, and post to the BufferBlock<T>:
    // Note: This is pseudo-code.
    while (!eof)
    {
        // Read the row.
        string row = ...;
    
        delimitedFileBlock.Post(read);
    }
    

    10-08 14:45