问题描述
我有一个简单的 AMQP/RabbitMQ 异步使用者,它使用 Pika 库用 Python 编写并基于 异步消费者示例 来自 Pika 文档.主要区别是我想在一个线程中运行我的,我希望它正确关闭连接,然后在一定时间间隔后退出(即终止线程).这是我打开连接和设置超时的方法.我还打开了一个频道,创建了一个交换并绑定了一个队列……一切正常.
I have a simple asynchronous consumer for AMQP/RabbitMQ, written in Python using the Pika library and based on the Asynchronous consumer example from the Pika docs. The main difference is that I want to run mine in a thread and I want it to close the connection properly then exit (i.e. terminate the thread) after a certain time interval. Here are my methods to open a connection and set a timeout. I also open a channel, create an exchange and bind a queue... all that works fine.
def connect(self):
LOGGER.info('OPEN connection...')
return pika.SelectConnection(self._parameters, self.on_connection_open, stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
LOGGER.info('Connection opened')
self.add_on_connection_close_callback()
self._connection.add_timeout(5, self.timer_tick)
self.open_recv_channel()
这是超时回调:
def timer_tick(self):
LOGGER.info('---TICK---')
self._stop()
这是 _stop 方法:
Here's the _stop method:
def _stop(self):
LOGGER.info('Stopping...')
self._connection.close()
LOGGER.info('Stopped')
time.sleep(5)
self._connection.ioloop.stop()
这是启动线程的run方法:
Here's the run method which launches the thread:
def run(self):
print "-Run Started-"
self._connection = self.connect()
self._connection.ioloop.start()
print "-Run Finished-"
这里是 main() 的主要部分:
Here's the main bit of main():
client = TestClient()
client.start()
client.join()
LOGGER.info('Returned.')
time.sleep(30)
我的问题是self._connection.close()"不能正常工作.我添加了一个 on_close 回调:
My problem is that the "self._connection.close()" won't work properly. I added an on_close callback:
self._connection.add_on_close_callback(self.on_connection_closed)
但从未调用过 on_connection_closed().此外,连接未关闭.我可以在 RabbitMQ 管理 Web 界面中看到它,即使在线程完成后它仍然存在.这是输出:
But on_connection_closed() is never called. Also, the connection is NOT closed. I can see it in the RabbitMQ management web interface, and it remains even after the thread finishes. Here's the output:
-Run Started-
2015-01-28 14:39:28,431: OPEN connection...
2015-01-28 14:39:28,491: Queue bound
(...[snipped] various other messages here...)
2015-01-28 14:39:28,491: Issuing consumer related RPC commands
2015-01-28 14:39:28,491: Adding consumer cancellation callback
(Pause here waiting for timeout callback)
2015-01-28 14:39:33,505: ---TICK---
2015-01-28 14:39:33,505: Stopping...
2015-01-28 14:39:33,505: Closing connection (200): Normal shutdown
2015-01-28 14:39:33,505: Stopped
-Run Finished-
2015-01-28 14:39:39,507: Returned.
Closing connection (200): Normal shutdown"来自 Pika,但我的 on_close 或 on_cancel 回调都没有被调用,无论我是从关闭通道开始,还是只是关闭连接.唯一起作用的是用basic_cancel"停止消费者,这会导致我的on_cancel_callback"被调用.
"Closing connection (200): Normal shutdown" comes from Pika, but none of my on_close or on_cancel callbacks are called, whether I start by closing the channel, or just close the connection. The only thing that DOES work is stopping the consumer with "basic_cancel", which causes my "on_cancel_callback" to be called.
我想在主程序中使用一个循环来创建和销毁消费者线程,但目前,每次我运行一个循环时,我最终都会留下一个孤立的连接,因此我的连接数会无限增加.当程序关闭时,连接会消失.
I want to use a loop in the main program to create and destroy consumer threads, but at the moment, every time I run one I end up with an orphaned connection left over so my number of connections goes up indefinitely. The connections DO disappear when the program closes.
使用 connection.close() 应该可以工作:来自 Pika Docs:
Using connection.close() should work: From the Pika Docs:
关闭(reply_code=200,reply_text='正常关机')
close(reply_code=200, reply_text='Normal shutdown')
断开与 RabbitMQ 的连接.如果有任何打开的通道,它会在完全断开连接之前尝试关闭它们.具有活动消费者的通道将尝试向 RabbitMQ 发送 Basic.Cancel 以在关闭通道之前完全停止消息的传递.
Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.
推荐答案
如果您在线程之间共享连接,这可能会导致问题.pika
不是线程安全的,不同线程不应使用连接.
If you're sharing the connection between your threads this can cause problems. pika
is not thread safe and connections shouldn't be used by different threads.
问:
Pika 线程安全吗?
答:
Pika 在代码中没有任何线程的概念.如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建.跨线程共享一个 Pika 连接是不安全的.
这篇关于Pika SelectConnection 适配器的 close() 方法不会关闭连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!