我有一个MovieRatings数据库,其中包含userIdmovieIdmovie-categoryIdreviewIdmovieRatingreviewDate列。

在我的映射器中,我要提取userId->(movieId,movieRating)

然后在 reducer 中,我想按用户将所有movieId,movieRating对分组。

这是我的尝试:

map 功能:

var map = function() {
    var values={movieId : this.movieId, movieRating : this.movieRating};
    emit(this.userId, values);}

简化函数:
var reduce = function(key,values) {
    var ratings = [];
    values.forEach(function(V){
        var temp = {movieId : V.movieId, movieRating : V.movieRating};
        Array.prototype.push.apply(ratings, temp);
        });
    return {userId : key, ratings : ratings };
}

运行MapReduce :
db.ratings.mapReduce(map, reduce, { out: "map_reduce_step1" })

输出:db.map_reduce_step1.find()
{ "_id" : 1, "value" : { "userId" : 1, "ratings" : [ ] } }
{ "_id" : 2, "value" : { "userId" : 2, "ratings" : [ ] } }
{ "_id" : 3, "value" : { "userId" : 3, "ratings" : [ ] } }
{ "_id" : 4, "value" : { "userId" : 4, "ratings" : [ ] } }
{ "_id" : 5, "value" : { "userId" : 5, "ratings" : [ ] } }
{ "_id" : 6, "value" : { "userId" : 6, "ratings" : [ ] } }
{ "_id" : 7, "value" : { "userId" : 7, "ratings" : [ ] } }
{ "_id" : 8, "value" : { "userId" : 8, "ratings" : [ ] } }
{ "_id" : 9, "value" : { "userId" : 9, "ratings" : [ ] } }
{ "_id" : 10, "value" : { "userId" : 10, "ratings" : [ ] } }
{ "_id" : 11, "value" : { "userId" : 11, "ratings" : [ ] } }
{ "_id" : 12, "value" : { "userId" : 12, "ratings" : [ ] } }
{ "_id" : 13, "value" : { "userId" : 13, "ratings" : [ ] } }
{ "_id" : 14, "value" : { "userId" : 14, "ratings" : [ ] } }
{ "_id" : 15, "value" : { "movieId" : 1, "movieRating" : 3 } }
{ "_id" : 16, "value" : { "userId" : 16, "ratings" : [ ] } }

我没有得到预期的输出。实际上,此输出对我来说毫无意义!

这是我在reducer中试图做的python等效项(以防上面没有弄清reducer的目的):
def reducer_ratings_by_user(self, user_id, itemRatings):
        #Group (item, rating) pairs by userID
        ratings = []
        for movieID, rating in itemRatings:
            ratings.append((movieID, rating))
        yield user_id, ratings

编辑1 @chridam

这是我在这里真正想要做的概述:

Movies.csv 文件如下所示:

userId,movieId,movie-categoryId,reviewId,movieRating,reviewDate
1,1,1,1,5,7 / 12/2000
2,1,1,2,5,7 / 12/2000
3,1,1,3,5,7 / 12/2000
4,1,1,4,4,7 / 12/2000
5,1,1,5,4,7 / 12/2000
6,1,1,6,5,7 / 15/2000
1,2,1,7,4,7 / 25/2000
8,1,1,8,4,7 / 28/2000
9,1,1,9,3,8 / 3/2000
...
...

我将其导入mongoDB:
mongoimport --db SomeName --collection ratings --type csv --headerline --file Movies.csv

然后我尝试应用上面定义的map-reduce函数。之后,我将通过执行以下操作将其导出回csv:
mongoexport --db SomeName --collection map_reduce_step1 --csv --out movie_ratings_out.csv --fields ...

movie_ratings_out.csv文件应类似于:

userId,movieId1,rating1,movieId2,rating2,...
1,1,5,2,4
...
...

因此,每一行包含每个用户的所有(电影,评分)对。

编辑2

样品:
db.ratings.find().pretty()
{
    "_id" : ObjectId("57f4a0dd9cb74fc4d344a40f"),
    "userId" : 4,
    "movieId" : 1,
    "movie-categoryId" : 1,
    "reviewId" : 4,
    "movieRating" : 4,
    "reviewDate" : "7/12/2000"
}
{
    "_id" : ObjectId("57f4a0dd9cb74fc4d344a410"),
    "userId" : 5,
    "movieId" : 1,
    "movie-categoryId" : 1,
    "reviewId" : 5,
    "movieRating" : 4,
    "reviewDate" : "7/12/2000"
}
{
    "_id" : ObjectId("57f4a0dd9cb74fc4d344a411"),
    "userId" : 4,
    "movieId" : 2,
    "movie-categoryId" : 1,
    "reviewId" : 6,
    "movieRating" : 5,
    "reviewDate" : "7/15/2000"
}
{
    "_id" : ObjectId("57f4a0dd9cb74fc4d344a412"),
    "userId" : 4,
    "movieId" : 3,
    "movie-categoryId" : 1,
    "reviewId" : 2,
    "movieRating" : 5,
    "reviewDate" : "7/12/2000"
}
...

然后在MapReduce之后,预期的输出json是:
{
    "_id" : ....,
    "userId" : 4,
    "movieList" : [ {
           "movieId" : 2
           "movieRating" : 5
         },
         {
           "movieId" : 1
           "movieRating" : 4
         }
         ...
        ]
   }
   {
    "_id" : ....,
    "userId" : 5,
    "movieList" : ...
   }
   ...

最佳答案

您只需要运行一个聚合管道,该管道由对文档进行汇总的 $group 阶段组成。这通过指定的标识符表达式对输入文档进行分组,并应用累加器表达式。 $group 管道运算符类似于SQL的GROUP BY子句。在SQL中,除非使用任何聚合函数,否则不能使用GROUP BY。同样,您还必须在MongoDB中使用聚合函数。您可以在此处阅读有关聚合功能的更多信息。

您需要创建movieList数组的累加器运算符是 $push

$group 阶段之后的另一个管道是 $project 运算符,该运算符用于选择或重定流中的每个文档,包括,排除或重命名字段,注入(inject)计算字段,使用数学表达式创建子文档字段,日期,字符串和/或逻辑(比较, bool(boolean) 值,控制)表达式-与使用SQL SELECT子句类似。

最后一步是 $out 管道,该管道将聚合管道的结果文档写入集合。它必须是管道中的最后阶段。

因此,您可以运行以下聚合操作:

db.ratings.aggregate([
    {
        "$group": {
            "_id": "$userId",
            "movieList": {
                "$push": {
                    "movieId": "$movieId",
                    "movieRating": "$movieRating",
                }
            }
        }
    },
    {
        "$project": {
            "_id": 0, "userId": "$_id", "movieList": 1
        }
    },
    { "$out": "movie_ratings_out" }
])

使用上面的示例5文档,如果您查询db.getCollection('movie_ratings_out').find({}),示例输出将产生:
/* 1 */
{
    "_id" : ObjectId("57f52636b9c3ea346ab1d399"),
    "movieList" : [
        {
            "movieId" : 1.0,
            "movieRating" : 4.0
        }
    ],
    "userId" : 5.0
}

/* 2 */
{
    "_id" : ObjectId("57f52636b9c3ea346ab1d39a"),
    "movieList" : [
        {
            "movieId" : 1.0,
            "movieRating" : 4.0
        },
        {
            "movieId" : 2.0,
            "movieRating" : 5.0
        },
        {
            "movieId" : 3.0,
            "movieRating" : 5.0
        }
    ],
    "userId" : 4.0
}

关于mongodb - MongoDB MapReduce-如何在reduce函数中填充数组?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/39868058/

10-16 01:47