I am getting used to asyncio and find the task handling quite nice, but it can be difficult to mix async libraries with traditional io libraries. The problem I am currently facing is how to properly decode an async StreamReader.
个字节字符串块,然后解码每个块-请参见下面的代码. (在我的程序中,我不会打印每个块,而是将其解码为字符串并将其发送到另一种处理方法中):
The simplest solution is to read()
chunks of byte strings, and then decode each chunk - see code below. (In my program, I wouldn't print each chunk, but decode it into a string and send it into another method for processing):
import asyncio
import aiohttp
async def get_data(port):
url = 'http://localhost:{}/'.format(port)
r = await aiohttp.get(url)
stream = r.content
while not stream.at_eof():
data = await stream.read(4)
这很好,直到有一个utf-8字符被分割成太多的块为止.例如,如果响应为b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'
This works fine, until there is a utf-8 character that is split between too chunks. For example if the response is b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'
, then reading chunks of 3 will work, but chunks of 4 will not (as \xc3
and \x9f
are in different chunks and decoding the chunk ending with \xc3
will raise the following error:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
我研究了解决此问题的适当方法,至少在阻塞世界中,它似乎是io.TextIOWrapper或codecs.StreamReaderWriter(其差异在 PEP 0400 ).但是,这两个都依赖于典型的阻塞流.
I looked at proper solutions to this problem, and at least in the blocking world, seems to be either io.TextIOWrapper or codecs.StreamReaderWriter (the differences of which are discussed in PEP 0400). However, both of these rely on typical blocking streams.
我花了30分钟用asyncio搜索示例,并一直在寻找我的define()解决方案.有谁知道更好的解决方案,还是这是python asyncio中缺少的功能?
I spent 30 minutes searching for examples with asyncio and kept finding my decode() solution. Does anyone know of a better solution or is this a missing feature in python's asyncio?
For reference, here are the results from using the two "standard" decoders with async streams.
Using the codec stream reader:
r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)
File "echo_client.py", line 13, in get_data
data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator
(它直接调用read(),而不是yield from
(it calls read() directly, rather than yield from
or await
I also tried wrapping stream with io.TextIOWrapper:
stream = TextIOWrapper(r.content)
File "echo_client.py", line 10, in get_data
stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'
P.S. If you want a sample test case for this, please look at this gist. You can run it with python3.5 to reproduce the error. If you change the chunk size from 4 to 3 (or 30), it will work correctly.
The accepted answer fixed this like a charm. Thanks! If someone else has this issue, here is a simple wrapper class I made to handle the decoding on a StreamReader:
import codecs
class DecodingStreamReader:
def __init__(self, stream, encoding='utf-8', errors='strict'):
self.stream = stream
self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
async def read(self, n=-1):
data = await self.stream.read(n)
if isinstance(data, (bytes, bytearray)):
data = self.decoder.decode(data)
return data
def at_eof(self):
return self.stream.at_eof()
Utf8Decoder = codecs.getincrementaldecoder('utf-8')
decoder = Utf8Decoder(error='strict')
while not stream.at_eof():
data = await stream.read(4)
print(decoder.decode(data), end='')
Mädchen mit Biß