使用旋风鼠兔异步队列监测

使用旋风鼠兔异步队列监测

本文介绍了使用旋风鼠兔异步队列监测的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个AMQP服务器(),我想这两个发布和在的。要做到这一点,我想我会使用异步AMQP Python库;特别(它的变化,理应支持龙卷风)。

I have an AMQP server (RabbitMQ) that I would like to both publish and read from in a Tornado web server. To do this, I figured I would use an asynchronous amqp python library; in particular Pika (a variation of it that supposedly supports Tornado).

我写了显示从队列在请求结束时成功读取,除了code,我得到一个异常(浏览器返回精):

I have written code that appears to successfully read from the queue, except that at the end of the request, I get an exception (the browser returns fine):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
    Traceback (most recent call last):
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
        yield
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
        yield
      File "/usr/lib/python2.6/contextlib.py", line 113, in nested
        yield vars
      File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
        callback(*args, **kwargs)
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
        self._handle_read()
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
        self.on_data_available(chunk)
      File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
        self.channels[frame.channel_number].frame_handler(frame)
    KeyError: 1

我不能完全肯定我正确地使用这个库,所以我可能会做一些公然错误的。我的code的基本流程是:

I'm not entirely sure I am using this library correctly, so I might be doing something blatantly wrong. The basic flow of my code is:


  1. 请求进来

  2. 创建连接使用TornadoConnection RabbitMQ的;指定一个回调

  3. 在连接回调,创建通道,申报/绑定我的队列,并调用basic_consume;指定一个回调

  4. 在消费回调,关闭通道,并调用龙卷风的完成功能。

  5. 见异常。

我的问题有几个:


  1. 这是流量,甚至正确吗?我不知道该连接回调的目的是什么,只是如果我不使用它,它不工作。

  2. 我应该创建每个网页请求对应一个AMQP连接? RabbitMQ的文档显示,没有,我不应该,而是我应该坚持只产生渠道。但是,我哪里会创建连接,以及如何重新连接尝试应该是往下走简单?

  3. 如果我创建每个Web请求对应一个AMQP连接,在那里我应该关闭它?在我的回调调用amqp.close()似乎搞砸,甚至更多。

我会尽量有一些样本code起来有点晚,但我上面描述的步骤奠定了事物的消费方相当彻底。我有问题,与出版方为好,但排队的消费更是pressing。

I will try to have some sample code up a little later, but the steps I described above lay out the consuming side of things fairly completely. I am having issues with the publishing side as well, but the consuming of queues is more pressing.

推荐答案

这将有助于看到一些源$ C ​​$ C,但我在一个以上的生产项目使用同样的龙卷风支持鼠兔模块没有问题。

It would help to see some source code, but I use this same tornado-supporting pika module without issue in more than one production project.

您不想创建每个请求的连接。创建一个包装所有AMQP操作的类,实例化它作为龙卷风应用级一个单可以跨请求中使用(并在请求处理)。我在做这个'runapp()',做一些这样的东西,然后函数启动主龙卷风ioloop。

You don't want to create a connection per request. Create a class that wraps all of your AMQP operations, and instantiate it as a singleton at the tornado Application level that can be used across requests (and across request handlers). I do this in a 'runapp()' function that does some stuff like this and then starts the main tornado ioloop.

下面是一个叫做'事件'类。这是一个部分实现(具体地讲,我不界定self.handle_event'在这里。这是给你的。

Here's a class called 'Events'. It's a partial implementation (specifically, I don't define 'self.handle_event' here. That's up to you.

class Event(object):
  def __init__(self, config):
    self.host = 'localhost'
    self.port = '5672'
    self.vhost = '/'
    self.user = 'foo'
    self.exchange = 'myx'
    self.queue = 'myq'
    self.recv_routing_key = 'msgs4me'
    self.passwd = 'bar'

    self.connected = False
    self.connect()


  def connect(self):

    credentials = pika.PlainCredentials(self.user, self.passwd)

    parameters = pika.ConnectionParameters(host = self.host,
                                         port = self.port,
                                         virtual_host = self.vhost,
                                         credentials = credentials)

    srs = pika.connection.SimpleReconnectionStrategy()

    logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host,
                                                              self.port))
    self.connection = tornado_adapter.TornadoConnection(parameters,
                                                      wait_for_open = False,
                                                      reconnection_strategy = srs,
                                                      callback = self.on_connected)

  def on_connected(self):

    # Open the channel
    logging.debug("Events: Opening a channel")
    self.channel = self.connection.channel()

    # Declare our exchange
    logging.debug("Events: Declaring the %s exchange" %  self.exchange)
    self.channel.exchange_declare(exchange = self.exchange,
                                type = "fanout",
                                auto_delete = False,
                                durable = True)

    # Declare our queue for this process
    logging.debug("Events: Declaring the %s queue" %  self.queue)
    self.channel.queue_declare(queue = self.queue,
                             auto_delete = False,
                             exclusive = False,
                             durable = True)


    # Bind to the exchange
    self.channel.queue_bind(exchange = self.exchange,
                          queue = self.queue,
                          routing_key = self.recv_routing_key)

    self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True)

    # We should be connected if we made it this far
    self.connected = True

然后我把在一个名为events.py。我RequestHandlers任何后端code都利用它包装code这是既实用(我的RequestHandlers不直接调用任何AMQP模块方法common.py模块 - 同样的分贝,高速缓存等以及),所以我定义'事件=无'在common.py模块级,我还挺实例Event对象是这样的:

And then I put that in a file called 'events.py'. My RequestHandlers and any back end code all utilize a 'common.py' module that wraps code that's useful to both (my RequestHandlers don't call any amqp module methods directly -- same for db, cache, etc as well), so I define 'events=None' at the module level in common.py, and I instantiate the Event object kinda like this:

import events

def runapp(config):
    if myapp.common.events is None:
       myapp.common.events = myapp.events.Event(config)
    logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
    http_server = tornado.httpserver.HTTPServer(app,
                                            xheaders=config['HTTPServer']['xheaders'],
                                            no_keep_alive=config['HTTPServer']['no_keep_alive'])
    http_server.listen(port)
    main_loop = tornado.ioloop.IOLoop.instance()
    logging.debug("MAIN IOLOOP: %s", main_loop)
    main_loop.start()

新年快乐:-D

Happy new year :-D

这篇关于使用旋风鼠兔异步队列监测的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 01:00