Problem Description
In my code, I have two hypothetical tasks: one gets urls from a generator and batch downloads them using Twisted's Cooperator, and the other takes a downloaded source and asynchronously parses it. I'm trying to encapsulate all of the fetch and parse tasks into a single Deferred object that calls back when all pages are downloaded and all sources are parsed.
I've come up with the following solution:
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []
    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
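(get_urls and parse aren't defined in the snippet above; purely as a hypothetical illustration, stand-ins along these lines would make it exercisable:)

# Hypothetical stubs, not part of the original question: a fixed URL list and a
# parser that returns an already-fired Deferred to stand in for async parsing.
def get_urls():
    return ["http://example.com/%d" % i for i in xrange(20)]

def parse(source):
    return defer.succeed(len(source))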
The code works, but I feel as if I'm either missing something blatantly obvious, or ignorant of a simple Twisted pattern that would make this a lot simpler. Is there a better way to return a single Deferred that calls back when all fetching and parsing is finished?
Recommended Answer
As currently written, it looks to me like this code will have a limited number of parallel downloads, but an unlimited number of parallel parse jobs. Is that intentional? I'm going to assume "no", since if your network happens to be fast and your parser happens to be slow, as the number of URLs approaches infinity, so does your memory usage :).
So here's a version that has limited parallelism, but carries out the parses in sequence with the downloads instead:
from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()
    return (defer.DeferredList([coop.coiterate(urls)
                                for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)
This works because, since parse (apparently) returns a Deferred, adding it as a callback to the one returned by getPage results in a Deferred that won't call the callback added by coiterate until parse has done its business.
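If that chaining rule is unfamiliar, here is a minimal, self-contained sketch (my own illustration, not part of the answer) showing that a callback which returns a Deferred suspends the outer chain until the inner Deferred fires:

from twisted.internet import defer, task

def report(result):
    print("ran only after inner fired: %s" % (result,))

def main(reactor):
    inner = defer.Deferred()

    def slow_step(value):
        # Returning a Deferred here suspends the outer chain until `inner` fires.
        return inner

    outer = defer.succeed("fetched")
    outer.addCallback(slow_step)
    outer.addCallback(report)

    # Fire the inner Deferred a moment later; only then does report() run.
    reactor.callLater(0.1, inner.callback, "parsed")
    return outer

task.react(main)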
Since you were asking about idiomatic Twisted code, I've also taken the liberty of modernizing it a bit (using task.react rather than running the reactor manually, inlining expressions to make things briefer, and so on).
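(For reference, and as a generic illustration rather than anything from the answer: task.react starts the reactor, calls your function with it, and stops the reactor once the Deferred your function returns has fired.)

from twisted.internet import task

def say_done():
    print("one second elapsed")

def main(reactor):
    # deferLater returns a Deferred that fires after the delay; react then exits.
    return task.deferLater(reactor, 1.0, say_done)

task.react(main)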
If you really do want to have more parallel parses than parallel fetches, something like this might work better, then:
from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            parse(r).addBoth(
                lambda result: parseSemaphore.release().addCallback(
                    lambda _: result
                )
            )
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()
    return (defer.DeferredList([coop.coiterate(urls)
                                for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                             [parseSemaphore.acquire()
                              for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)
You can see that parseWhenReady returns the Deferred returned from acquire, so parallel fetching will continue as soon as the parallel parse can begin, and therefore you won't continue fetching indiscriminately even when the parser is overloaded. However, parallelParse carefully abstains from returning the Deferred returned by parse or release, since fetching should be able to continue while those are proceeding.
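If DeferredSemaphore is unfamiliar: acquire returns a Deferred that fires once one of its N tokens is free, and release hands the token back. The answer drives acquire and release by hand; as a separate sketch of my own (using the convenience method run, which wraps the same acquire/call/release pattern), here at most two simulated jobs hold a token at a time:

from twisted.internet import defer, task

def main(reactor):
    sem = defer.DeferredSemaphore(2)

    def finished(n):
        print("job %d finished" % n)

    def job(n):
        print("job %d started" % n)
        # Simulate half a second of asynchronous work.
        return task.deferLater(reactor, 0.5, finished, n)

    # run() acquires a token, calls job, and releases when job's Deferred fires,
    # so no more than two jobs are ever "in flight" at the same time.
    return defer.gatherResults([sem.run(job, n) for n in xrange(5)])

task.react(main)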
(Please note that since your initial example was not runnable, I haven't tested either of these at all. Hopefully the intent is clear even if there are bugs.)