Twisted: waiting for subtasks to finish

Problem description

In my code, I have two hypothetical tasks: one gets urls from a generator and batch downloads them using Twisted's Cooperator, and the other takes a downloaded source and asynchronously parses it. I'm trying to encapsulate all of the fetch and parse tasks into a single Deferred object that calls back when all pages are downloaded and all sources are parsed.

I've come up with the following solution:

from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage


BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)

The code works, but I feel as if I'm either missing something blatantly obvious, or ignorant of a simple Twisted pattern that would make this a lot simpler. Is there a better way to return a single Deferred that calls back when all fetching and parsing is finished?

Recommended answer

As currently written, it looks to me like this code will have a limited number of parallel downloads, but an unlimited number of parallel parse jobs. Is that intentional? I'm going to assume "no", since if your network happens to be fast and your parser happens to be slow, as the number of URLs approaches infinity, so does your memory usage :).

So here's a version that keeps the limited parallelism but carries out each parse in sequence with its download instead:

from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)

This works because parse (apparently) returns a Deferred: adding it as a callback to the Deferred returned by getPage produces a Deferred that won't call the callback added by coiterate until parse has done its business.

Since you were asking about idiomatic Twisted code, I've also taken the liberty of modernizing it a bit (using task.react rather than running the reactor manually, inlining expressions to make things briefer and so on).

If you really do want to have more parallel parses than parallel fetches, something like this might work better then:

from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

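    def parseWhenReady(r):
        def releaseToken(result):
            # DeferredSemaphore.release() returns None, so release the
            # token and pass the parse result (or failure) through.
            parseSemaphore.release()
            return result

        def parallelParse(_):
            # Deliberately not returned: fetching should not wait for it.
            parse(r).addBoth(releaseToken)
        return parseSemaphore.acquire().addCallback(parallelParse)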

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                            [parseSemaphore.acquire()
                             for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)

You can see that parseWhenReady returns the Deferred returned from acquire, so parallel fetching will continue as soon as the parallel parse can begin, and therefore you won't continue fetching indiscriminately even when the parser is overloaded. However, parallelParse carefully abstains from returning the Deferred returned by parse, since fetching should be able to continue while the parse is proceeding.

(Please note that since your initial example was not runnable, I haven't tested either of these at all. Hopefully the intent is clear even if there are bugs.)

08-28 16:00