本文介绍了如何在FastAPI的异步循环中使用Pika?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用FastAPI创建一个WebHook终结点,并将到达那里的任何json请求正文写入RabbitMQ。

我不知道如何连接到RabbitMQ、创建通道并通过连接到FastAPI asyncio循环来使其保持活动状态。此处的其他问题或答案都无济于事。

我目前的"解决方案"是在FastAPI应用程序启动方法中启动第二个线程,并使用queue e.SimpleQueue在线程之间进行对话。因此,FastAPI Path方法将对象写入此SimpleQueue,第二个线程从中读取对象并将其发布到RabbitMQ。

这种简单方法的问题在于,因为第二个线程在读取SimpleQueue时阻塞,RabbitMQ连接没有得到通过它发送的Keepalives,因此服务器将其关闭。我的代码在写入RabbitMQ时捕获异常,然后重新连接并再次尝试,但这相当难看。

我无法理解如何将pika或aio-pika映射中的异步示例改编为FastAPI应用。

有没有人能向我展示,鉴于下面这个简单的FastAPI应用程序,我如何打开RabbitMQ连接,该连接将通过它发送必要的保持活动以保持打开,以便我可以通过Path方法中的连接进行发布?

from fastapi import FastAPI, Response
from typing import Any, Dict

app = FastAPI()

@app.on_event("startup")
async def startup() -> None:
    # Connect to RabbitMQ
    # Create channel
    # Declare queue

JSONObject = Dict[str, Any]

@app.post("/webhook")
async def webhook_endpoint(msg: JSONObject):

    # Write msg to RabbitMQ channel here.

    return Response(status_code=204)

我的另一个想法是仍然使用线程,但让该线程在RabbitMQ连接上执行阻塞读取,以希望通过它不断发送保持活动,并且不会干扰通过相同连接从path方法发布。但这显然是一次黑客攻击。我更喜欢以"正确"的方式进行操作,并使用异步代码。

edit:似乎没有一种方法可以进行阻塞读取,所以使用channel.basic_get()进行无限循环,然后休眠。更刺耳。

推荐答案

我通过调整pika async publisher example可以正常工作。

我更改了示例,以便它创建AsyncioConnection而不是使用SelectConnection,因为FastAPI已经启动了标准的Asyncio事件循环,我希望pika使用该事件循环,而不是SelectConnection决定使用的任何内容。

这意味着示例中的重新连接逻辑没有按照编码方式工作,所以我需要修复它,它已从下面的代码中删除。

但是,此代码确实在后台保持连接活动,并且我可以发布消息以响应对WebHook URL的命中。将日志级别更改为DEBUG将显示正在发送和接收的检测信号。

这不是最终代码-我将更改队列等,但它确实显示AsyncioConnection可以连接到已经在运行事件循环OK的FastAPI。

import pika
from pika.adapters.asyncio_connection import AsyncioConnection
from pika.exchange_type import ExchangeType

from typing import Any, Dict

import asyncio, json, logging, os, queue, threading, time

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z')
logger = logging.getLogger(__name__)


class AsyncioRabbitMQ(object):
    EXCHANGE = 'message'
    EXCHANGE_TYPE = ExchangeType.topic
    PUBLISH_INTERVAL = 1
    QUEUE = 'text'
    ROUTING_KEY = 'example.text'

    def __init__(self, amqp_url):
        self._connection = None
        self._channel = None

        self._deliveries = []
        self._acked = 0
        self._nacked = 0
        self._message_number = 0

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        logger.info('Connecting to %s', self._url)
        return AsyncioConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, connection):
        logger.info('Connection opened')
        self._connection = connection
        logger.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, _unused_connection, err):
        logger.error('Connection open failed: %s', err)

    def on_connection_closed(self, _unused_connection, reason):
        logger.warning('Connection closed: %s', reason)
        self._channel = None

    def on_channel_open(self, channel):
        logger.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        logger.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        logger.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            self._connection.close()

    def setup_exchange(self, exchange_name):
        logger.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(exchange=exchange_name, exchange_type=self.EXCHANGE_TYPE, callback=cb)

    def on_exchange_declareok(self, _unused_frame, userdata):
        logger.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        logger.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(queue=queue_name, callback=self.on_queue_declareok)

    def on_queue_declareok(self, _unused_frame):
        logger.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
        self._channel.queue_bind(self.QUEUE, self.EXCHANGE, routing_key=self.ROUTING_KEY, callback=self.on_bindok)

    def on_bindok(self, _unused_frame):
        logger.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        logger.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        logger.info('Received %s for delivery tag: %i', confirmation_type, method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        logger.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)

    def publish_message(self, message):
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = { "a": "b" }
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs)

        self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)
        self._message_number += 1
        self._deliveries.append(self._message_number)
        logger.info('Published message # %i', self._message_number)


app = FastAPI()
ep = None


@app.on_event("startup")
async def startup() -> None:
    global ep
    await asyncio.sleep(10) # Wait for MQ
    user = os.environ['RABBITMQ_DEFAULT_USER']
    passwd = os.environ['RABBITMQ_DEFAULT_PASS']
    host = os.environ['RABBITMQ_HOST']
    port = os.environ['RABBITMQ_PORT']

    ep = AsyncioRabbitMQ(f'amqp://{user}:{passwd}@{host}:{port}/%2F')
    ep.connect()


JSONObject = Dict[str, Any]


@app.post("/webhook")
async def webhook_endpoint(msg: JSONObject) -> None:
    global ep
    ep.publish_message(msg)
    return Response(status_code=204)

这篇关于如何在FastAPI的异步循环中使用Pika?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

05-20 13:32