Problem description
While exploring Spark accumulators, I tried to understand and showcase the difference between an accumulator and a regular variable in Spark. But the output does not match my expectation: both the accumulator and the counter have the same value at the end of the program, and I am able to read the accumulator within the transformation function (per the docs, only the driver can read an accumulator). Am I doing something wrong? Is my understanding correct?
import org.apache.spark.sql.SparkSession

object Accmulators extends App {

  val spark = SparkSession.builder().appName("Accmulators").master("local[3]").getOrCreate()

  // An accumulator registered with the driver vs. a plain local variable
  val cntAccum = spark.sparkContext.longAccumulator("counterAccum")
  val file = spark.sparkContext.textFile("resources/InputWithBlank")
  var counter = 0

  def countBlank(line: String): Array[String] = {
    val trimmed = line.trim
    if (trimmed == "") {
      cntAccum.add(1)
      cntAccum.value // reading the accumulator inside the transformation
      counter += 1
    }
    line.split(" ")
  }

  file.flatMap(line => countBlank(line)).collect()

  println(cntAccum.value)
  println(counter)
}
The input file has text with 9 empty lines in between:
4) Aggregations and Joins
5) Spark SQL
6) Spark App tuning
Output:
Both counter and cntAccum give the same result.
counter is a local variable, and it only appears to work because your program runs with .master("local[3]"), where everything executes in the driver's JVM. Imagine you were running in yarn mode instead: the logic would then run in a distributed way, and your local variable would not be updated on the driver (each executor increments its own copy of it), but the accumulator would be, since it is a distributed shared variable. Suppose you have 2 executors running the program: when one executor adds to the accumulator, the update is sent back to the driver and merged, so the latest value reflects both. In this case your cntAccum is capable of getting the latest value from the other executors in yarn distributed mode, whereas the local variable counter cannot, since tasks can only add to an accumulator and the driver reads the merged value; see the docs here.
In the image the executor id is localhost. If you were using yarn with 2-3 executors, it would show the executor ids. Hope that helps.
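To make the distinction concrete, here is a minimal, self-contained sketch; the names (AccumulatorSketch, blankLines, localCount) and the in-memory sample data are illustrative, not from the original post. It follows the pattern the Spark docs recommend: tasks only add to the accumulator, and .value is read on the driver. In local mode both printed counts match; on a real cluster, the closure-captured var would stay 0 on the driver while the accumulator would still arrive merged.

import org.apache.spark.sql.SparkSession

object AccumulatorSketch extends App {

  val spark = SparkSession.builder()
    .appName("AccumulatorSketch")
    .master("local[3]") // same local mode as the question; point at a cluster to see the divergence
    .getOrCreate()
  val sc = spark.sparkContext

  val blankLines = sc.longAccumulator("blankLines") // updates are merged back to the driver by Spark
  var localCount = 0                                // plain var: each executor JVM gets its own copy

  val lines = sc.parallelize(Seq("a b", "", "c", "", ""))

  // foreach is an action, so Spark guarantees each task's accumulator update is
  // applied exactly once; inside transformations, updates may be re-applied if
  // a task or stage is retried.
  lines.foreach { line =>
    if (line.trim.isEmpty) {
      blankLines.add(1) // tasks may only add
      localCount += 1   // reaches the driver's field only because local mode shares one JVM
    }
  }

  println(s"accumulator = ${blankLines.value}") // read on the driver: 3
  println(s"local var   = $localCount")         // 3 in local mode; 0 on a real cluster

  spark.stop()
}

This also explains the second part of the question: reading cntAccum.value inside the flatMap only appeared to contradict the docs because local mode keeps everything in one JVM. On a cluster, each task works against its own reset copy of the accumulator, so .value there would at best reflect that task's partial count; the authoritative merged value is only available on the driver.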