1.创建一个累加变量
public <T> Accumulator<T> accumulator(T initialValue,
AccumulatorParam<T> param)
Create an Accumulator variable of a given type, which tasks can "add" values to using the += method. Only the driver can access the accumulator's value.
Parameters:
initialValue - (undocumented)
param - (undocumented)
Returns:
(undocumented)
使用SparkContext的如上方法,可以创建一个累加变量。默认情况下,这里的T是int或者double,因此如果想要创建T为long的累加变量是不行的。
2.AccumulatorParam介绍
概念:
initialValue:Accumulator的初始值,也就是调用SparkContext.accululator时传递的initialValue
zeroValue:AccumulatorParam的初始值,也就是zero方法的返回值。
假设样本数据集合为simple={1,2,3,4}
执行顺序:
1.调用zero(initialValue),返回zeroValue
2.调用addAccumulator(zeroValue,1) 返回v1.
调用addAccumulator(v1,2)返回v2.
调用addAccumulator(v2,3)返回v3.
调用addAccumulator(v3,4)返回v4.
3.调用addInPlace(initialValue,v4)
因此最终结果是zeroValue+1+2+3+4+initialValue.
3.实现AccumulatorParam
import org.apache.spark.AccumulatorParam; public class LongAccumulator implements AccumulatorParam<Long>{ //执行完addAccumulator方法之后,最后会执行这个方法,将value加到init。
@Override
public Long addInPlace(Long init, Long value) {
// TODO Auto-generated method stub
// return arg0+arg1;
System.out.println(init+":"+value);
return init+value;
} /*
* init 就是SparkContext.accumulator(init)参数init。
* 这里的返回值是累计的起始值。注意哦,他可以不等于init。
*
* 如果init=10,zero(init)=0,那么运算过程如下:
* v1:=0+step
* v1:=v1+step
* ...
* ...
* 最后v1:=v1+init
**/
@Override
public Long zero(Long init) {
// TODO Auto-generated method stub
System.out.println(init);
return 0l;
} @Override
public Long addAccumulator(Long value, Long step) {
// TODO Auto-generated method stub
System.out.println(value+","+step);
return value+step;
} }
接下来使用它。
import java.util.Arrays;
import java.util.List; import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction; public class AccumulatorDemo {
public static void main(String[]args){
SparkConf conf=new SparkConf().setAppName("AccumulatorDemo").setMaster("local");
JavaSparkContext sc=new JavaSparkContext(conf); Accumulator<Long> acc=sc.accumulator(0L,new LongAccumulator()); List<Long> seq=Arrays.asList(1L,2L,3L,4L);
JavaRDD<Long> rdd=sc.parallelize(seq); rdd.foreach(new VoidFunction<Long>(){ @Override
public void call(Long arg0) throws Exception {
acc.add(arg0);
} }); System.out.println(acc.value());;
}