问题描述
我正在尝试使用FastAPI创建一个WebHook终结点,并将到达那里的任何json请求正文写入RabbitMQ。
我不知道如何连接到RabbitMQ、创建通道并通过连接到FastAPI asyncio循环来使其保持活动状态。此处的其他问题或答案都无济于事。
我目前的"解决方案"是在FastAPI应用程序启动方法中启动第二个线程,并使用queue e.SimpleQueue在线程之间进行对话。因此,FastAPI Path方法将对象写入此SimpleQueue,第二个线程从中读取对象并将其发布到RabbitMQ。
这种简单方法的问题在于,因为第二个线程在读取SimpleQueue时阻塞,RabbitMQ连接没有得到通过它发送的Keepalives,因此服务器将其关闭。我的代码在写入RabbitMQ时捕获异常,然后重新连接并再次尝试,但这相当难看。
我无法理解如何将pika或aio-pika映射中的异步示例改编为FastAPI应用。
有没有人能向我展示,鉴于下面这个简单的FastAPI应用程序,我如何打开RabbitMQ连接,该连接将通过它发送必要的保持活动以保持打开,以便我可以通过Path方法中的连接进行发布?
from fastapi import FastAPI, Response
from typing import Any, Dict
app = FastAPI()
@app.on_event("startup")
async def startup() -> None:
# Connect to RabbitMQ
# Create channel
# Declare queue
JSONObject = Dict[str, Any]
@app.post("/webhook")
async def webhook_endpoint(msg: JSONObject):
# Write msg to RabbitMQ channel here.
return Response(status_code=204)
我的另一个想法是仍然使用线程,但让该线程在RabbitMQ连接上执行阻塞读取,以希望通过它不断发送保持活动,并且不会干扰通过相同连接从path方法发布。但这显然是一次黑客攻击。我更喜欢以"正确"的方式进行操作,并使用异步代码。
edit:似乎没有一种方法可以进行阻塞读取,所以使用channel.basic_get()进行无限循环,然后休眠。更刺耳。
推荐答案
我通过调整pika async publisher example可以正常工作。
我更改了示例,以便它创建AsyncioConnection而不是使用SelectConnection,因为FastAPI已经启动了标准的Asyncio事件循环,我希望pika使用该事件循环,而不是SelectConnection决定使用的任何内容。
这意味着示例中的重新连接逻辑没有按照编码方式工作,所以我需要修复它,它已从下面的代码中删除。
但是,此代码确实在后台保持连接活动,并且我可以发布消息以响应对WebHook URL的命中。将日志级别更改为DEBUG将显示正在发送和接收的检测信号。这不是最终代码-我将更改队列等,但它确实显示AsyncioConnection可以连接到已经在运行事件循环OK的FastAPI。
import pika
from pika.adapters.asyncio_connection import AsyncioConnection
from pika.exchange_type import ExchangeType
from typing import Any, Dict
import asyncio, json, logging, os, queue, threading, time
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z')
logger = logging.getLogger(__name__)
class AsyncioRabbitMQ(object):
EXCHANGE = 'message'
EXCHANGE_TYPE = ExchangeType.topic
PUBLISH_INTERVAL = 1
QUEUE = 'text'
ROUTING_KEY = 'example.text'
def __init__(self, amqp_url):
self._connection = None
self._channel = None
self._deliveries = []
self._acked = 0
self._nacked = 0
self._message_number = 0
self._stopping = False
self._url = amqp_url
def connect(self):
logger.info('Connecting to %s', self._url)
return AsyncioConnection(
pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed)
def on_connection_open(self, connection):
logger.info('Connection opened')
self._connection = connection
logger.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def on_connection_open_error(self, _unused_connection, err):
logger.error('Connection open failed: %s', err)
def on_connection_closed(self, _unused_connection, reason):
logger.warning('Connection closed: %s', reason)
self._channel = None
def on_channel_open(self, channel):
logger.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
def add_on_channel_close_callback(self):
logger.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reason):
logger.warning('Channel %i was closed: %s', channel, reason)
self._channel = None
if not self._stopping:
self._connection.close()
def setup_exchange(self, exchange_name):
logger.info('Declaring exchange %s', exchange_name)
# Note: using functools.partial is not required, it is demonstrating
# how arbitrary data can be passed to the callback when it is called
cb = functools.partial(self.on_exchange_declareok, userdata=exchange_name)
self._channel.exchange_declare(exchange=exchange_name, exchange_type=self.EXCHANGE_TYPE, callback=cb)
def on_exchange_declareok(self, _unused_frame, userdata):
logger.info('Exchange declared: %s', userdata)
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
logger.info('Declaring queue %s', queue_name)
self._channel.queue_declare(queue=queue_name, callback=self.on_queue_declareok)
def on_queue_declareok(self, _unused_frame):
logger.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.QUEUE, self.EXCHANGE, routing_key=self.ROUTING_KEY, callback=self.on_bindok)
def on_bindok(self, _unused_frame):
logger.info('Queue bound')
self.start_publishing()
def start_publishing(self):
logger.info('Issuing Confirm.Select RPC command')
self._channel.confirm_delivery(self.on_delivery_confirmation)
def on_delivery_confirmation(self, method_frame):
confirmation_type = method_frame.method.NAME.split('.')[1].lower()
logger.info('Received %s for delivery tag: %i', confirmation_type, method_frame.method.delivery_tag)
if confirmation_type == 'ack':
self._acked += 1
elif confirmation_type == 'nack':
self._nacked += 1
self._deliveries.remove(method_frame.method.delivery_tag)
logger.info(
'Published %i messages, %i have yet to be confirmed, '
'%i were acked and %i were nacked', self._message_number,
len(self._deliveries), self._acked, self._nacked)
def publish_message(self, message):
if self._channel is None or not self._channel.is_open:
return
hdrs = { "a": "b" }
properties = pika.BasicProperties(
app_id='example-publisher',
content_type='application/json',
headers=hdrs)
self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
json.dumps(message, ensure_ascii=False),
properties)
self._message_number += 1
self._deliveries.append(self._message_number)
logger.info('Published message # %i', self._message_number)
app = FastAPI()
ep = None
@app.on_event("startup")
async def startup() -> None:
global ep
await asyncio.sleep(10) # Wait for MQ
user = os.environ['RABBITMQ_DEFAULT_USER']
passwd = os.environ['RABBITMQ_DEFAULT_PASS']
host = os.environ['RABBITMQ_HOST']
port = os.environ['RABBITMQ_PORT']
ep = AsyncioRabbitMQ(f'amqp://{user}:{passwd}@{host}:{port}/%2F')
ep.connect()
JSONObject = Dict[str, Any]
@app.post("/webhook")
async def webhook_endpoint(msg: JSONObject) -> None:
global ep
ep.publish_message(msg)
return Response(status_code=204)
这篇关于如何在FastAPI的异步循环中使用Pika?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!