我有一个MovieRatings数据库,其中包含userId
,movieId
,movie-categoryId
,reviewId
,movieRating
和reviewDate
列。
在我的映射器中,我要提取userId->(movieId,movieRating)
然后在 reducer 中,我想按用户将所有movieId,movieRating对分组。
这是我的尝试:
map 功能:
var map = function() {
var values={movieId : this.movieId, movieRating : this.movieRating};
emit(this.userId, values);}
简化函数:
var reduce = function(key,values) {
var ratings = [];
values.forEach(function(V){
var temp = {movieId : V.movieId, movieRating : V.movieRating};
Array.prototype.push.apply(ratings, temp);
});
return {userId : key, ratings : ratings };
}
运行MapReduce :
db.ratings.mapReduce(map, reduce, { out: "map_reduce_step1" })
输出:
db.map_reduce_step1.find()
{ "_id" : 1, "value" : { "userId" : 1, "ratings" : [ ] } }
{ "_id" : 2, "value" : { "userId" : 2, "ratings" : [ ] } }
{ "_id" : 3, "value" : { "userId" : 3, "ratings" : [ ] } }
{ "_id" : 4, "value" : { "userId" : 4, "ratings" : [ ] } }
{ "_id" : 5, "value" : { "userId" : 5, "ratings" : [ ] } }
{ "_id" : 6, "value" : { "userId" : 6, "ratings" : [ ] } }
{ "_id" : 7, "value" : { "userId" : 7, "ratings" : [ ] } }
{ "_id" : 8, "value" : { "userId" : 8, "ratings" : [ ] } }
{ "_id" : 9, "value" : { "userId" : 9, "ratings" : [ ] } }
{ "_id" : 10, "value" : { "userId" : 10, "ratings" : [ ] } }
{ "_id" : 11, "value" : { "userId" : 11, "ratings" : [ ] } }
{ "_id" : 12, "value" : { "userId" : 12, "ratings" : [ ] } }
{ "_id" : 13, "value" : { "userId" : 13, "ratings" : [ ] } }
{ "_id" : 14, "value" : { "userId" : 14, "ratings" : [ ] } }
{ "_id" : 15, "value" : { "movieId" : 1, "movieRating" : 3 } }
{ "_id" : 16, "value" : { "userId" : 16, "ratings" : [ ] } }
我没有得到预期的输出。实际上,此输出对我来说毫无意义!
这是我在reducer中试图做的python等效项(以防上面没有弄清reducer的目的):
def reducer_ratings_by_user(self, user_id, itemRatings):
#Group (item, rating) pairs by userID
ratings = []
for movieID, rating in itemRatings:
ratings.append((movieID, rating))
yield user_id, ratings
编辑1 @chridam
这是我在这里真正想要做的概述:
Movies.csv 文件如下所示:
userId,movieId,movie-categoryId,reviewId,movieRating,reviewDate
1,1,1,1,5,7 / 12/2000
2,1,1,2,5,7 / 12/2000
3,1,1,3,5,7 / 12/2000
4,1,1,4,4,7 / 12/2000
5,1,1,5,4,7 / 12/2000
6,1,1,6,5,7 / 15/2000
1,2,1,7,4,7 / 25/2000
8,1,1,8,4,7 / 28/2000
9,1,1,9,3,8 / 3/2000
...
...
我将其导入mongoDB:
mongoimport --db SomeName --collection ratings --type csv --headerline --file Movies.csv
然后我尝试应用上面定义的map-reduce函数。之后,我将通过执行以下操作将其导出回csv:
mongoexport --db SomeName --collection map_reduce_step1 --csv --out movie_ratings_out.csv --fields ...
此
movie_ratings_out.csv
文件应类似于:userId,movieId1,rating1,movieId2,rating2,...
1,1,5,2,4
...
...
因此,每一行包含每个用户的所有(电影,评分)对。
编辑2
样品:
db.ratings.find().pretty()
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a40f"),
"userId" : 4,
"movieId" : 1,
"movie-categoryId" : 1,
"reviewId" : 4,
"movieRating" : 4,
"reviewDate" : "7/12/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a410"),
"userId" : 5,
"movieId" : 1,
"movie-categoryId" : 1,
"reviewId" : 5,
"movieRating" : 4,
"reviewDate" : "7/12/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a411"),
"userId" : 4,
"movieId" : 2,
"movie-categoryId" : 1,
"reviewId" : 6,
"movieRating" : 5,
"reviewDate" : "7/15/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a412"),
"userId" : 4,
"movieId" : 3,
"movie-categoryId" : 1,
"reviewId" : 2,
"movieRating" : 5,
"reviewDate" : "7/12/2000"
}
...
然后在MapReduce之后,预期的输出json是:
{
"_id" : ....,
"userId" : 4,
"movieList" : [ {
"movieId" : 2
"movieRating" : 5
},
{
"movieId" : 1
"movieRating" : 4
}
...
]
}
{
"_id" : ....,
"userId" : 5,
"movieList" : ...
}
...
最佳答案
您只需要运行一个聚合管道,该管道由对文档进行汇总的 $group
阶段组成。这通过指定的标识符表达式对输入文档进行分组,并应用累加器表达式。 $group
管道运算符类似于SQL的GROUP BY
子句。在SQL中,除非使用任何聚合函数,否则不能使用GROUP BY
。同样,您还必须在MongoDB中使用聚合函数。您可以在此处阅读有关聚合功能的更多信息。
您需要创建movieList
数组的累加器运算符是 $push
。
$group
阶段之后的另一个管道是 $project
运算符,该运算符用于选择或重定流中的每个文档,包括,排除或重命名字段,注入(inject)计算字段,使用数学表达式创建子文档字段,日期,字符串和/或逻辑(比较, bool(boolean) 值,控制)表达式-与使用SQL SELECT
子句类似。
最后一步是 $out
管道,该管道将聚合管道的结果文档写入集合。它必须是管道中的最后阶段。
因此,您可以运行以下聚合操作:
db.ratings.aggregate([
{
"$group": {
"_id": "$userId",
"movieList": {
"$push": {
"movieId": "$movieId",
"movieRating": "$movieRating",
}
}
}
},
{
"$project": {
"_id": 0, "userId": "$_id", "movieList": 1
}
},
{ "$out": "movie_ratings_out" }
])
使用上面的示例5文档,如果您查询
db.getCollection('movie_ratings_out').find({})
,示例输出将产生:/* 1 */
{
"_id" : ObjectId("57f52636b9c3ea346ab1d399"),
"movieList" : [
{
"movieId" : 1.0,
"movieRating" : 4.0
}
],
"userId" : 5.0
}
/* 2 */
{
"_id" : ObjectId("57f52636b9c3ea346ab1d39a"),
"movieList" : [
{
"movieId" : 1.0,
"movieRating" : 4.0
},
{
"movieId" : 2.0,
"movieRating" : 5.0
},
{
"movieId" : 3.0,
"movieRating" : 5.0
}
],
"userId" : 4.0
}
关于mongodb - MongoDB MapReduce-如何在reduce函数中填充数组?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/39868058/