交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9
我知道...我并没有真正使用TplDataflow发挥最大的潜力。 ATM我只是将BufferBlock
用作安全的消息传递队列,其中生产者和使用者以不同的速率运行。我看到一些奇怪的行为使我感到困惑
继续。
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
在上面的代码(这是2000行分布式解决方案的一部分)中,每100毫秒左右周期性地调用一次
Send
。这意味着一个项目在大约每秒10次被Post
编码为messageQueue
。这已得到验证。但是,有时候ReceiveAsync
似乎没有在超时时间内完成(即Post
并未导致ReceiveAsync
完成),并且TimeoutException
在30秒后被提升。此时,messageQueue.Count
已成百上千。这是意外的。这个问题也以较慢的发布速度(每秒发布1个帖子)被观察到,通常在BufferBlock
通过1000个项目之前发生。因此,要变通解决此问题,我使用了下面的代码,虽然可以正常工作,但是在接收时偶尔会导致1 s的延迟(由于上述错误的发生)
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
对我来说,这看起来像是TDF中的竞争条件,但我无法弄清楚为什么在其他以类似方式使用
BufferBlock
的地方没有这种情况的原因。从ReceiveAsync
更改为Receive
的实验无济于事。我没有检查,但是我想孤立地看,上面的代码可以完美地工作。我已经在“TPL数据流简介” tpldataflow.docx中看到了这种模式。我该怎么做才能深入浅出?是否有任何指标可以帮助推断正在发生的事情?如果我无法创建可靠的测试用例,我还能提供什么信息?
帮助!
最佳答案
斯蒂芬似乎认为以下是解决方案
您可以确认还是否认?