This article discusses "Spark - java.lang.UnsupportedOperationException when I invoke a custom function from map" and how to resolve it; it should be a useful reference for anyone hitting the same problem.

Problem description

I have a DataFrame with a structure similar to:

root
 |-- NPAData: struct (nullable = true)
 |    |-- NPADetails: struct (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- manager: string (nullable = true)
 |    |-- service: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- serviceName: string (nullable = true)
 |    |    |    |-- serviceCode: string (nullable = true)
 |-- NPAHeader: struct (nullable = true)
 |    |    |-- npaNumber: string (nullable = true)
 |    |    |-- date: string (nullable = true)

What I am trying to do is:

  • Group the records that have the same npaNumber into a list
  • Inside each list, order the elements by their date
  • Once the elements are grouped and ordered, merge them applying some logic. To perform this last step I decided to use a map.

这是我目前尝试过的:

val toUpdate = sourceDF.withColumn("count", count($"NPAHeader").over(Window.partitionBy("NPAHeader.npaNumber").orderBy($"NPAHeader.date".desc))).filter($"count" > 1)

val groupedNpa = toUpdate.groupBy($"NPAHeader.npaNumber" ).agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))

// This is a simplified version of my logic.
def pickOne(list: Seq[Row]): Row = {
  println("First element: " + list.head)
  list.head
}

val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa"))))

An example of a Row after the groupBy would be:

[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]

But I am getting an exception when I try to invoke the function from the map.

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_2")
- root class: "scala.Tuple2"

What I understand is that I cannot invoke the function pickOne() from the map (or at least not in the way I am trying it). But I don't know what I am doing wrong.

Why am I getting that exception?

Thanks for your time!

Note: I know there are easier ways to pick one element from the list without invoking a custom function. But I need to invoke it no matter what, because in the next step I need to place a far more complex logic there to merge the rows.

After following Mahesh Chand Kandpal's suggestion:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

grouped.map(row => "emdNumber: "+row.getAs[String]("emdNumber"))
val mergedNpa = groupedNpa.map(row => (row.getAs[String]("npaNumber"),pickOne(row.getAs[Seq[Row]]("npa"))(RowEncoder(row.schema))))

I get the following error:

type mismatch; found: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: Int

How should I apply the Encoder instead?

Recommended answer

When you use map with a DataFrame, you need to provide an encoder.

In Spark 2.x, Dataset[Row].map has the signature ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

import org.apache.spark.sql.catalyst.encoders.RowEncoder

// schema is the StructType of the rows your map function returns
implicit val encoder = RowEncoder(schema)
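
As a minimal sketch of how this could be wired into the question's groupedNpa, assuming the merged result should be one row per npaNumber holding the element picked by pickOne (the output schema, and reusing the element type of the "npa" array, are assumptions, not something stated in the question):

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Reuse the struct type of the elements inside the "npa" array as the type
// of the merged element returned by pickOne.
val npaElementType = groupedNpa.schema("npa").dataType
  .asInstanceOf[ArrayType].elementType.asInstanceOf[StructType]

// Schema of the rows produced by the map below: (npaNumber, merged element).
val outputSchema = StructType(Seq(
  StructField("npaNumber", StringType, nullable = true),
  StructField("npa", npaElementType, nullable = true)
))

// The implicit encoder fills in map's second parameter list, Encoder[T].
implicit val mergedEncoder: Encoder[Row] = RowEncoder(outputSchema)

// Return a Row rather than a Tuple2 containing a Row, so the encoder above applies.
val mergedNpa = groupedNpa.map { row =>
  Row(row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa")))
}

As for the follow-up type mismatch: pickOne(...)(RowEncoder(row.schema)) applies the encoder to the Row returned by pickOne, and Row.apply expects an Int index, hence "required: Int". The encoder belongs to map (as an implicit, or as an explicit second argument list on map), not to your own function.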
