在使用Scala 2.10的项目中使用Spark时,我注意到一个奇怪的行为,我正在读取属性文件,并将所有内容写入Map(loadConfig)内,并且我还创建了一个简单的方法来返回给定键的值。
问题是,当我在lazy val
类变量中获得所有blackListed名称时,由于所有namesBlackList
都具有“完全访问权”标签,因此Person
似乎为空,这是不正确的
但是,当我在namesBlackList
中写入filterAccess
时,一切工作都很好。
ConfigManager.scala
object ConfigManager extends Serializable {
private var configMap = Map.empty[String, String]
def loadConfig(configPath:String) = {
// Reads a key/value properties file and writes it in the configmap
}
def getParameter(parameter: String): String = configMap.getOrElse(parameter, s"${parameter}=>UNKNOWN")
}
AnalyseData.scala
object AnalyseData extends Serializable {
private lazy val namesBlackList = ConfigManager.getParameter("names.blacklist").toSet
def filterAccess(rdd:RDD[Person]) : RDD[Person] = {
rdd.map {person =>
if (namesBlackList.contains(person.firstName))
(person.firstName,person.lastName,"limited access")
else
(person.firstName,person.lastName,"full Access")
}
}
}
AnalyseService.scala
object AnalyseService extends Serializable {
def main(path:String) {
ConfigManager.loadConfig(path)
val datas = createNameRdd // reads from a db and create a RDD[Person]
val filteredData = AnalyseData.filterAccess(datas)
}
}
我试图调整代码中的所有内容,但由于Spark以
map
方式执行lazy
方法,因此在lazy val
类变量中设置Singleton对象的结果将不会产生正确的结果。我不明白为什么它不起作用,更重要的是,除了在方法内部调用
namesBlackList
之外,我真的找不到解决方法。感谢您的意见。
最佳答案
有关所需的某些术语和概念的说明,请参见https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka。您的情况会怎样(我认为):ConfigManager.loadConfig(path)
在驱动程序节点上运行。 configMap
在此处初始化。
在filterAccess
中,namesBlackList
实际上是一个方法调用。因此,当在工作节点上执行map
中的代码时,将在此处进行此调用并访问同一节点上的configMap
,该节点为空。
但是,当您“在filterAccess内写入namesBlackList”时,它是一个局部变量,并且确实成为闭包的一部分并被序列化。
要解决此问题,您需要对configMap
使用broadcast variable。就像是
object ConfigManager extends Serializable {
private var configMap: Broadcast[Map[String, String]] = _
def loadConfig(configPath:String) = {
// Reads a key/value properties file and writes it in the configmap
}
def getParameter(parameter: String): String = configMap.value.getOrElse(parameter, s"${parameter}=>UNKNOWN")
}
甚至最好避免使用
var
:def main(path:String) {
val configMap = ConfigManager.loadConfig(path)
val datas = createNameRdd(configMap) // reads from a db and create a RDD[Person]
val filteredData = AnalyseData.filterAccess(datas, configMap)
}