问题描述
我在Azure上运行Service Bus,每秒发送大约 10-100条消息.
I'm running Service Bus on Azure, pumping about 10-100 messages per second.
最近我已切换到 .net 4.5 ,所有人都兴奋地重构了所有代码,以使每行中至少有两次'async'和'await '确保已正确完成:)
Recently I've switched to .net 4.5 and all excited refactored all the code to have 'async' and 'await' at least twice in each line to make sure it's done 'properly' :)
现在我想知道这到底是更好还是更糟.如果您可以看一下代码片段,并让我知道您的想法.我特别担心如果线程上下文切换不能给我带来更多痛苦,而不是受益于所有异步...(从!dumpheap来看,这绝对是一个因素)
Now I'm wondering whether it's actually for better or for worse. If you could have a look at the code snippets and let me know what your thoughts are. I especially worried if the thread context switching is not giving me more grief than benefit, from all the asynchrony... (looking at !dumpheap it's definitely a factor)
仅作一点说明-我将发布2种方法-一种在ConcurrentQueue上进行while循环,等待新消息,另一种方法一次发送一条消息.我还完全按照Azure博士的规定使用了瞬态故障处理"块.
Just a bit of description - I will be posting 2 methods - one that does a while loop on a ConcurrentQueue, waiting for new messages and the other method that sends one message at a time. I'm also using the Transient Fault Handling block exactly as Dr. Azure prescribed.
发送循环(从头开始,等待新消息):
Sending loop (started at the beginning, waiting for new messages):
private async void SendingLoop()
{
try
{
await this.RecreateMessageFactory();
this.loopSemaphore.Reset();
Buffer<SendMessage> message = null;
while (true)
{
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
this.semaphore.WaitOne();
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
while (this.queue.TryDequeue(out message))
{
try
{
using (message)
{
//only take send the latest message
if (!this.queue.IsEmpty)
{
this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
continue;
}
else
{
if (this.Topic == null || this.Topic.Path != message.Value.Topic)
await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);
if (this.cancel.Token.IsCancellationRequested)
break;
await this.SendMessage(message, this.cancel.Token);
}
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
ex.LogError();
}
}
}
}
catch (OperationCanceledException)
{ }
catch (Exception ex)
{
ex.LogError();
}
finally
{
if (this.loopSemaphore != null)
this.loopSemaphore.Set();
}
}
发送消息:
private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
{
//this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
bool entityNotFound = false;
if (this.MessageSender.IsClosed)
{
//this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
await this.EnsureMessageSender(cancellationToken);
}
try
{
await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
{
message.Value.Body.Seek(0, SeekOrigin.Begin);
using (var msg = new BrokeredMessage(message.Value.Body, false))
{
await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
}
}, cancellationToken);
}
catch (MessagingEntityNotFoundException)
{
entityNotFound = true;
}
catch (OperationCanceledException)
{ }
catch (ObjectDisposedException)
{ }
catch (Exception ex)
{
ex.LogError();
}
if (entityNotFound)
{
if (!cancellationToken.IsCancellationRequested)
{
await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
}
}
}
上面的代码来自"Sender"类,该类每秒发送1条消息.在任何给定时间,我大约有50-100个实例在运行,因此可能有很多线程.
The code above is from a 'Sender' class that sends 1 message/second. I have about 50-100 instances running at any given time, so it could be quite a number of threads.
顺便说一句,不要太担心确保消息发送者,重新创建消息工厂,保证主题存在,它们不会经常被调用.
Btw do not worry about EnsureMessageSender, RecreateMessageFactory, EnsureTopicExists too much, they are not called that often.
如果只需要一次发送一条消息,而不用担心异步问题并避免随之而来的开销,那么让一个后台线程在消息队列中工作并同步发送消息是否更好呢?
Would I not be better of just having one background thread working through the message queue and sending messages synchronously, provided all I need is send one message at a time, not worry about the async stuff and avoid the overheads coming with it.
请注意,将一条消息发送到Azure Service Bus通常只需要几毫秒的时间,实际上并不昂贵. (除非速度很慢,超时或Service Bus后端出现问题,否则它可能会在尝试发送内容时挂起一会儿).
Note that usually it's a matter of milliseconds to send one Message to Azure Service Bus, it's not really expensive. (Except at times when it's slow, times out or there is a problem with Service Bus backend, it could be hanging for a while trying to send stuff).
很抱歉,很长的帖子,
Stevo
建议的解决方案
这个例子可以解决我的情况吗?
Would this example be a solution to my situation?
static void Main(string[] args)
{
var broadcaster = new BufferBlock<int>(); //queue
var cancel = new CancellationTokenSource();
var run = Task.Run(async () =>
{
try
{
while (true)
{
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
//async wait until a value is available
var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
int next = 0;
//greedy - eat up and ignore all the values but last
while (broadcaster.TryReceive(out next))
{
Console.WriteLine("Skipping " + val);
val = next;
}
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
Console.WriteLine("Sending " + val);
//simulate sending delay
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine("Value sent " + val);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}, cancel.Token);
//simulate sending messages. One every 200mls
for (int i = 0; i < 20; i++)
{
Console.WriteLine("Broadcasting " + i);
broadcaster.Post(i);
Thread.Sleep(200);
}
cancel.Cancel();
run.Wait();
}
推荐答案
您说:
这是异步的一个很好的例子.您在这里节省了很多线程.异步减少上下文切换,因为它不是基于线程的.如果需要等待,它不会上下文切换.相反,下一个工作项是在同一线程(如果有)上进行处理的.
This is a good case for async. You save lots of threads here. Async reduces context switching because it is not thread-based. It does not context-switch in case of something requiring a wait. Instead, the next work item is being processed on the same thread (if there is one).
因此,您的异步解决方案肯定会比同步解决方案更好地扩展.是否需要在工作流的50-100个实例中实际使用较少的CPU,需要进行衡量.实例越多,异步变得更快的可能性就越大.
For that reason you async solution will definitely scale better than a synchronous one. Whether it actually uses less CPU at 50-100 instances of your workflow needs to be measured. The more instances there are the higher the probability of async being faster becomes.
现在,实现存在一个问题:您使用的不是异步设备的ConcurrentQueue
.因此,即使在异步版本中,您实际上也会使用50-100个线程.它们要么阻塞(您要避免),要么忙于燃烧100%CPU(在您的实现中似乎是这种情况!).您还需要摆脱这个问题,并使队列也异步.也许SemaphoreSlim
在这里很有帮助,因为它可以异步等待.
Now, there is one problem with the implementation: You're using a ConcurrentQueue
which is not async-ready. So you actually do use 50-100 threads even in your async version. They will either block (which you wanted to avoid) or busy-wait burning 100% CPU (which seems to be the case in your implementation!). You need to get rid of this problem and make the queuing async, too. Maybe a SemaphoreSlim
is of help here as it can be waited on asynchronously.
这篇关于Azure ServiceBus和异步-是还是不是?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!