弹性分布式数据集(RDD)设计为不可变的。使它们不可变的原因之一在于容错和避免,因为它们同时由许多进程和可能的许多节点处理。这可以避免争用条件,也可以避免尝试控制那些条件所涉及的开销。
关于如何实现RDD的示例有两个(例如this one)。但是,我似乎找不到一个说明如何实现Accumulator的示例。它在Apache Spark文档的RDD部分下。
这是否意味着为值的每个增量都创建了一个新的RDD,还是完全不同的数据结构?
最佳答案
Accumulator
是执行程序的只写变量,它们可以由执行程序添加到驱动器中并只能由驱动程序读取。
executors and read by the driver only.
executor1: accumulator.add(incByExecutor1)
executor2: accumulator.add(incByExecutor2)
driver: println(accumulator.value)
Accumulators
不是线程安全的。它们并不是必须的,因为驱动程序用于在任务完成(成功或失败)之后更新累加器值的DAGScheduler.updateAccumulators方法仅在运行调度循环的单个线程上执行。除此之外,它们是具有自己的本地累加器引用的工作程序的仅写数据结构,而驱动程序仅允许访问累加器的值。累加器是可序列化的,因此可以在执行器中执行的代码中安全地引用它们,然后安全地通过网络发送执行该累加器。
val counter = sc.longAccumulator("counter")
sc.parallelize(1 to 9).foreach(x => counter.add(x))
关于python - 在Spark中,RDD是不可变的,那么如何实现累加器?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/51472810/