在使用Scala 2.10的项目中使用Spark时,我注意到一个奇怪的行为,我正在读取属性文件,并将所有内容写入Map(loadConfig)内,并且我还创建了一个简单的方法来返回给定键的值。

问题是,当我在lazy val类变量中获得所有blackListed名称时,由于所有namesBlackList都具有“完全访问权”标签,因此Person似乎为空,这是不正确的

但是,当我在namesBlackList中写入filterAccess时,一切工作都很好。

ConfigManager.scala

object ConfigManager extends Serializable {

  private var configMap = Map.empty[String, String]

  def loadConfig(configPath:String) = {
    // Reads a key/value properties file and writes it in the configmap
  }

  def getParameter(parameter: String): String = configMap.getOrElse(parameter, s"${parameter}=>UNKNOWN")
}


AnalyseData.scala

object AnalyseData extends Serializable {

    private lazy val namesBlackList = ConfigManager.getParameter("names.blacklist").toSet

    def filterAccess(rdd:RDD[Person]) : RDD[Person] = {
        rdd.map {person =>
          if (namesBlackList.contains(person.firstName))
            (person.firstName,person.lastName,"limited access")
          else
            (person.firstName,person.lastName,"full Access")
       }
    }
}


AnalyseService.scala

object AnalyseService extends Serializable {
    def main(path:String) {
        ConfigManager.loadConfig(path)

        val datas = createNameRdd // reads from a db and create a RDD[Person]

        val filteredData = AnalyseData.filterAccess(datas)

    }
}


我试图调整代码中的所有内容,但由于Spark以map方式执行lazy方法,因此在lazy val类变量中设置Singleton对象的结果将不会产生正确的结果。
我不明白为什么它不起作用,更重要的是,除了在方法内部调用namesBlackList之外,我真的找不到解决方法。

感谢您的意见。

最佳答案

有关所需的某些术语和概念的说明,请参见https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka。您的情况会怎样(我认为):


ConfigManager.loadConfig(path)在驱动程序节点上运行。 configMap在此处初始化。
filterAccess中,namesBlackList实际上是一个方法调用。因此,当在工作节点上执行map中的代码时,将在此处进行此调用并访问同一节点上的configMap,该节点为空。
但是,当您“在filterAccess内写入namesBlackList”时,它是一个局部变量,并且确实成为闭包的一部分并被序列化。


要解决此问题,您需要对configMap使用broadcast variable。就像是

object ConfigManager extends Serializable {

  private var configMap: Broadcast[Map[String, String]] = _

  def loadConfig(configPath:String) = {
    // Reads a key/value properties file and writes it in the configmap
  }

  def getParameter(parameter: String): String = configMap.value.getOrElse(parameter, s"${parameter}=>UNKNOWN")
}


甚至最好避免使用var

def main(path:String) {
    val configMap = ConfigManager.loadConfig(path)

    val datas = createNameRdd(configMap) // reads from a db and create a RDD[Person]

    val filteredData = AnalyseData.filterAccess(datas, configMap)
}

10-08 17:52