Question
Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documentation on this, but couldn't understand the exact differences. An explanation with examples would be great.
Answer
groupByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupByKey()
  .map { case (word, counts) => (word, counts.sum) }
groupByKey can cause out-of-disk problems, because all the data is sent over the network and collected on the reducer workers.
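For contrast, a minimal sketch of what groupByKey actually hands back: one (key, Iterable[value]) pair per key, with every single value shuffled across the network before anything is summed. The SparkContext sc and the toy data below are assumptions for illustration only.
// Assumed: a SparkContext named sc and a tiny sample data set.
val pairs = sc.parallelize(Seq(("foo", 1), ("foo", 1), ("bar", 1)))
// groupByKey ships every (key, value) pair; nothing is pre-combined on the map side.
val grouped = pairs.groupByKey()        // RDD[(String, Iterable[Int])]
grouped.mapValues(_.sum).collect()      // e.g. Array((foo,2), (bar,1))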
reduceByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)
Data is combined at each partition, so only one output per key per partition is sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
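A minimal sketch under the same assumptions (a SparkContext named sc, toy data): the single (Int, Int) => Int function is applied both within each partition (the map-side combine) and again across partitions, which is why the result type must equal the value type.
// Two partitions, so the per-partition (map-side) combine actually happens.
val pairs = sc.parallelize(Seq(("foo", 1), ("foo", 1), ("bar", 1)), 2)
val counts = pairs.reduceByKey(_ + _)   // RDD[(String, Int)]
counts.collect()                        // e.g. Array((bar,1), (foo,2))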
aggregateByKey:
Same as reduceByKey, except that it also takes an initial (zero) value.
It takes 3 parameters as input:
i. initial value
ii. sequence op logic (applied within a partition)
iii. combiner logic (applied across partitions)
Example:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
// sequence op: applied within a partition, counts one more value for the key
val addToCounts = (n: Int, v: String) => n + 1
// combiner: applied across partitions, adds up the per-partition counts
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Output: Aggregate By Key sum results:
bar -> 3
foo -> 5
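One point the counting example cannot show is that the zero value lets aggregateByKey return a type different from the value type. A small variation, reusing the kv pair RDD from the example above (this variation is my own illustration, not part of the original answer):
// Collect the distinct values per key: the values are String, but the result is Set[String].
val uniqueByKey = kv.aggregateByKey(Set.empty[String])(
  (set, v) => set + v,    // sequence op: fold one value into the per-partition set
  (s1, s2) => s1 ++ s2    // combiner: merge the per-partition sets
)
uniqueByKey.collect().foreach(println)
// e.g. (bar,Set(C, D)) and (foo,Set(A, B))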
combineByKey:
It takes 3 parameters as input:
- Initial value: unlike aggregateByKey, you need not always pass a constant; you can pass a function (the createCombiner) that turns the first value seen for a key into the initial accumulator.
- Merging function (combines a new value into the accumulator within a partition)
- Combine function (merges accumulators across partitions)
Example:
val result = rdd.combineByKey(
  (v) => (v, 1),                                       // createCombiner: first value -> (sum, count)
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),    // mergeValue: fold a value in within a partition
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  // mergeCombiners: across partitions
).map { case (k, v) => (k, v._1 / v._2.toDouble) }     // average value per key
result.collect.foreach(println)
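The rdd used above is never defined in the answer. A minimal sketch of an input it would type-check against, an RDD of (String, Int) pairs, together with the per-key averages the pipeline computes (the sample data is an assumption for illustration):
// Hypothetical input: mergeValue above does acc._1 + v, so the values must be Int.
val rdd = sc.parallelize(Seq(("a", 3), ("a", 1), ("b", 4)))
// Running the combineByKey pipeline above on this rdd prints the average per key:
// (a,2.0)
// (b,4.0)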
reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey.