我正在尝试使用python中的protobuf读取一些数据流,我想使用trio使客户端读取这些数据流。protobuf有一些方法调用,当我使用trio流时发现它们不起作用。
linux机器上的Python客户机。
import DTCProtocol_pb2 as Dtc
async def parent(addr, encoding, heartbeat_interval):
print(f"parent: connecting to 127.0.0.1:{addr[1]}")
client_stream = await trio.open_tcp_stream(addr[0], addr[1])
# encoding request
print("parent: spawing encoding request ...")
enc_req = create_enc_req(encoding) # construct encoding request
await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request
log.debug('get_reponse: started')
response = await client_stream.receive_some(1024)
m_size = struct.unpack_from('<H', response[:2]) # the size of message
m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
m_body = response[4:]
m_resp = Dtc.EncodingResponse()
m_body
将是一些字节数据,我不知道如何解码。Dtc.EncodingResponse()
是protobuf方法,它将给出一个Dtc对象,该对象包含可读格式的响应。(Dtc是protobuf文件)。但我在这里什么也得不到。当我在没有trio的情况下编写这个脚本时,Dtc.EncodingResponse()
将以可读格式给出完整的响应。我猜问题在于“client_stream”是一个只读取字节的trio stream对象,因此我可能需要使用一个
ReceiveChannel
对象。但如果这是真的,我不知道怎么做。更新:
下面由Nathaniel J.Smith给出的答案解决了我的问题。
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
我觉得很傻,但我之前没有解析fromstring的数据,就这样。非常感谢所有给予答复的人。希望这能帮助其他人。
最佳答案
就像@shmee在评论中说的,我认为你的代码被编辑弄坏了。。。你应该再检查一遍。
当我在没有trio的情况下编写这个脚本时,Dtc.EncodingResponse()
将以可读的格式给出完整的响应
我想你换成三重奏的时候可能掉了线?Dtc.EncodingResponse()
只创建一个新的空对象。如果要将EncodingResponse
中的数据解析为新对象,则必须显式地执行此操作,方法如下:
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
不过,还有一个问题。。。它之所以被称为
m_body
是因为它接收一些字节,但可能不会接收您要求的所有字节。您的代码假设对receive_some
的一次调用将获取响应中的所有字节,当您执行简单测试时,这可能是正确的,但通常不能保证。如果第一次调用receive_some
时没有获得足够的数据,则可能需要不断重复调用它,直到获得所有数据。这其实很标准。。。插座的工作原理相同。这就是为什么您的服务器在开始时首先发送一个
receive_some
字段-这样您就可以知道是否已经获得了所有数据!不幸的是,截至2019年6月,Trio并没有为您提供帮助,您可以在this issue中跟踪这方面的进展。同时,你也可以自己写。我想这样的办法应该行得通:
async def receive_exactly(stream, count):
buf = bytearray()
while len(buf) < count:
new_data = await stream.receive_some(count - len(buf))
if not new_data:
raise RuntimeError("other side closed the connection unexpectedly")
buf += new data
return buf
async def receive_encoding_response(stream):
header = await receive_exactly(stream, 4)
(m_size, m_type) = struct.unpack('<HH', header)
m_body = await receive_exactly(stream, m_size)
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_size)
return m_resp
关于python - 如何在Google Protocol Buffer 中使用python-trio?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56592227/