A custom accumulator can be implemented by extending AccumulatorV2.
The official example can be found at: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
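For comparison, the built-in LongAccumulator described in that guide tracks only a single numeric value. A minimal sketch, assuming sc is an existing SparkContext:

val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value) // prints 10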
Below is an example I wrote that counts the number of each card type.
package com.shuai7boy.myscalacode

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

case class Card(var card1Count: Int, var card2Count: Int)

class CalcCardCount extends AccumulatorV2[Card, Card] {
  var result = new Card(0, 0)

  /**
   * Zero check; the values compared here must match those set in reset()
   *
   * @return
   */
  override def isZero: Boolean = {
    result.card1Count == 0 && result.card2Count == 0
  }

  /**
   * Create a new copy of this accumulator
   *
   * @return
   */
  override def copy(): AccumulatorV2[Card, Card] = {
    val newCalcCardCount = new CalcCardCount()
    // Copy the current values so the new accumulator is independent of this one
    newCalcCardCount.result = Card(result.card1Count, result.card2Count)
    newCalcCardCount
  }

  /**
   * Reset the value of each partition
   */
  override def reset(): Unit = {
    result.card1Count = 0
    result.card2Count = 0
  }

  /**
   * Each partition accumulates its own values
   *
   * @param v
   */
  override def add(v: Card): Unit = {
    result.card1Count += v.card1Count
    result.card2Count += v.card2Count
  }

  /**
   * Merge the partition values to get the total
   *
   * @param other
   */
  override def merge(other: AccumulatorV2[Card, Card]): Unit = other match {
    case o: CalcCardCount =>
      result.card1Count += o.result.card1Count
      result.card2Count += o.result.card2Count
  }

  // Return the result
  override def value: Card = result
}

object CardCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("calcCardCountDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val cc = new CalcCardCount
    sc.register(cc)
    val cardList = sc.parallelize(List[String]("card1 1", "card1 3", "card1 7", "card2 5", "card2 2"), 2)
    val cardMapRDD = cardList.map(card => {
      var cardInfo = new Card(0, 0)
      card.split(" ")(0) match {
        case "card1" => cardInfo = Card(card.split(" ")(1).toInt, 0)
        case "card2" => cardInfo = Card(0, card.split(" ")(1).toInt)
        case _ => Card(0, 0)
      }
      cc.add(cardInfo)
    })
    cardMapRDD.count() // Run an action to trigger the accumulation above
    println("Total for card1: " + cc.result.card1Count + ", total for card2: " + cc.result.card2Count)
  }
}
The printed result is:
Total for card1: 11, total for card2: 7
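One caveat: cc.add is called inside map, which is a transformation, and the Spark programming guide notes that accumulator updates made inside transformations may be applied more than once if a task is re-executed; only updates made inside actions are guaranteed to be applied exactly once. A hedged variant that reuses cardList and cc from the example above and accumulates inside foreach instead:

cardList.foreach { card =>
  val fields = card.split(" ")
  val cardInfo = fields(0) match {
    case "card1" => Card(fields(1).toInt, 0)
    case "card2" => Card(0, fields(1).toInt)
    case _       => Card(0, 0)
  }
  cc.add(cardInfo) // foreach is an action, so no separate count() call is needed
}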
With the code above, two values can be counted at the same time, and the accumulator can of course be extended if you need more. The built-in accumulators only track a single value each.
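If the number of card types is not fixed in advance, one possible extension (a rough sketch, not from the original post) is to accumulate a mutable Map keyed by card name instead of a fixed case class:

import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

// Hypothetical generalization: counts any number of card types by name.
class CardMapAccumulator extends AccumulatorV2[(String, Int), mutable.Map[String, Int]] {
  private val counts = mutable.Map[String, Int]()

  override def isZero: Boolean = counts.isEmpty

  override def copy(): AccumulatorV2[(String, Int), mutable.Map[String, Int]] = {
    val acc = new CardMapAccumulator
    counts.foreach { case (k, v) => acc.counts(k) = v }
    acc
  }

  override def reset(): Unit = counts.clear()

  override def add(v: (String, Int)): Unit =
    counts(v._1) = counts.getOrElse(v._1, 0) + v._2

  override def merge(other: AccumulatorV2[(String, Int), mutable.Map[String, Int]]): Unit =
    other.value.foreach { case (k, v) => counts(k) = counts.getOrElse(k, 0) + v }

  override def value: mutable.Map[String, Int] = counts
}

Registration works the same way, e.g. sc.register(new CardMapAccumulator, "cardCounts"), and each record is then added as a (name, count) pair such as acc.add(("card1", 1)).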