与Scala类型相关的问题相比,这实际上不是Spark问题,但Spark粉丝可能会对我正在做的事情感兴趣,因此我在问题的框架中始终保留“ Spark”,即:


我想递归地转换StructType的Spark sql模式,
其中包含一个列表,其元素可以是StructType或
StructField的。转换的结果应该是原始模式的版本,该模式不允许在任何字段中使用null。不幸的是,StructType和StructField不扩展
从一个共同的标记特征。这导致了我的最初实施
该方法接受“ Any”并将结果显式转换回
StructType。


初步实施

object SchemaTools extends App  {
  import org.apache.spark.sql.types._

  def noNullSchema(schema: StructType): StructType = {
    def go(element: Any): Product = element match {
      case x: StructField => x.copy(nullable = false)
      case x: StructType => StructType(x.fields.map(_.copy(nullable = false)))
      case bad => sys.error(s"element of unexpected type: $bad")
    }

    go(schema).asInstanceOf[StructType]
  }

  type Rec = (String, Seq[(Int, Int, String)])
  val schema: StructType = Encoders.product[Rec].schema

  System.out.println("pr:" + schema.prettyJson)
  System.out.println("pr:" + noNullSchema(schema).prettyJson)
}


更新

我接受Tim的回答,因为他友善地指出了我的愚蠢错误,即我没有重复使用嵌套结构。我在下面包含了去零化器的上述“概念验证”的修改版本。这适用于我的示例输入,并说明了我将采用的一般方法。使用此实现,我不会遇到与类型相关的问题。我的错! :我误解了StructType内部的内容(它始终是StructField的数组,而不是StructField或StructType的数组)。数组中的字段本身可能是数据类型“ StructType”,这导致了递归需求。无论如何……下面是一个经过修改的“玩具”实现,它说明了如果我需要一个完整的解决方案(而不是仅出于学习目的而实现),我将如何解决该问题。此代码绝对不适合生产,并且在更复杂的输入上将失败。它说明了一种可能的方法。

注意:我了解到的另一件事是关于空值和模式,这是非常重要的,要牢记...。即使正确实现了模式“ de-nuller”,Spark在解析过程中也不会强制执行空性检查。此处将对此进行详细讨论:Nullability in Spark sql schemas is advisory by default. What is best way to strictly enforce it?

*概念证明...类型不再有问题*

object SchemaTools extends App  {
  import org.apache.spark.sql.types._

  def noNullSchema(field: StructField): StructField = {
    field.dataType match {
      case  ArrayType(StructType(fields), containsNull) =>
       StructField(
         field.name,
         ArrayType(noNullSchema(StructType(fields)), containsNull),
         nullable = false,
         field.metadata)
      case _ => field.copy(nullable = false)
    }
  }

  def noNullSchema(schema: StructType): StructType =
    StructType (
      schema.fields.map { f =>
        System.out.println("f:" + f);
        noNullSchema(f)
      }
    )

  type Rec = (String, Seq[(Int, String, String)])
  val schema: StructType = Encoders.product[Rec].schema

  System.out.println("pr:" + schema.prettyJson)
  System.out.println("pr:" + noNullSchema(schema).prettyJson)
}

最佳答案

除非我对这个问题有误解,否则我认为您只需要调整原始解决方案即可进行更改

go(schema).asInstanceOf[StructType]


进入

StructType(schema.fields.map(go))


另外,go的参数和结果的类型应与StructType.fields的元素的类型相同。

由于框架对fields的元素使用通用类型,因此必须有一些处理该通用类型的代码。因此,如果该类型为Any,则必须处理Any,而类型类将无济于事。

10-06 13:23