I want to read a large file asynchronously, 20 lines at a time. How can I do this with the Twisted framework?
Here is part of my code, but it blocks:
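```python
with open(file_path) as f:
    if (importState.status == ImportStateFile.STATUS_AWAY):
        f.seek(importState.fileIndex, 0)
    # `while True` with no break or yield never hands control back to the
    # reactor, so the reactor cannot run any scheduled work and this blocks.
    while True:
        importState.fileIndex = importState.fileIndex + len(''.join(emails))
        d1 = get_emails(f)
        d1.addCallback(process_emails).addCallback(insert_emails_status)
        d1.addErrback(finalize_import)
```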
Best answer
Use Twisted's producer/consumer system. For more information, see: http://twistedmatrix.com/documents/current/core/howto/producers.html
My producer:
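```python
from itertools import islice

from twisted.web.client import FileBodyProducer
from twisted.web.iweb import IBodyProducer
from zope.interface import implementer


@implementer(IBodyProducer)
class ListEmailProducer(FileBodyProducer):
    # Overrides a private FileBodyProducer method, so this may need
    # adjusting across Twisted versions.
    def _writeloop(self, consumer):
        """
        Return an iterator which reads up to self._readSize lines from the
        input file and writes them to the consumer as a list each time it
        is iterated.
        """
        while True:
            # islice over a file object yields lines, so readSize counts lines
            emails = list(islice(self._inputFile, self._readSize))
            if len(emails) == 0:
                self._inputFile.close()
                consumer.finish()
                break
            consumer.write(emails)
            yield None
```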
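The batching in `_writeloop` can be exercised without a reactor. In this sketch the loop is copied into a free function, `write_batches`, and driven by hand with a `RecordingConsumer`; both names are hypothetical and exist only for illustration. Each `next()` on the generator corresponds roughly to one step that Twisted's cooperator takes when it runs the producer.

```python
import io
from itertools import islice


def write_batches(input_file, consumer, read_size=20):
    """Same loop as ListEmailProducer._writeloop, lifted out of the class."""
    while True:
        lines = list(islice(input_file, read_size))
        if not lines:
            input_file.close()
            consumer.finish()
            break
        consumer.write(lines)
        yield None


class RecordingConsumer:
    """Hypothetical stand-in consumer that only records what it receives."""

    def __init__(self):
        self.batches = []
        self.finished = False

    def write(self, lines):
        self.batches.append(lines)

    def finish(self):
        self.finished = True


consumer = RecordingConsumer()
source = io.StringIO("".join(f"user{i}@example.com\n" for i in range(45)))
for _ in write_batches(source, consumer):  # one iteration per batch
    pass
print([len(batch) for batch in consumer.batches], consumer.finished)
# prints [20, 20, 5] True
```

The last batch is simply shorter; `finish()` is only called on the step after it, when `islice` returns nothing.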
My consumer:
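```python
from twisted.internet import reactor

# process_emails, insert_emails_status and finalize_import are the asker's
# own functions (not shown); process_emails returns a Deferred.


class ListEmailConsumer:
    producer = None
    finished = False
    unregistered = True
    importState = None

    def registerImportState(self, importState):
        self.importState = importState

    def registerProducer(self, producer):
        self.producer = producer

    def unregisterProducer(self):
        self.unregistered = True

    def finish(self):
        # Called by ListEmailProducer._writeloop once the file is exhausted.
        finalize_import(self.importState)
        reactor.callFromThread(reactor.stop)

    def write(self, emails):
        # Pause the producer so no new batch is read until this one is stored.
        self.producer.pauseProducing()
        d = process_emails(emails, self.importState)
        d.addCallback(insert_emails_status, self.importState)
        d.addCallback(lambda ignored: self.producer.resumeProducing())
```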
Running it:
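```python
from twisted.internet import reactor

fileObj = open(file_path)
listEmailProducer = ListEmailProducer(fileObj, readSize=20)  # 20 lines per batch
listEmailConsumer = ListEmailConsumer()
listEmailConsumer.registerProducer(listEmailProducer)
listEmailConsumer.registerImportState(importState)
listEmailProducer.startProducing(listEmailConsumer)
# Batches are only read while the reactor runs; ListEmailConsumer.finish() stops it.
reactor.run()
```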
For "python - asynchronous Twisted file", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/26314586/