我在https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java中看到了示例代码
该代码使用单例来包装广播变量,如下所示:
class JavaWordBlacklist {
private static volatile Broadcast<List<String>> instance = null;
public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordBlacklist.class) {
if (instance == null) {
List<String> wordBlacklist = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordBlacklist);
}
}
}
return instance;
}
}
并在
wordCounts.foreachRDD((rdd, time) -> {...}
中初始化广播变量我的问题是,为什么不只在父类(即
private static volatile Broadcast<List<String>> instance = null;
)中声明JavaRecoverableNetworkWordCount
?(我认为,由于广播变量是在单个驱动程序线程中执行的
foreachRDD()
中初始化的,因此此处不会发生争用情况,因此不需要单例保护。) 最佳答案
这样做是为了解决检查点恢复中出现的问题。请记住,检查点仅捕获元数据和/或分布式状态,而不捕获广播变量,累加器和本地对象。从检查点重新启动应用程序后,必须手动恢复所有状态。
否可以解决您的观点:
由于广播变量是在单个驱动程序线程中执行的foreachRDD()中初始化的,
驱动程序不是单线程的,并且出于与数据处理(簿记,报告)不同的目的访问广播变量,也可以同时由多个流访问。