本文介绍了通过同一个 websocket 连接发送和接收帧而不会阻塞的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

抱歉,这篇长文章,但我已经研究了一个多星期,所以我尝试了很多不同的东西.我对 Python 非常了解,但我对 Python 中的异步或非阻塞函数没有任何经验.

Sorry for the long post but I've been poking at this for over a week so I've tried a lot of different stuff. I know Python well enough but I don't have any experience with asyncio or non-blocking functions in Python.

我正在为需要 websocket 连接的 web 服务编写 API 库/模块/包/任何东西.有许多传入消息要处理,我有时需要发送一些与控制相关的(Web 应用程序级别,而不是 websocket 控制消息).我可以轻松地通过连接接收消息并对其采取行动.我可以发送消息,但只能响应收到的消息,因为接收循环总是阻塞等待消息.我不想等待传入的消息来处理传出的消息,因此脚本在收到新消息之前不必挂起输入.在我努力使双向通信按预期工作时,我发现我需要使用 Twisted、Tornado 或 asyncio 之类的东西,但到目前为止,我尝试过的每个实现都失败了.请注意,发送必须通过同一连接发生.在接收循环之外打开一个短暂的连接是行不通的.这是我到目前为止所做的:

I'm writing an API library/module/package/whatever for a web service that requires a websocket connection. There are many incoming messages to act on, and some control-related (web app level, not websocket control messages) that I need to send on occasion. I can easily receive messages over the connection and act on them. I can send messages, but only in response to received messages because the receive loop is always blocking waiting for messages. I don't want to wait for an incoming messages to process an outgoing one so the script doesn't have to hang on input until a new messages is received. In my struggles to get two-way communication working as desired I discovered I need to use something like Twisted, Tornado, or asyncio but so far every implementation I've tried has failed. Note that the sending has to happen over the same connection. Opening a short-lived connection outside of the receive loop will not work. Here's what I've done so far:

websocket 代码的第一次迭代是使用 websocket-client 包.它非常接近文档中的示例:

The first iteration of the websocket code was using the websocket-client package. It was very close to the example from the docs:

import websocket
try:
    import thread
except ImportError:
    import _thread as thread
import time

def on_message(ws, message):
    # Send message frames to respective functions
    # for sorting, objectification, and processing

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    def run(*args):
        # Send initial frames required for server to send the desired frames
    thread.start_new_thread(run, ())


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(buildWebsocketURL()),
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

这会阻止循环之外的任何进一步执行.我尝试学习 _thread 模块,但找不到任何迹象表明我可以从外部与 websocket 线程通信".我尝试设置一个发布/订阅侦听器函数,该函数将数据从另一个发送器函数转发到 ws.send() 但它没有工作.没有错误或任何东西,只是没有任何已发送消息的指示.

This blocks any further execution outside of the loop. I tried learning up on the _thread module but I couldn't find any indication that I could "communicate" with the websocket thread from outside. I tried setting up a pub/sub listener function that would forward data to ws.send() from another sender function but it didn't work. No errors or anything, just no indication of any sent messages.

接下来我尝试了 Websockets 模块.这个似乎是从头开始构建以利用 asyncio.同样,我得到了一个客户端构建,它会发送初始消息并对收到的消息采取行动,但进度停止了:

Next I tried the Websockets module. This one seems to be built from the ground up to utilize asyncio. Again, I got a client build that would send initial messages and act on received messages but the progress stopped there:

async def wsconnection():
        async with websockets.connect(getWebsocketURL()) as websocket:
            while True:
                message = await websocket.recv()
                if message == '{"type":"broadcaster.ready"}':
                    subscriptions = getSubscriptions()  # Get subscriptions from ident data
                    logging.info('Sending bookmarks to server as subscription keys')
                    subscriptionupdate = '{{"type": "subscribe","subscription_keys": ["{0}"],"subscription_scope": "update"}}'.format(
                        '","'.join(subscriptions))
                    subscriptioncontent = '{{"subscription_keys": ["{0}"],"subscription_scope": "content","type": "subscribe"}}'.format(
                        '","'.join(subscriptions))
                    logging.debug(subscriptioncontent)
                    await websocket.send(subscriptionupdate)
                    await websocket.send(subscriptioncontent)
                    await websocket.send(
                        '{"type":"message_lobby.read","lobby_id":"1","message_id:"16256829"}')
                sortframe(message)

asyncio.get_event_loop().run_until_complete(wsconnection())

我尝试了在此处应用的上述发布/订阅侦听器,但无济于事.在更彻底地阅读此模块的文档后,我尝试在循环之外获取 websocket 协议对象(包含 send() 和 recv() 方法),然后创建两个协程(?),一个侦听传入消息,另一个侦听并发送外发消息.到目前为止,如果不运行 wsconnection() 函数范围内的 async with websockets.connect(getWebsocketURL()) as websocket: 行,我已经完全无法获得 websocket 协议对象.我尝试使用 websocket = websockets.client.connect() 根据 docs 我以为会设置我需要的协议对象,但事实并非如此.在没有广泛的 asyncio 知识的情况下,我能找到的所有示例似乎都没有揭示以我需要的方式构建 websockets 发送方和接收方的任何明显方法.

I tried the aforementioned pub/sub listener applied here to no avail. Upon reading the docs for this module more thoroughly I tried getting the websocket protocol object (that contains the send() and recv() methods) outside of the loop then creating two coroutines(?), one listening for incoming messages and one listening for and sending outgoing messages. So far I've been completely unable to get the websocket protocol object without running the async with websockets.connect(getWebsocketURL()) as websocket: line within the scope of the wsconnection() function. I tried using websocket = websockets.client.connect() which according to the docs I thought would set the protocol object I need but it doesn't. All of the examples I can find don't seem to reveal any apparent way to structure the websockets sender and receiver in the way I require without extensive knowledge of asyncio.

我还使用了与上述类似的代码结构,同时使用 asyncio 和扭曲,但我想出了与上述相同的所有问题.

I also poked around with autobahn with similar code structures as above using both asyncio and Twisted but I came up with all the same problems as above.

到目前为止,我得到的最接近的是上面的 Websockets 包.文档有一个 示例代码段 用于发送/接收连接,但我无法真正了解那里发生的事情,因为它都是针对 asyncio 的.总的来说,我真的很难理解 asyncio,我认为一个大问题是它最近似乎发展得非常迅速,所以有大量非常特定于版本的信息围绕着这些冲突.不幸的是,不利于学习.~~~~这是我尝试使用该示例的方法,它连接,接收初始消息,然后连接丢失/关闭:

So far the closest I've gotten was with the Websockets package above. The docs have an example snippet for a send/recv connection but I can't really read what's going on there as it's all very specific to asyncio. I'm really having trouble wrapping my head around asyncio in general and I think a big problem is it seems to have very rapidly evolved recently so there is a ton of very version-specific information floating around that conflicts. Not good for learning, unfortunately. ~~~~This is what I tried using that example and it connects, receives initial messages, then the connection is lost/closed:

async def producer(message):
    print('Sending message')

async def consumer_handler(websocket, path):
    while True:
        message = await websocket.recv()
        await print(message)
        await pub.sendMessage('sender', message)

async def producer_handler(websocket, path):
    while True:
        message = await producer()
        await websocket.send(message)

async def wsconnect():
        async with websockets.connect(getWebsocketURL()) as websocket:
            path = "443"
            async def handler(websocket, path):
                consumer_task = asyncio.ensure_future(
                    consumer_handler(websocket, path))
                producer_task = asyncio.ensure_future(
                    producer_handler(websocket, path))
                done, pending = await asyncio.wait(
                    [consumer_task, producer_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()

pub.subscribe(producer, 'sender')

asyncio.get_event_loop().run_until_complete(wsconnect())

那么我如何构造这段代码以通过同一个 websocket 连接进行发送和接收?当 websocket 连接打开时,我还需要在同一个脚本中进行各种 API 调用,这使事情进一步复杂化.

So how do I structure this code to get sending and receiving over the same websocket connection? I also have various API calls to make in the same script while the websocket connection is open which further complicates things.

我使用的是 Python 3.6.6,并且此脚本旨在作为模块导入到其他脚本中,因此需要将 websocket 功能封装在用于外部调用的函数或类中.

I'm using Python 3.6.6 and this script is intended to be imported as a module into other scripts so the websocket functionality will need to be wrapped up in a function or class for external calls.

推荐答案

我和你的情况完全一样.我知道这是一个非常不雅的解决方案因为它仍然不是全双工,但我似乎无法在互联网或 stackoverflow 上找到任何涉及 asyncio 和我使用的 websockets 模块的示例.

I am in the exact same situation as u. I know that this is a very inelegant solutionbecause it still isn't full-duplex but i can't seem to find any example on the internet or stackoverflow involving asyncio and the websockets module which i used.

我不认为我完全理解您的 websockets 示例(是服务器端代码还是客户端代码?)但我将解释我的情况和解决方案",也许这对您也有用.

I don't think i completely understand your websockets example (is that server-side or client-side code?) but i'm going to explain my situation and "solution" and maybe that would be usable for you too.

所以我有一个服务器主函数,它有一个 websocket 用 recv() 在循环中监听消息.当我发送开始"时,它将启动一个函数,该函数将每秒向浏览器中的 javascript 客户端发送数据.但是当函数发送数据时,我有时想暂停或停止来自我的客户端的数据流发送停止消息.问题是,当我使用 recv() 而数据发送已经开始时,服务器停止发送数据,只等待消息.我尝试了线程、多处理和其他一些东西,但最终我找到了希望临时解决方案,即在客户端收到一条数据后立即向服务器发送pong"消息,以便服务器在下一次循环迭代中继续发送数据或例如,如果pong"消息是stop",则停止发送数据,但这不是真正的双工,只是快速半双工...

So i have a server main function that has a websocket listening for messages in a loop with recv(). When i send "start" it will start a function that will send data every second to the javascript client in the browser. But while the function is sending data i sometimes want to pause or stop the stream of data from my client be sending a stop message. The problem is that when i use recv() while the data sending has already begun the server stops sending data and only waits for a message. I tried threads,multiprocessing and some other stuff but eventually i came to the hopefully temporarily solution of sending a "pong" message to the server immediately after the client receives a piece of data so that the server continues sending data at the next loop iteration or stop sending data if the "pong" message is "stop" instead for example but yeah this is not real duplex just fast half-duplex...

我的python服务器"上的代码

code on my python "server"

 async def start_server(self,websocket,webserver_path):
    self.websocket = websocket
    self.webserver_path = webserver_path
    while True:
        command = await self.websocket.recv()
        print("received command")
        if command == "start":
            await self.analyze()
        asyncio.sleep(1)

在我的分析功能中:

        for i,row in enumerate(data)
            await self.websocket.send(json.dumps(row))
            msg = await self.websocket.recv()
            if msg == "stop":
                self.stopFlag = True
                return
            await asyncio.sleep(1)

主要

start_server = websockets.serve(t.start_server, "127.0.0.1", 5678)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

javascript 客户端上的代码

code on the javascript client

var ws = new WebSocket("ws://127.0.0.1:5678/");
        ws.onmessage = function (event) {
            var datapoint = JSON.parse(event.data);
            console.log(counter);
            counter++;
            data.push(datapoint);
            if (data.length > 40){
                var element = data.shift();
                render(data);
            }
            ws.send("pong");//sending dummy message to let server continue
        };

我知道这不是解决方案,我希望其他人提供更好的解决方案,但由于我有相同或非常相似的问题,并且没有其他答案,我决定发布,希望对您有所帮助.

I know it is not THE solution and i hope somebody else provides a better one but since i have the same or very similar problem and there are no other answers i decided to post and i hope it helps.

这篇关于通过同一个 websocket 连接发送和接收帧而不会阻塞的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-11 17:52