This article looks at the Google App Engine Pipeline API, and in particular at how to rewrite recursive task-queue tasks as pipelines; hopefully it makes a useful reference.

Problem description


I would like to rewrite some of my tasks as pipelines, mainly because I need a way of detecting when a task has finished, or of starting tasks in a specific order. My problem is that I'm not sure how to rewrite the recursive tasks as pipelines. By recursive I mean tasks that call themselves, like this:

class MyTask(webapp.RequestHandler):
    def post(self):
        cursor = self.request.get('cursor', None)

        [set cursor if not null]
        [fetch 100 entities from datastore]

        if len(result) >= 100:
            [ create the same task in the queue and pass the cursor ]

        [do actual work the task was created for]

Now I would really like to write it as a pipeline and do something similar to:

class DoSomeJob(pipeline.Pipeline):

   def run(self):
       with pipeline.InOrder():
          yield MyTask()
          yield MyOtherTask()
          yield DoSomeMoreWork(message2)

Any help with this one will be greatly appreciated. Thank you!

Solution

A basic pipeline just returns a value:

class MyFirstPipeline(pipeline.Pipeline):
    def run(self):
        return "Hello World"

The value has to be JSON serializable.
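As a quick illustration (this check is mine, not part of the original answer), a return value is fine if it survives a JSON round trip:

```python
import json

# The Pipeline API persists return values between tasks, so they must
# survive JSON serialization; a round-trip check catches unsafe values.
value = "Hello World"
assert json.loads(json.dumps(value)) == value

# A set, by contrast, is not JSON serializable:
try:
    json.dumps({1, 2, 3})
except TypeError:
    print("not JSON serializable")  # → not JSON serializable
```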

If you need to coordinate several pipelines you will need to use a generator pipeline and the yield statement.

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        yield MyFirstPipeline()

You can treat the yielding of a pipeline as if it returns a 'future'.

You can pass this future as the input arg to another pipeline:

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        result = yield MyFirstPipeline()
        yield MyOtherPipeline(result)

The Pipeline API will ensure that the run method of MyOtherPipeline is only called once the result future from MyFirstPipeline has been resolved to a real value.

You can't mix yield and return in the same method. If you are using yield, the value has to be a Pipeline instance. This can lead to a problem if you want to do this:

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield results

In this case the Pipeline API just sees a list in your final yield results line, so it doesn't know to resolve the futures inside it before returning, and you will get an error.

They're not documented but there is a library of utility pipelines included which can help here:
https://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py

So a version of the above which actually works would look like:

import pipeline
from pipeline import common

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield common.List(*results)

Now we're ok, we're yielding a pipeline instance and Pipeline API knows to resolve its future value properly. The source of the common.List pipeline is very simple:

class List(pipeline.Pipeline):
    """Returns a list with the supplied positional arguments."""

    def run(self, *args):
        return list(args)

...at the point that this pipeline's run method is called, the Pipeline API has already resolved all of the items in the list to actual values, which can be passed in as *args.

Anyway, back to your original example, you could do something like this:

class FetchEntities(pipeline.Pipeline):
    def run(self, cursor=None):
        # Cursor here is google.appengine.datastore.datastore_query.Cursor
        if cursor is not None:
            cursor = Cursor(urlsafe=cursor)

        # I think it's ok to pass None as the cursor here, haven't confirmed
        results, next_curs, more = MyModel.query().fetch_page(100,
                                                              start_cursor=cursor)

        # queue up a task for the next page of results immediately
        future_results = []
        if more:
            future_results = yield FetchEntities(next_curs.urlsafe())

        current_results = [ do some work on `results` ]

        # (assumes current_results and future_results are both lists)
        # this will have to wait for all of the recursive calls in
        # future_results to resolve before it can resolve itself:
        yield common.Extend(current_results, future_results)
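common.Extend isn't shown in the answer; judging by its name and by the common.List source above, it presumably concatenates its (already resolved) list arguments. A plain-Python sketch of that assumed behaviour, outside the Pipeline API:

```python
# Sketch of what common.Extend presumably does once the Pipeline API has
# resolved both arguments to real lists: concatenate them into one list.
def extend(*lists):
    combined = []
    for resolved_list in lists:
        combined.extend(resolved_list)
    return combined

print(extend(["a", "b"], ["c"]))  # → ['a', 'b', 'c']
```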

Further explanation

At the start I said we can treat result = yield MyPipeline() as if it returns a 'future'. This is not strictly true; obviously we are actually just yielding the instantiated pipeline. (Needless to say, our run method is now a generator function.)

The weird part of how Python's yield expressions work is that, despite what it looks like, the value that you yield goes somewhere outside the function (to the Pipeline API apparatus) rather than into your result var. The value of the result var on the left side of the expression is also pushed in from outside the function, by calling send on the generator (the generator being the run method you defined).
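The same two-way traffic can be demonstrated with a plain Python generator, with no Pipeline API involved (the strings here are purely illustrative):

```python
# The yielded value travels OUT to whoever drives the generator; the
# argument to send() travels IN and becomes the value of the yield
# expression on the left-hand side of the assignment.
def run():
    result = yield "pipeline instance"      # goes out to the driver
    yield "resolved input was: " + result   # result arrived via send()

gen = run()
first = next(gen)         # driver receives "pipeline instance"
second = gen.send("42")   # driver pushes the resolved value back in

print(first)   # → pipeline instance
print(second)  # → resolved input was: 42
```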

So by yielding an instantiated Pipeline, you are letting the Pipeline API take that instance and call its run method somewhere else at some other time (in fact it will be passed into a task queue as a class name and a set of args and kwargs and re-instantiated there... this is why your args and kwargs need to be JSON serializable too).

Meanwhile the Pipeline API sends a PipelineFuture object into your run generator and this is what appears in your result var. It seems a bit magical and counter-intuitive but this is how generators with yield expressions work.

It's taken quite a bit of head-scratching for me to work it out to this level and I welcome any clarifications or corrections on anything I got wrong.

