在Spark Dataset.filter中获取此null错误
输入CSV:
name,age,stat
abc,22,m
xyz,,s
工作代码:
case class Person(name: String, age: Long, stat: String)
val peopleDS = spark.read.option("inferSchema","true")
.option("header", "true").option("delimiter", ",")
.csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()
代码失败(添加以下行将返回错误):
val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()
返回空错误
java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
最佳答案
您得到的异常应该解释所有问题,但让我们逐步进行:
csv
数据源加载数据时,所有字段都标记为nullable
:val path: String = ???
val peopleDF = spark.read
.option("inferSchema","true")
.option("header", "true")
.option("delimiter", ",")
.csv(path)
peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
NULL
peopleDF.where($"age".isNull).show
+----+----+----+
|name| age|stat|
+----+----+----+
| xyz|null| s|
+----+----+----+
Dataset[Row]
转换为Dataset[Person]
,后者使用Long
编码age
字段。 Scala中的Long
不能是null
。由于输入架构为nullable
,因此尽管如此,输出架构仍为nullable
:val peopleDS = peopleDF.as[Person]
peopleDS.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
请注意,它
as[T]
完全不影响架构。 Dataset
API查询DataFrame
时,Spark不会反序列化对象。由于模式仍然是nullable
,我们可以执行:peopleDS.where($"age" > 30).show
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
没有任何问题。这只是简单的SQL逻辑,而
NULL
是有效值。 Dataset
API时:peopleDS.filter(_.age > 30)
Spark必须反序列化对象。因为
Long
不能是null
(SQL NULL
),所以它会失败,但出现了您所看到的异常。如果不是那样的话,您将获得NPE。
Optional
类型:case class Person(name: String, age: Option[Long], stat: String)
调整后的滤镜功能:
peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
如果您愿意,可以使用模式匹配:
peopleDS.filter {
case Some(age) => age > 30
case _ => false // or case None => false
}
请注意,您不必(但仍建议这样做)对
name
和stat
使用可选类型。因为Scala String
只是Java的String
,所以可以是null
。当然,如果采用这种方法,则必须显式检查访问的值是否为null
。 相关Spark 2.0 Dataset vs DataFrame
关于scala - Spark 2 Dataset Null值异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41665183/