我有一个带有VideoID-UserID的键值对的dstream,按VideoID计算不同的UserID组的一种好的做法是什么?
// VideoID,UserID
foo,1
foo,2
bar,1
bar,2
foo,1
bar,2
如上所述,我想通过随时删除多余的
foo,1
和bar,2
来获取VideoID-CountUserID,因此结果应为:foo: 2
bar: 2
换句话说,我想在内存中保存一个大状态数据集。当一批新的dstream到达时,将其与数据集进行比较以计算每个视频的不同用户。
怎么做?
我正在使用Spark 1.6,但是可以接受更高版本的答案。 Python代码(如果可能)。
最佳答案
为了获得按视频ID分组的不同用户ID计数,请考虑使用aggregateByKey。抱歉,这是Scala,所以您必须翻译。
val rdd = sc.textFile("your_file.txt")
val initialSet = Set.empty[Int]
val addToSet = (s: Set[Int], v:Int) => s + v
val mergeSets = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2
val distinctValSets = rdd.aggregateByKey(initialSet)(addToSet, mergeSets)
val distinctValCountd = rdd.map({case(k,s) => (k,s.size)})
初始集合是聚合对象的初始值,addToSet和mergeSets指定如何向集合中添加值以及如何基于键合并不同的集合。这应该为您提供与每个视频相关的不同用户数量,并且比reduceByKey和groupByKey便宜(在空间上)。
关于python - Spark Streaming-计算状态中的不同元素,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42645253/