问题描述
我有一个类似于以下的处理程序,它本质上是响应一个命令并将一大堆命令发送到不同的队列.
I have a handler similar to the following, which essentially responds to a command and sends a whole bunch of commands to a different queue.
public void Handle(ISomeCommand message)
{
int i=0;
while (i < 10000)
{
var command = Bus.CreateInstance<IAnotherCommand>();
command.Id = i;
Bus.Send("target.queue@d1555", command);
i++;
}
}
这个块的问题是,在循环完全完成之前,没有消息出现在目标队列或传出队列中.有人能帮我理解这种行为吗?
The issue with this block is, until the loop is fully completed none of the messages appear in the target queue or in the outgoing queue. Can someone help me understand this behavior?
此外,如果我使用任务在处理程序中发送消息,如下所示,消息会立即出现.所以有两个问题,
Also if I use Tasks to send messages within the Handler as below, messages appear immediately. So two questions on this,
- 对基于任务的发送立即执行的解释是什么?
在消息处理程序中使用任务有什么后果吗?
- What's the explanation on Task based Sends to go through immediately?
Are there are any ramifications on using Tasks with in message handlers?
public void Handle(ISomeCommand message)
{
int i=0;
while (i < 10000)
{
System.Threading.ThreadPool.QueueUserWorkItem((args) =>
{
var command = Bus.CreateInstance<IAnotherCommand>();
command.Id = i;
Bus.Send("target.queue@d1555", command);
i++;
});
}
}
非常感谢您的时间!
推荐答案
第一个问题: 从队列中挑选消息,为其运行所有已注册的消息处理程序以及任何其他事务性操作(如写入新消息或对数据库的写入)在一个事务中执行.要么全部完成,要么全都不完成.所以你看到的是预期的行为:从队列中选择一条消息,处理 ISomeCommand 并写入 10000 个新的 IAnotherCommand 要么完全完成,要么什么都不做.要避免这种行为,您可以执行以下操作之一:
First question: Picking a message from a queue, running all the registered message handlers for it AND any other transactional action(like writing new messages or writes against a database) is performed in ONE transaction. Either it all completes or none of it. So what you are seeing is the expected behaviour: picking a message from the queue, handling ISomeCommand and writing 10000 new IAnotherCommand is either done completely or none of it. To avoid this behaviour you can do one of the following:
将您的 NServiceBus 端点配置为非事务性
Configure your NServiceBus endpoint to not be transactional
public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher,IWantCustomInitialization
{
public void Init()
{
Configure.With()
.DefaultBuilder()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(false)
.UnicastBus();
}
}
将 IAnotherCommand 的发送包装在抑制环境事务的事务范围内.
Wrap the sending of IAnotherCommand in a transaction scope that suppresses the ambient transaction.
public void Handle(ISomeCommand message)
{
using (new TransactionScope(TransactionScopeOption.Suppress))
{
int i=0;
while (i < 10000)
{
var command = Bus.CreateInstance();
command.Id = i;
Bus.Send("target.queue@d1555", command);
i++;
}
}
}
通过使用 System.Threading.ThreadPool.QueueUserWorkItem 或 Task 类自己启动一个新线程,在另一个线程上发出 Bus.Send.这是有效的,因为环境事务不会自动转移到新线程.
Issue the Bus.Send on another thread, by either starting a new thread yourself, using System.Threading.ThreadPool.QueueUserWorkItem or the Task classes. This works because an ambient transaction is not automatically carried over to a new thread.
第二个问题: 使用 Tasks 或我提到的任何其他方法的后果是,您对整个事情没有事务性保证.
Second question: The ramifications of using Tasks, or any of the other methods I mentioned, is that you have no transactional quarantee for the whole thing.
你生成了5000条IAnotherMessage,突然没电了怎么办?
How do you handle the case where you have generated 5000 IAnotherMessage and the power suddenly goes out?
如果您使用 2) 或 3) 原始 ISomeMessage 将不会完成,并会在您再次启动端点时由 NServiceBus 自动重试.最终结果:5000 + 10000 IAnotherCommands.
If you use 2) or 3) the original ISomeMessage will not complete and will be retried automatically by NServiceBus when you start up the endpoint again. End result: 5000 + 10000 IAnotherCommands.
如果你使用 1) 你将完全失去 IAnotherMessage 并且最终只有 5000 个 IAnotherCommands.
If you use 1) you will lose IAnotherMessage completely and end up with only 5000 IAnotherCommands.
使用推荐的事务方式,最初的 5000 个 IAnotherCommands 将被丢弃,原始 ISomeMessage 返回队列并在端点再次启动时重试.最终结果:10000 IAnotherCommands.
Using the recommended transactional way, the initial 5000 IAnotherCommands would be discarded, the original ISomeMessage comes back on the queue and is retried when the endpoint starts up again. Net result: 10000 IAnotherCommands.
这篇关于处理程序中的递归 Bus.Send()(事务、线程、任务)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!