过去几天,我一直在尝试将事件流集成到我的flask应用程序中,以在本地测试中取得良好的结果,但是在服务器上使用uWSGI运行该应用程序时,情况会更糟。我的代码基本上是基于flask的example构建的。我正在使用python 3.4.2

问题

在我的uWSGI服务器上运行该应用程序时,每当客户端尝试连接到gevent.hub.LoopExit: 'This operation would block forever'.端点时,它都会引发/streaming。我的假设是这是由于无限期地在空队列上调用get()引起的。

完整回溯:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/werkzeug/wsgi.py", line 691, in __next__
    return self._next()
  File "/usr/lib/python3/dist-packages/werkzeug/wrappers.py", line 81, in _iter_encoded
    for item in iterable:
  File "./voting/__init__.py", line 49, in gen
    result = queue.get(block=True)
  File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 284, in get
    return self.__get_or_peek(self._get, block, timeout)
  File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 261, in __get_or_peek
    result = waiter.get()
  File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 878, in get
    return self.hub.switch()
  File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 609, in switch
    return greenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x7f717f40f5a0 epoll default pending=0 ref=0 fileno=6>)

我的密码
/streaming端点:
@app.route("/streaming", methods=["GET", "OPTIONS"])
def streaming():
    def gen():
        queue = Queue()
        subscriptions.add_subscription(session_id, queue)

        try:
            while True:
                result = queue.get()      # Where the Exception is raised
                ev = ServerSentEvent(json.dumps(result["data"]), result["type"])
                yield ev.encode()

        except GeneratorExit:  # TODO Need a better method to detect disconnecting
            subscriptions.remove_subscription(session_id, queue)

    return Response(gen(), mimetype="text/event-stream")

将事件添加到队列中:
def notify():
    msg = {"type": "users", "data": db_get_all_registered(session_id)}
    subscriptions.add_item(session_id, msg)      # Adds the item to the relevant queues.

gevent.spawn(notify)

如前所述,它可以使用werkzeug在本地正常运行:
from app import app
from gevent.wsgi import WSGIServer
from werkzeug.debug import DebuggedApplication

a = DebuggedApplication(app, evalex=True)
server = WSGIServer(("", 5000), a)
server.serve_forever()

我尝试过的
  • 使用monkey.patch_all()进行猴子修补。
  • Queue切换到JoinableQueue
  • gevent.sleep(0)Queue.get()结合使用。
  • 最佳答案

    该异常基本上意味着该循环/线程中没有其他可运行的greenlet可以切换到该greenlet。因此,当greenlet进入阻塞状态(queue.get())时,集线器无处可去,无所事事。

    gevent的WSGIServer中可以使用相同的代码,因为服务器本身是运行socket.accept循环的greenlet,因此总有另一个greenlet可以切换到该环境。但是显然uwsgi不能那样工作。

    解决此问题的方法是安排其他Greenlet运行。例如,与其生成一个greenlet来按需通知,不如安排一个greenlet已经在运行并阻塞在自己的队列中。

    关于python - 使用gevent.queue.Queue.get(): gevent. hub.LoopExit: 'This operation would block forever',我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41198276/

    10-09 07:08