我在https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java中看到了示例代码

该代码使用单例来包装广播变量,如下所示:

class JavaWordBlacklist {

private static volatile Broadcast<List<String>> instance = null;

public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
  if (instance == null) {
    synchronized (JavaWordBlacklist.class) {
      if (instance == null) {
        List<String> wordBlacklist = Arrays.asList("a", "b", "c");
        instance = jsc.broadcast(wordBlacklist);
      }
    }
  }
  return instance;
}
}


并在wordCounts.foreachRDD((rdd, time) -> {...}中初始化广播变量

我的问题是,为什么不只在父类(即private static volatile Broadcast<List<String>> instance = null;)中声明JavaRecoverableNetworkWordCount

(我认为,由于广播变量是在单个驱动程序线程中执行的foreachRDD()中初始化的,因此此处不会发生争用情况,因此不需要单例保护。)

最佳答案

这样做是为了解决检查点恢复中出现的问题。请记住,检查点仅捕获元数据和/或分布式状态,而不捕获广播变量,累加器和本地对象。从检查点重新启动应用程序后,必须手动恢复所有状态。

否可以解决您的观点:


  由于广播变量是在单个驱动程序线程中执行的foreachRDD()中初始化的,


驱动程序不是单线程的,并且出于与数据处理(簿记,报告)不同的目的访问广播变量,也可以同时由多个流访问。

10-06 02:21