我正在尝试使用python中的protobuf读取一些数据流,我想使用trio使客户端读取这些数据流。protobuf有一些方法调用,当我使用trio流时发现它们不起作用。
linux机器上的Python客户机。

import DTCProtocol_pb2 as Dtc

async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to 127.0.0.1:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])

    # encoding request
    print("parent: spawing encoding request ...")
    enc_req = create_enc_req(encoding) # construct encoding request
    await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request

    log.debug('get_reponse: started')
    response = await client_stream.receive_some(1024)
    m_size = struct.unpack_from('<H', response[:2]) # the size of message
    m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()

m_body将是一些字节数据,我不知道如何解码。Dtc.EncodingResponse()是protobuf方法,它将给出一个Dtc对象,该对象包含可读格式的响应。(Dtc是protobuf文件)。但我在这里什么也得不到。当我在没有trio的情况下编写这个脚本时,Dtc.EncodingResponse()将以可读格式给出完整的响应。
我猜问题在于“client_stream”是一个只读取字节的trio stream对象,因此我可能需要使用一个ReceiveChannel对象。但如果这是真的,我不知道怎么做。
更新:
下面由Nathaniel J.Smith给出的答案解决了我的问题。
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

我觉得很傻,但我之前没有解析fromstring的数据,就这样。非常感谢所有给予答复的人。希望这能帮助其他人。

最佳答案

就像@shmee在评论中说的,我认为你的代码被编辑弄坏了。。。你应该再检查一遍。
当我在没有trio的情况下编写这个脚本时,Dtc.EncodingResponse()将以可读的格式给出完整的响应
我想你换成三重奏的时候可能掉了线?Dtc.EncodingResponse()只创建一个新的空对象。如果要将EncodingResponse中的数据解析为新对象,则必须显式地执行此操作,方法如下:

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

不过,还有一个问题。。。它之所以被称为m_body是因为它接收一些字节,但可能不会接收您要求的所有字节。您的代码假设对receive_some的一次调用将获取响应中的所有字节,当您执行简单测试时,这可能是正确的,但通常不能保证。如果第一次调用receive_some时没有获得足够的数据,则可能需要不断重复调用它,直到获得所有数据。
这其实很标准。。。插座的工作原理相同。这就是为什么您的服务器在开始时首先发送一个receive_some字段-这样您就可以知道是否已经获得了所有数据!
不幸的是,截至2019年6月,Trio并没有为您提供帮助,您可以在this issue中跟踪这方面的进展。同时,你也可以自己写。我想这样的办法应该行得通:
async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new data
    return buf

async def receive_encoding_response(stream):
    header = await receive_exactly(stream, 4)
    (m_size, m_type) = struct.unpack('<HH', header)
    m_body = await receive_exactly(stream, m_size)
    m_resp = Dtc.EncodingResponse()
    m_resp.ParseFromString(m_size)
    return m_resp

关于python - 如何在Google Protocol Buffer 中使用python-trio?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56592227/

10-11 10:40