问题描述
我试图在Scala shell(驱动程序)中定义类型为String的累加器变量,但我不断收到以下错误:-
I am trying to define an accumulator variable of type String in Scala shell (driver) but I keep getting the following error:-
scala> val myacc = sc.accumulator("Test")
<console>:21: error: could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[String]
val myacc = sc.accumulator("Test")
^
对于Int或Double类型的累加器,这似乎没有问题.
This seems to be no issue for Int or Double type of accumulator.
谢谢
推荐答案
这是因为默认情况下,Spark仅提供类型为Long
,Double
和Float
的累加器.如果您还需要其他内容,则必须扩展AccumulatorParam
.
That's because Spark by default provides only accumulators of type Long
, Double
and Float
. If you need something else you have to extend AccumulatorParam
.
import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {
def zero(initialValue: String): String = {
""
}
def addInPlace(s1: String, s2: String): String = {
s"$s1 $s2"
}
}
val stringAccum = sc.accumulator("")(StringAccumulatorParam)
val rdd = sc.parallelize("foo" :: "bar" :: Nil, 2)
rdd.foreach(s => stringAccum += s)
stringAccum.value
注意:
通常,应避免将累加器用于数据可能随时间显着增长的任务.它的行为将类似于group
和collect
,并且在最坏的情况下,由于缺少资源而可能会失败.累加器主要用于简单的诊断任务,例如跟踪基本统计信息.
In general you should avoid using accumulators for tasks where data may grow significantly over time. Its behavior will similar to group
an collect
and in the worst case scenario can fail due to lack of resources. Accumulators are useful mostly for simple diagnostics tasks like keeping track of basic statistics.
这篇关于无法声明字符串类型累加器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!