Problem description
Caveat: I am new to Google App Engine and Python, but I have managed to implement a PageRank algorithm in Google App Engine so far.
Next, I would like to chain three mapreduce jobs together in Google App Engine. However, I do not understand how to pass the key-value pairs from the first mapreduce job to the second (and subsequently from the second to the third) using BlobKeys. I attempted to follow the approach introduced on the following website:
http://mattfaus.com/2013/10/google-appengine-mapreduce-in-depth/
That post uses a BlobKeys class to pass the BlobKey from one mapreduce job to the next. I believe that I am implementing the Python class incorrectly, because when it is called, the "third_party" object is not recognized in the code below.
Might someone be able to point out where I am going wrong? Apologies for being unable to provide a locally-driven test; this seems to be a bit of a beast!
Here is the class that I am attempting to use:
class BlobKeys(mapreduce.base_handler.PipelineBase):
    """Returns a dictionary with the supplied keyword arguments."""

    def run(self, keys):
        # Remove the key from a string in this format:
        # /blobstore/<key>
        return {
            "blob_keys": [k.split("/")[-1] for k in keys]
        }
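For illustration, here is the same key-stripping logic as a standalone snippet; the blob key strings below are hypothetical stand-ins for what BlobstoreOutputWriter returns:

# A minimal sketch of the key-stripping done by BlobKeys.run().
# The paths are made-up examples of the writer's "/blobstore/<key>" output.
paths = ["/blobstore/AMIfv95example1", "/blobstore/AMIfv95example2"]
blob_keys = [p.split("/")[-1] for p in paths]
print(blob_keys)  # ['AMIfv95example1', 'AMIfv95example2']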
And here is the Pipeline code that calls the class above (this is where the "third_party" object is not recognized):
num_shards = 2

# First define the parent pipeline job
class RecommenderPipeline(base_handler.PipelineBase):
    """A pipeline to run the Recommender demo.

    Args:
        blobkey: blobkey to process as string. Should be a zip archive with
            text files inside.
    """

    def run(self, filekey, blobkey, itr):
        # logging.debug("filename is %s" % filekey)
        output1 = yield mapreduce_pipeline.MapreducePipeline(
            "recommender",
            "main.recommender_group_by_user_rating_map1",
            "main.recommender_count_ratings_user_freq_reduce1",
            "mapreduce.input_readers.BlobstoreLineInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_keys": blobkey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=num_shards)
        # The code below takes output1 and feeds it into the second mapreduce
        # job. The pipeline library ensures that the second pipeline depends
        # on the first and does not launch until the first has resolved.
        output2 = (
            yield mapreduce_pipeline.MapreducePipeline(
                "recommender",
                "main.recommender_pairwise_items_map2",
                "main.recommender_calc_similarity_reduce2",
                "mapreduce.input_readers.BlobstoreLineInputReader",
                "mapreduce.output_writers.BlobstoreOutputWriter",
                # Passing "blob_keys": blobkey directly did not work, since
                # "generator pipelines cannot directly access outputs of the
                # child Pipelines that it yields"; that would require the
                # generator pipeline to create a temporary dict object with
                # the output of the first job, which is not allowed. In
                # addition, the string returned by BlobstoreOutputWriter is in
                # the format /blobstore/<key>, but BlobstoreLineInputReader
                # expects only "<key>". The BlobKeys class above is meant to
                # solve both problems.
                mapper_params=(BlobKeys(output1)),
                reducer_params={
                    "mime_type": "text/plain",
                },
                shards=num_shards))
        # The code below takes output2 and feeds it into the third mapreduce
        # job. The pipeline library ensures that the third pipeline depends
        # on the second and does not launch until the second has resolved.
        output3 = (
            yield mapreduce_pipeline.MapreducePipeline(
                "recommender",
                "main.recommender_calculate_ranking_map3",
                "main.recommender_ranked_items_reduce3",
                "mapreduce.input_readers.BlobstoreLineInputReader",
                "mapreduce.output_writers.BlobstoreOutputWriter",
                mapper_params=(BlobKeys(output2)),  # see the BlobKeys class above
                reducer_params={
                    "mime_type": "text/plain",
                },
                shards=num_shards))
        # Stores the key to the results so you can look at them.
        yield StoreOutput("Recommender", filekey, output3, itr)
I am wondering whether I am having more of an issue with using a Python class properly, or more of an issue with implementing this in GAE. I suspect a mixture of the two. Any help would be greatly appreciated! Thanks!
Recommended answer
A pipeline argument can be a concrete value or a PipelineFutures (in which case the pipeline will wait until the future's value is available). In your case, you are passing a PipelineFutures as a parameter to a concrete value (BlobKeys). Instead, try yielding BlobKeys(output1) and passing its result as a parameter to the next pipeline, e.g.:

output1_1 = yield BlobKeys(output1)
output2 = yield mapreduce_pipeline.MapreducePipeline(..., mapper_params=output1_1, ...)
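Applied to the pipeline in the question, the fix might look like the sketch below. This is a minimal sketch, assuming the BlobKeys helper and the mapper/reducer names from the question's code; only the first two stages are shown, and the third stage chains in the same way:

    def run(self, filekey, blobkey, itr):
        output1 = yield mapreduce_pipeline.MapreducePipeline(
            "recommender",
            "main.recommender_group_by_user_rating_map1",
            "main.recommender_count_ratings_user_freq_reduce1",
            "mapreduce.input_readers.BlobstoreLineInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={"blob_keys": blobkey},
            reducer_params={"mime_type": "text/plain"},
            shards=num_shards)

        # Yield the helper pipeline so that its *resolved* dict, rather than
        # the PipelineFutures object itself, becomes the mapper_params of the
        # next stage.
        output1_1 = yield BlobKeys(output1)

        output2 = yield mapreduce_pipeline.MapreducePipeline(
            "recommender",
            "main.recommender_pairwise_items_map2",
            "main.recommender_calc_similarity_reduce2",
            "mapreduce.input_readers.BlobstoreLineInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params=output1_1,
            reducer_params={"mime_type": "text/plain"},
            shards=num_shards)

        # The third stage repeats the pattern:
        # output2_1 = yield BlobKeys(output2), then pass output2_1 as
        # mapper_params of the third MapreducePipeline.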