Problem description
I'm looking for a way to perform a GROUP BY operation on a datastore query using MapReduce. AFAIK, App Engine doesn't support GROUP BY in GQL itself, and an approach suggested by other developers is to use MapReduce.
I downloaded the source code and I'm studying the demo code. I tried to adapt it to my case, but without success. Below is how I tried to do it. Maybe everything I did is wrong, so I would be grateful if anyone could help me with this.
What I want to do is this: I have a bunch of contacts in the datastore, and each contact has a date. Many contacts are repeated with the same date. I simply want to group them, gathering identical contacts that share the same date.
E.g.:
Let's say I have these contacts:
- CONTACT_NAME: Foo1 | DATE: 01-10-2012
- CONTACT_NAME: Foo2 | DATE: 02-05-2012
- CONTACT_NAME: Foo1 | DATE: 01-10-2012
- CONTACT_NAME: Foo1 | DATE: 01-10-2012
- CONTACT_NAME: Foo2 | DATE: 02-05-2012
So after the MapReduce operation it would be something like this:
For GROUP BY functionality, I think the word count demo does the job.
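As a rough illustration of why word count maps onto GROUP BY, here is a minimal local sketch in plain Python. It does not use the App Engine MapReduce library. The names group_map, group_reduce and run_local are hypothetical, and run_local only imitates the shuffle step that a framework would do between map and reduce. Using (contact_name, date) as the grouping key and emitting a count per group are assumptions, not something stated in the question.

```python
from collections import defaultdict

# Sample data copied from the contact list in the question.
CONTACTS = [
    {"contact_name": "Foo1", "date": "01-10-2012"},
    {"contact_name": "Foo2", "date": "02-05-2012"},
    {"contact_name": "Foo1", "date": "01-10-2012"},
    {"contact_name": "Foo1", "date": "01-10-2012"},
    {"contact_name": "Foo2", "date": "02-05-2012"},
]


def group_map(contact):
    """Map step: emit one (key, 1) pair per contact, as word count emits (word, 1)."""
    # Assumption: contacts are "the same" when name and date both match.
    yield (contact["contact_name"], contact["date"]), 1


def group_reduce(key, values):
    """Reduce step: collapse all values that share a key into a single row."""
    name, date = key
    yield {"contact_name": name, "date": date, "count": sum(values)}


def run_local(records):
    """Hypothetical stand-in for the framework: map, shuffle by key, reduce."""
    shuffled = defaultdict(list)
    for record in records:
        for key, value in group_map(record):
            shuffled[key].append(value)
    rows = []
    for key in sorted(shuffled):
        rows.extend(group_reduce(key, shuffled[key]))
    return rows


grouped = run_local(CONTACTS)
for row in grouped:
    print(row)
```

The grouping key is the only part that differs from word count: the map step emits the fields to group by instead of a word, and the reduce step sees every value for that key in one call.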
EDIT
The only thing that is shown in the log is:
END EDIT
If I'm doing something wrong, or if this is the wrong approach to a GROUP BY with MapReduce, please help me understand how to do it properly.
Here is my code:
from Contacts import Contacts
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.api import mail
from google.appengine.ext.db import GqlQuery
from google.appengine.ext import db
from google.appengine.api import taskqueue
from google.appengine.api import users
from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce import shuffler
import simplejson, logging, re


class GetContactData(webapp.RequestHandler):
    # Get the calls based on the user id
    def get(self):
        contactId = self.request.get('contactId')
        query_contacts = Contact.all()
        query_contacts.filter('contact_id =', int(contactId))
        query_contacts.order('-timestamp_')
        contact_data = []
        if query_contacts != None:
            for contact in query_contacts:
                pipeline = WordCountPipeline(contact.date)
                pipeline.start()
                record = { "contact_id":contact.contact_id,
                           "contact_name":contact.contact_name,
                           "contact_number":contact.contact_number,
                           "timestamp":contact.timestamp_,
                           "current_time":contact.current_time_,
                           "type":contact.type_,
                           "current_date":contact.date }
                contact_data.append(record)
        self.response.headers['Content-Type'] = 'application/json'
        self.response.out.write(simplejson.dumps(contact_data))


class WordCountPipeline(base_handler.PipelineBase):
    """A pipeline to run Word count demo.

    Args:
      blobkey: blobkey to process as string. Should be a zip archive with
        text files inside.
    """

    def run(self, date):
        output = yield mapreduce_pipeline.MapreducePipeline(
            "word_count",
            "main.word_count_map",
            "main.word_count_reduce",
            "mapreduce.input_readers.DatastoreInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "date": date,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=16)
        yield StoreOutput("WordCount", output)


class StoreOutput(base_handler.PipelineBase):
    """A pipeline to store the result of the MapReduce job in the database.

    Args:
      mr_type: the type of mapreduce job run (e.g., WordCount, Index)
      encoded_key: the DB key corresponding to the metadata of this job
      output: the blobstore location where the output of the job is stored
    """

    def run(self, mr_type, output):
        logging.info(output)  # here I should append the grouped duration in JSON
I based my solution on the code @autumngard provided in this question, modified it to fit my purpose, and it worked.