I want to read a large file asynchronously, 20 lines at a time. How can I do this with the Twisted framework?

Here is part of my code, but it blocks:

with open(file_path) as f:
    if (importState.status == ImportStateFile.STATUS_AWAY):
        f.seek(importState.fileIndex, 0)

    while True:
        importState.fileIndex = importState.fileIndex + len(''.join(emails))
        d1 = get_emails(f)
        d1.addCallback(process_emails).addCallback(insert_emails_status)
        d1.addErrback(finalize_import)

Best answer

Use Twisted's producer and consumer system. See the following link for more information: http://twistedmatrix.com/documents/current/core/howto/producers.html

My producer:

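from itertools import islice

from twisted.web.client import FileBodyProducer
from twisted.web.iweb import IBodyProducer
from zope.interface import implementer


@implementer(IBodyProducer)
class ListEmailProducer(FileBodyProducer):

    def _writeloop(self, consumer):
        """
        Return an iterator which reads up to readSize lines from the input
        file and writes them to the consumer each time it is iterated.

        Note: this overrides a private FileBodyProducer method, and readSize
        is reused here as a line count rather than a byte count.
        """
        while True:
            # islice pulls at most _readSize lines from the file iterator.
            emails = list(islice(self._inputFile, self._readSize))
            if not emails:
                # End of file: release the handle and notify the consumer.
                self._inputFile.close()
                consumer.finish()
                break

            consumer.write(emails)
            # Yield control back to the cooperator between batches.
            yield None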
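The batching in `_writeloop` relies on `itertools.islice` consuming the file iterator a few lines at a time. The sketch below isolates that idea with the standard library only; `read_batches` and the in-memory sample data are hypothetical names made up for illustration and are not part of Twisted or of the answer's code.

```python
import io
from itertools import islice


def read_batches(file_obj, batch_size=20):
    """Yield lists of up to batch_size lines until file_obj is exhausted."""
    while True:
        # islice advances the underlying file iterator, so each call
        # continues where the previous batch stopped.
        batch = list(islice(file_obj, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical sample data: 45 lines standing in for a large file.
sample = io.StringIO("".join(f"user{i}@example.com\n" for i in range(45)))
sizes = [len(batch) for batch in read_batches(sample)]
print(sizes)  # [20, 20, 5]
```

The last batch is simply shorter than the others, and an empty list signals end of file, which is the condition the producer uses to close the file and call `consumer.finish()`.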


My consumer:

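from twisted.internet import reactor

# finalize_import, process_emails and insert_emails_status are the
# application's own functions; the last two are expected to work with Deferreds.


class ListEmailConsumer:
    producer = None
    finished = False
    unregistered = True
    importState = None

    def registerImportState(self, importState):
        self.importState = importState

    def registerProducer(self, producer):
        self.producer = producer

    def unregisterProducer(self):
        self.unregistered = True

    def finish(self):
        # Called by the producer once the file is exhausted.
        finalize_import(self.importState)
        reactor.callFromThread(reactor.stop)

    def write(self, emails):
        # Pause reading until this batch has been processed and stored.
        self.producer.pauseProducing()
        d = process_emails(emails, self.importState)
        d.addCallback(insert_emails_status, self.importState)
        # Resume only after the whole callback chain has succeeded.
        d.addCallback(lambda ignored: self.producer.resumeProducing())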
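The important part of `write` is the ordering: pause the producer, process the batch, and resume only when processing is done. The following is a toy model of that handshake in plain Python, not Twisted code. `ToyProducer`, `ToyConsumer`, `step`, and `finish_processing` are invented for illustration, and `finish_processing` stands in for the Deferred firing.

```python
class ToyProducer:
    """Stand-in for a pausable producer; not a Twisted class."""

    def __init__(self, batches):
        self._batches = iter(batches)
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False

    def step(self, consumer):
        """Deliver one batch unless paused; return False when exhausted."""
        if self.paused:
            return True
        batch = next(self._batches, None)
        if batch is None:
            return False
        consumer.write(batch)
        return True


class ToyConsumer:
    def __init__(self, producer):
        self.producer = producer
        self.pending = None  # batch currently being "processed"
        self.done = []

    def write(self, batch):
        # Same ordering as the real consumer: pause first, then start work.
        self.producer.pauseProducing()
        self.pending = batch

    def finish_processing(self):
        """Simulate the processing Deferred firing."""
        self.done.append(self.pending)
        self.pending = None
        self.producer.resumeProducing()


producer = ToyProducer([["a", "b"], ["c", "d"]])
consumer = ToyConsumer(producer)

producer.step(consumer)  # first batch delivered, producer is now paused
producer.step(consumer)  # no-op while paused
in_flight_while_paused = consumer.pending
consumer.finish_processing()
producer.step(consumer)  # second batch delivered
consumer.finish_processing()
```

In this model at most one batch is in flight at a time, which is the same backpressure the real consumer gets by calling `pauseProducing` and `resumeProducing` around its Deferred chain.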


Running it:

fileObj = open(file_path)

listEmailProducer = ListEmailProducer(fileObj, readSize=20)
listEmailConsumer = ListEmailConsumer()
listEmailConsumer.registerProducer(listEmailProducer)
listEmailConsumer.registerImportState(importState)
listEmailProducer.startProducing(listEmailConsumer)

Regarding "python - asynchronous file reading with Twisted", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/26314586/
