本文介绍了Azure ServiceBus和异步-是还是不是?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在Azure上运行Service Bus,每秒发送大约 10-100条消息.

I'm running Service Bus on Azure, pumping about 10-100 messages per second.

最近我已切换到 .net 4.5 ,所有人都兴奋地重构了所有代码,以使每行中至少有两次'async'和'await '确保已正确完成:)

Recently I've switched to .net 4.5 and all excited refactored all the code to have 'async' and 'await' at least twice in each line to make sure it's done 'properly' :)

现在我想知道这到底是更好还是更糟.如果您可以看一下代码片段,并让我知道您的想法.我特别担心如果线程上下文切换不能给我带来更多痛苦,而不是受益于所有异步...(从!dumpheap来看,这绝对是一个因素)

Now I'm wondering whether it's actually for better or for worse. If you could have a look at the code snippets and let me know what your thoughts are. I especially worried if the thread context switching is not giving me more grief than benefit, from all the asynchrony... (looking at !dumpheap it's definitely a factor)

仅作一点说明-我将发布2种方法-一种在ConcurrentQueue上进行while循环,等待新消息,另一种方法一次发送一条消息.我还完全按照Azure博士的规定使用了瞬态故障处理"块.

Just a bit of description - I will be posting 2 methods - one that does a while loop on a ConcurrentQueue, waiting for new messages and the other method that sends one message at a time. I'm also using the Transient Fault Handling block exactly as Dr. Azure prescribed.

发送循环(从头开始,等待新消息):

Sending loop (started at the beginning, waiting for new messages):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

发送消息:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上面的代码来自"Sender"类,该类每秒发送1条消息.在任何给定时间,我大约有50-100个实例在运行,因此可能有很多线程.

The code above is from a 'Sender' class that sends 1 message/second. I have about 50-100 instances running at any given time, so it could be quite a number of threads.

顺便说一句,不要太担心确保消息发送者,重新创建消息工厂,保证主题存在,它们不会经常被调用.

Btw do not worry about EnsureMessageSender, RecreateMessageFactory, EnsureTopicExists too much, they are not called that often.

如果只需要一次发送一条消息,而不用担心异步问题并避免随之而来的开销,那么让一个后台线程在消息队列中工作并同步发送消息是否更好呢?

Would I not be better of just having one background thread working through the message queue and sending messages synchronously, provided all I need is send one message at a time, not worry about the async stuff and avoid the overheads coming with it.

请注意,将一条消息发送到Azure Service Bus通常只需要几毫秒的时间,实际上并不昂贵. (除非速度很慢,超时或Service Bus后端出现问题,否则它可能会在尝试发送内容时挂起一会儿).

Note that usually it's a matter of milliseconds to send one Message to Azure Service Bus, it's not really expensive. (Except at times when it's slow, times out or there is a problem with Service Bus backend, it could be hanging for a while trying to send stuff).

很抱歉,很长的帖子,

Stevo

建议的解决方案

这个例子可以解决我的情况吗?

Would this example be a solution to my situation?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false);

                    Console.WriteLine("Value sent " + val);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }

推荐答案

您说:

这是异步的一个很好的例子.您在这里节省了很多线程.异步减少上下文切换,因为它不是基于线程的.如果需要等待,它不会上下文切换.相反,下一个工作项是在同一线程(如果有)上进行处理的.

This is a good case for async. You save lots of threads here. Async reduces context switching because it is not thread-based. It does not context-switch in case of something requiring a wait. Instead, the next work item is being processed on the same thread (if there is one).

因此,您的异步解决方案肯定会比同步解决方案更好地扩展.是否需要在工作流的50-100个实例中实际使用较少的CPU,需要进行衡量.实例越多,异步变得更快的可能性就越大.

For that reason you async solution will definitely scale better than a synchronous one. Whether it actually uses less CPU at 50-100 instances of your workflow needs to be measured. The more instances there are the higher the probability of async being faster becomes.

现在,实现存在一个问题:您使用的不是异步设备的ConcurrentQueue.因此,即使在异步版本中,您实际上也会使用50-100个线程.它们要么阻塞(您要避免),要么忙于燃烧100%CPU(在您的实现中似乎是这种情况!).您还需要摆脱这个问题,并使队列也异步.也许SemaphoreSlim在这里很有帮助,因为它可以异步等待.

Now, there is one problem with the implementation: You're using a ConcurrentQueue which is not async-ready. So you actually do use 50-100 threads even in your async version. They will either block (which you wanted to avoid) or busy-wait burning 100% CPU (which seems to be the case in your implementation!). You need to get rid of this problem and make the queuing async, too. Maybe a SemaphoreSlim is of help here as it can be waited on asynchronously.

这篇关于Azure ServiceBus和异步-是还是不是?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 16:22