This post describes an approach to chaining multiple mapreduce jobs together in Google App Engine with Python. The question and recommended answer below may be a useful reference for anyone facing the same problem.

Problem description

Caveat: I am new to Google App Engine and Python, but I have managed to implement a PageRank algorithm in Google App Engine so far.

Next, I would like to chain together three mapreduce jobs in Google App Engine. However, I do not understand how to pass the key-value pairs from the first mapreduce job to the second (and subsequently from the second to the third) using BlobKeys. I attempted to follow the approach introduced on the following website:

http://mattfaus.com/2013/10/google-appengine-mapreduce-in-depth/

That post uses a BlobKeys class to pass the BlobKey from one mapreduce job to the next. I believe I am implementing the Python class incorrectly, as, when called, the "third_party" object is not recognized in the code below.

Might someone be able to point out where I am going wrong? Apologies for being unable to provide a locally driven test; this seems to be a bit of a beast!

Here is the class that I am attempting to use:

class BlobKeys(mapreduce.base_handler.PipelineBase):
  """Returns a dictionary with the supplied keyword arguments."""

  def run(self, keys):
    # Remove the key from a string in this format:
    # /blobstore/<key>
    return {
        "blob_keys": [k.split("/")[-1] for k in keys]
    }
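For reference, the transformation in run is plain string handling; with a made-up key it behaves like this (illustration only, the sample key is not from the original code):

keys = ["/blobstore/SAMPLE-BLOB-KEY"]  # made-up stand-in for a BlobstoreOutputWriter path
print({"blob_keys": [k.split("/")[-1] for k in keys]})
# prints: {'blob_keys': ['SAMPLE-BLOB-KEY']}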

And here is the Pipeline code that calls the class above (does not recognize the third_party object):

num_shards = 2
# First define the parent pipeline job
class RecommenderPipeline(base_handler.PipelineBase):
  """A pipeline to run Recommender demo.

  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """

  def run(self, filekey, blobkey, itr):
    #logging.debug("filename is %s" % filekey)
    output1 = yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_group_by_user_rating_map1",
        "main.recommender_count_ratings_user_freq_reduce1",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_keys": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=num_shards)


    # Code below takes output1 and feeds into second mapreduce job.
    # Pipeline library ensures that the second pipeline depends on first and 
    # does not launch until the first has resolved.
    output2 = (
    yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_pairwise_items_map2",
        "main.recommender_calc_similarity_reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params=(BlobKeys(output1)),  # see the BlobKeys class above
        # "blob_keys": [k.split("/")[-1] for k in keys]
        # "blob_keys": blobkey,  # did not work, since "generator pipelines cannot
        # directly access outputs of the child Pipelines that it yields"; this
        # would require the generator pipeline to create a temporary dict object
        # with the output of the first job, which is not allowed.
        # In addition, the string returned by BlobstoreOutputWriter is in the
        # format /blobstore/<key>, but BlobstoreLineInputReader expects only "<key>".
        # To solve these problems, use the BlobKeys class above.
        #},
        #mapper_params={
        #    #"blob_keys": [k.split("/")[-1] for k in output1]
        #    "blob_keys": blobkey.split("/")[-1],
        #},
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=num_shards))

    # Code below takes output2 and feeds into third mapreduce job.
    # Pipeline library ensures that the third pipeline depends on second and 
    # does not launch until the second has resolved.
    output3 = (
    yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_calculate_ranking_map3",
        "main.recommender_ranked_items_reduce3",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params=(BlobKeys(output2)),  # see the BlobKeys class above
        #mapper_params={
        #    "blob_keys": blobkey.split("/")[-1],
        #},
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=num_shards))
    yield StoreOutput("Recommender", filekey, output3, itr)  # stores a key to the results so you can look at them

I am wondering whether I am having more of an issue with using a Python class properly, or more of an issue with implementing this in GAE. I suspect a mixture of the two. Any help would be greatly appreciated! Thanks!

Recommended answer

A pipeline argument can be a concrete value or a PipelineFutures (in which case the pipeline will wait until the future's value is available). In your case you are passing a PipelineFutures as a parameter to a concrete value (BlobKeys). Instead, try yielding BlobKeys(output1) and passing its result as a parameter to the next pipeline, e.g.:

output1_1 = yield BlobKeys(output1)
output2 = yield mapreduce_pipeline.MapreducePipeline(..., mapper_params=output1_1, ...)
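Applied to the pipeline in the question, the fix might look like the sketch below. This is untested and only rearranges names that already appear above; the key change is that BlobKeys(output1) is yielded as its own child pipeline, and its future result is then handed to the second MapreducePipeline as mapper_params:

  def run(self, filekey, blobkey, itr):
    output1 = yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_group_by_user_rating_map1",
        "main.recommender_count_ratings_user_freq_reduce1",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={"blob_keys": blobkey},
        reducer_params={"mime_type": "text/plain"},
        shards=num_shards)

    # Yield BlobKeys as a child pipeline; output1_1 is a future that resolves
    # to {"blob_keys": [...]} once the first job has finished.
    output1_1 = yield BlobKeys(output1)

    # Pass the future itself; the pipeline library waits for it to resolve
    # before starting the second job.
    output2 = yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_pairwise_items_map2",
        "main.recommender_calc_similarity_reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params=output1_1,
        reducer_params={"mime_type": "text/plain"},
        shards=num_shards)

The same pattern (output2_1 = yield BlobKeys(output2)) would chain the second job to the third.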
