在Spark Dataset.filter中获取此null错误

输入CSV:

name,age,stat
abc,22,m
xyz,,s

工作代码:
case class Person(name: String, age: Long, stat: String)

val peopleDS = spark.read.option("inferSchema","true")
  .option("header", "true").option("delimiter", ",")
  .csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()

代码失败(添加以下行将返回错误):
val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()

返回空错误
java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

最佳答案

您得到的异常应该解释所有问题,但让我们逐步进行:

  • 当使用csv数据源加载数据时,所有字段都标记为nullable:

    val path: String = ???
    
    val peopleDF = spark.read
      .option("inferSchema","true")
      .option("header", "true")
      .option("delimiter", ",")
      .csv(path)
    
    peopleDF.printSchema
    

    root
    |-- name: string (nullable = true)
    |-- age: integer (nullable = true)
    |-- stat: string (nullable = true)
    
  • 缺少的字段表示为SQL NULL
    peopleDF.where($"age".isNull).show
    

    +----+----+----+
    |name| age|stat|
    +----+----+----+
    | xyz|null|   s|
    +----+----+----+
    
  • 接下来,您将Dataset[Row]转换为Dataset[Person],后者使用Long编码age字段。 Scala中的Long不能是null。由于输入架构为nullable,因此尽管如此,输出架构仍为nullable:

    val peopleDS = peopleDF.as[Person]
    
    peopleDS.printSchema
    

    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
     |-- stat: string (nullable = true)
    

    请注意,它as[T]完全不影响架构。
  • 当您使用SQL(在已注册的表上)或Dataset API查询DataFrame时,Spark不会反序列化对象。由于模式仍然是nullable,我们可以执行:

    peopleDS.where($"age" > 30).show
    

    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    

    没有任何问题。这只是简单的SQL逻辑,而NULL是有效值。
  • 当我们使用静态类型的Dataset API时:

    peopleDS.filter(_.age > 30)
    

    Spark必须反序列化对象。因为Long不能是null(SQL NULL),所以它会失败,但出现了您所看到的异常。

    如果不是那样的话,您将获得NPE。
  • 正确的数据静态类型表示应使用Optional类型:

    case class Person(name: String, age: Option[Long], stat: String)
    

    调整后的滤镜功能:

    peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
    

    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    

    如果您愿意,可以使用模式匹配:
    peopleDS.filter {
      case Some(age) => age > 30
      case _         => false     // or case None => false
    }
    

    请注意,您不必(但仍建议这样做)对namestat使用可选类型。因为Scala String只是Java的String,所以可以是null。当然,如果采用这种方法,则必须显式检查访问的值是否为null

  • 相关Spark 2.0 Dataset vs DataFrame

    关于scala - Spark 2 Dataset Null值异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41665183/

    10-14 19:06
    查看更多