我正在尝试使用mongo hadoop(https://github.com/mongodb/mongo-hadoop)库获取有关mongo集合的一些聚合操作。我使用mongo.input.query配置输入查询,该配置作为输入发送到newApiHadoopRDD。
Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat";
mongodbConfig.set("mongo.input.uri","mongodb://"+mongodbHost+"/"+database.collection);
mongodbConfig.set("mongo.input.query",query);
JavaPairRDD audienceRDD = sc.newAPIHadoopRDD(mongodbConfig, MongoInputFormat.class,Object.class, BSONObject.class);
audienceRDD.foreach(e -> System.out.println("data: "+e.toString()));
query={ "aggregate" : "__collection__" , "pipeline" : [
{ "$match" : { "date" : { "$gte" : { "$date" : "2016-08-09T00:00:00.000Z"} , "$lte" : { "$date" : "2016-08-11T00:00:00.000Z"}}}} ,
{ "$unwind" : "$segments"} ,
{ "$group" : { "_id" : "$segments" , "audienceSize" : { "$sum" : "$count"}}}]}, sort={ }, fields={ }, limit=0, notimeout=false}
如果我使用像find这样的普通查询,则操作成功。但是,当我尝试使用groupBy时,我并没有在RDD上获得任何记录。有人可以建议一种使用mongo hadoop连接器对mongo集合进行聚合操作的方法。
最佳答案
无论如何,由于运行聚合查询的16 MB限制,我最终用记录创建了一个临时集合,然后对该临时集合进行查询。将响应存储在RDD上,完成所需的操作后,便删除了临时集合。
那就是说,我认为增加使用mongo.input.query进行聚合查询的功能将是对这个漂亮的连接器库的不错补充。
关于mongodb - Mongo Hadoop Connecter支持的聚合?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38886068/