本文介绍了Python Tornado 从另一个线程发送 WebSocket 消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在 Python 中使用 WebSockets 来使 Web 客户端了解我正在使用 PySerial 从串行端口读取的数据的最新信息.我目前正在使用以下代码通过单独的线程连续读取串行数据

I want to use WebSockets in Python to keep web clients up to date about data that I am reading from a serial port using PySerial. I am currently using the following code to read the serial data in continuously with a separate thread

def read_from_port():
    while running:
        reading = ser.readline().decode()
        handle_data(reading)

thread = threading.Thread(target=read_from_port)
thread.daemon = True
thread.start()

我正在对串行数据执行一些处理,然后如果计算结果与其之前的值不同,我想向所有连接的 WebSocket 客户端广播一条消息.为此,我设置了以下代码

I am performing some processing on the serial data and then want to broadcast a message to all the connected WebSocket clients if the calculated result differs from its previous value. For this I have set up the following code

clients = []

def Broadcast(message):
    for client in clients:
        client.sendMessage(json.dumps(message).encode('utf8'))
        print("broadcasted")

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)

    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)

    def on_close(self):
        print('connection closed')
        clients.remove(self)

    def check_origin(self, origin):
        return True

application = tornado.web.Application([
    (r'/ws', WSHandler),
])

if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.instance().start()

然后我想使用广播"worker 中的方法来广播结果.从工作线程使用此方法会产生以下错误

I then want to use the "broadcast" method in the worker to broadcast out a result. Using this method from the worker thread produces the following error

File "main.py", line 18, in Broadcast
    client.write_message(message)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
    return self.ws_connection.write_message(message, binary=binary)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
    fut = self._write_frame(True, opcode, message, flags=flags)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
    return self.stream.write(frame)
  File "/usr/local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
    future = Future()  # type: Future[None]
  File "/usr/local/Cellar/[email protected]/3.8.3_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.

我了解问题在于 Tornado write_message 函数不是线程安全的,并且产生此错误是因为我试图直接从工作线程调用该函数.据我所知,在 Tornado 中使用并发代码的推荐方法是通过 asyncio,但我认为在这种情况下,线程方法可能更合适,因为我有一个基本上不断并行运行的循环.

I understand the issue is that the Tornado write_message function is not thread safe and that this error is being produced because I am trying to call the function directly from the worker thread. As far as I can determine, the recommended way to use concurrent code with Tornado is through asyncio, but I think a threading approach might be more appropriate in this situation where I have a loop that essentially runs in parallel constantly.

不幸的是,我对 asyncio 以及如何在 Python 中实现线程知之甚少,所以我想找出从不同线程发送 WebSocket 消息的最简单方法是什么.

Unfortunately I know very little about asyncio and how threading is implemented in Python, so I would like to find out what is the simplest way that I can send WebSocket messages from a different thread.

推荐答案

https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading 给了我可以实现的必要线索这非常优雅地使用call_soon_threadsafe";功能.因此,以下代码似乎可以解决问题

Reading the official documentation for using asyncio and multithreading together at https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading gave me the necessary clue that one can achieve this quite elegantly using the "call_soon_threadsafe" function. The following code thus seems to do the trick

tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)

clients = []

def bcint(message):
    for client in clients:
        client.write_message(message)
        print("broadcasted")

def Broadcast(message):
    io_loop.asyncio_loop.call_soon_threadsafe(bcint, message)

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)

    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)

    def on_close(self):
        print('connection closed')
        clients.remove(self)

    def check_origin(self, origin):
        return True

application = tornado.web.Application([
    (r'/ws', WSHandler),
])

if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.current().start()

这篇关于Python Tornado 从另一个线程发送 WebSocket 消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 11:20