我想做这样的事情。

val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....


上面的代码中the_accumulator_ojbect的位置应该是什么?写ac就好了吗?

另外,在功能上

def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
    .....
}


上面的函数中TypeOfAccumulator的位置应该是什么?

最佳答案

有关火花蓄能器的其他信息,请参见here

根据有关累加器创建的scala-docs:


/ ** *创建一个[[org.apache.spark.Accumulator]]变量
给定类型,并在Spark UI中显示*。任务可以
使用+=方法将值“累加”到累加器。只有 *
驱动程序可以访问累加器的value。 * /


默认的累加器类型为int。您可以设置自己的类型,但是需要正确实现+=方法以将值添加到自己的累加器类型中:

val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")


您的主要代码片段将类似于:

val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
System.out.println("My accumulator value is: " + ac.value)


someFunction方法的植入方式如下:

def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
    ...
    ac += 1
    ...
}

10-06 06:51