I have a flat DataFrame (df) with the following schema:

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_Date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- email_name: string (nullable = true)
 |-- company: struct (nullable = true)
 |-- org_name: string (nullable = true)
 |-- company_phone: string (nullable = true)
 |-- partition_column: string (nullable = true)

I need to convert this DataFrame into the nested structure below (because my downstream data will be in this format):
root
 |-- firstName: string (nullable = true)
 |-- middleName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- currentPosition: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- startDate: string (nullable = true)
 |    |    |-- endDate: string (nullable = true)
 |    |    |-- address: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- zipCode: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |-- emailName: string (nullable = true)
 |    |    |-- company: struct (nullable = true)
 |    |    |    |-- orgName: string (nullable = true)
 |    |    |    |-- companyPhone: string (nullable = true)
 |-- partitionColumn: string (nullable = true)

So far I have implemented this:
case class IndividualCompany(orgName: String,
                             companyPhone: String)

case class IndividualAddress(city: String,
                   zipCode: String,
                   state: String,
                   country: String)

case class IndividualPosition(title: String,
                              startDate: String,
                              endDate: String,
                              address: IndividualAddress,
                              emailName: String,
                              company: IndividualCompany)

case class Individual(firstName: String,
                     middleName: String,
                     lastName: String,
                     currentPosition: Seq[IndividualPosition],
                     partitionColumn: String)


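import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._ // needed for .as[Individual]; assumes a SparkSession named `spark` is in scope

val makeCompany = udf((orgName: String, companyPhone: String) => IndividualCompany(orgName, companyPhone))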
val makeAddress = udf((city: String, zipCode: String, state: String, country: String) => IndividualAddress(city, zipCode, state, country))

val makePosition = udf((title: String, startDate: String, endDate: String, address: IndividualAddress, emailName: String, company: IndividualCompany)
                    => List(IndividualPosition(title, startDate, endDate, address, emailName, company)))


val selectData = df.select(
      col("first_name").as("firstName"),
      col("middle_name").as("middleName"),
      col("last_name").as("lastName"),
      makePosition(col("title"),
        col("start_date"),
        col("end_Date"),
        makeAddress(col("city"),
          col("zip_code"),
          col("state"),
          col("country")),
        col("email_name"),
        makeCompany(col("org_name"),
          col("company_phone"))).as("currentPosition"),
      col("partition_column").as("partitionColumn")
    ).as[Individual]

selectData.printSchema()
selectData.show(10)

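I can see the expected schema being generated for selectData, but the last line, which tries to fetch some actual data, fails. The error message says that a user-defined function could not be executed: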
 org.apache.spark.SparkException: Failed to execute user defined function(anonfun$4: (string, string, string, struct<city:string,zipCode:string,state:string,country:string>, string, struct<orgName:string,companyPhone:string>) => array<struct<title:string,startDate:string,endDate:string,address:struct<city:string,zipCode:string,state:string,country:string>,emailName:string,company:struct<orgName:string,companyPhone:string>>>)

Is there a better way to achieve this?

Best answer

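The problem here is that a udf cannot take IndividualAddress and IndividualCompany directly as inputs. Spark represents these values as structs, and the correct input type for a struct inside a udf is Row. This means you need to change the declaration of makePosition to: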

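import org.apache.spark.sql.Row

val makePosition = udf((title: String,
                        startDate: String,
                        endDate: String,
                        address: Row,    // struct column arrives as a Row
                        emailName: String,
                        company: Row)    // struct column arrives as a Row
                        => ...)          // function body omitted here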

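Inside the udf you now read each struct field by name, for example address.getAs[String]("city"). If you want to work with the case class as a whole, you have to construct it again from those fields.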
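The Row-based variant described above can be sketched end to end as follows. This is a minimal illustration, not the original poster's code: the local SparkSession, the object name RowUdfSketch, and the one-row sample data are assumptions made so the snippet is self-contained, and struct(...) is used at the call site in place of the makeAddress and makeCompany udfs.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

case class IndividualCompany(orgName: String, companyPhone: String)
case class IndividualAddress(city: String, zipCode: String, state: String, country: String)
case class IndividualPosition(title: String, startDate: String, endDate: String,
                              address: IndividualAddress, emailName: String,
                              company: IndividualCompany)

object RowUdfSketch {
  // Struct arguments reach the udf as Row, so each case class is rebuilt field by field.
  val makePosition = udf((title: String, startDate: String, endDate: String,
                          address: Row, emailName: String, company: Row) =>
    Seq(IndividualPosition(
      title, startDate, endDate,
      IndividualAddress(
        address.getAs[String]("city"),
        address.getAs[String]("zipCode"),
        address.getAs[String]("state"),
        address.getAs[String]("country")),
      emailName,
      IndividualCompany(
        company.getAs[String]("orgName"),
        company.getAs[String]("companyPhone")))))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used only to make the sketch runnable.
    val spark = SparkSession.builder().master("local[1]").appName("row-udf-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical sample row covering only the columns the udf needs.
    val df = Seq(
      ("Engineer", "2019-01-01", "2020-01-01", "Oslo", "0150", "Oslo", "NO",
        "ada@example.com", "Acme", "555-0100")
    ).toDF("title", "start_date", "end_Date", "city", "zip_code", "state", "country",
      "email_name", "org_name", "company_phone")

    val result = df.select(
      makePosition(
        col("title"), col("start_date"), col("end_Date"),
        // The struct field names must match the names read with getAs above.
        struct(col("city"), col("zip_code").as("zipCode"), col("state"), col("country")),
        col("email_name"),
        struct(col("org_name").as("orgName"), col("company_phone").as("companyPhone"))
      ).as("currentPosition"))

    result.printSchema()
    result.show(false)
    spark.stop()
  }
}
```

Note that getAs looks fields up by name, so a mismatch between the struct aliases and the names used inside the udf only surfaces at run time.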

A simpler and better alternative is to do everything in a single udf, as follows:
val makePosition = udf((title: String,
    startDate: String,
    endDate: String,
    city: String,
    zipCode: String,
    state: String,
    country: String,
    emailName: String,
    orgName: String,
    companyPhone: String) =>
        Seq(
          IndividualPosition(
            title,
            startDate,
            endDate,
            IndividualAddress(city, zipCode, state, country),
            emailName,
            IndividualCompany(orgName, companyPhone)
          )
        )
)
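As a different technique from the udf shown above, the same nesting can probably be expressed with Spark's built-in struct and array column functions, avoiding a udf altogether. The sketch below is an assumption-laden illustration rather than part of the original answer: the local SparkSession, the object name StructArraySketch, and the sample row are hypothetical, and the nullability flags in the resulting schema may differ from the target schema in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, struct}

object StructArraySketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used only to make the sketch runnable.
    val spark = SparkSession.builder().master("local[1]").appName("struct-array-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical sample row using the flat column names from the question.
    val df = Seq(
      ("Ada", "M", "Smith", "Engineer", "2019-01-01", "2020-01-01", "Oslo", "0150",
        "Oslo", "NO", "ada@example.com", "Acme", "555-0100", "p1")
    ).toDF("first_name", "middle_name", "last_name", "title", "start_date", "end_Date",
      "city", "zip_code", "state", "country", "email_name", "org_name", "company_phone",
      "partition_column")

    val nested = df.select(
      col("first_name").as("firstName"),
      col("middle_name").as("middleName"),
      col("last_name").as("lastName"),
      // A one-element array holding a struct; aliases become the struct field names.
      array(
        struct(
          col("title"),
          col("start_date").as("startDate"),
          col("end_Date").as("endDate"),
          struct(
            col("city"),
            col("zip_code").as("zipCode"),
            col("state"),
            col("country")
          ).as("address"),
          col("email_name").as("emailName"),
          struct(
            col("org_name").as("orgName"),
            col("company_phone").as("companyPhone")
          ).as("company")
        )
      ).as("currentPosition"),
      col("partition_column").as("partitionColumn")
    )

    nested.printSchema()
    nested.show(false)
    spark.stop()
  }
}
```

Because everything stays in column expressions, Spark does not need to convert rows into Scala objects and back, which is usually the main cost of a udf.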
