Problem description
I have a Dataset<Row> inputDS which has 4 columns, namely Id, List<long> time, List<String> value, and aggregateType. I want to add one more column, value_new, to the Dataset using a map function. That map function takes the columns time, value and aggregateType, passes them to a function getAggregate(String aggregateType, List<long> time, List<String> value), and returns a double value after processing the parameters. The Double value returned by getAggregate will be the value of the new column value_new.
Dataset inputDS
+-----+--------------+---------------------------------------------+---------------+
| Id  | value        | time                                        | aggregateType |
+-----+--------------+---------------------------------------------+---------------+
|0001 | [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
+-----+--------------+---------------------------------------------+---------------+
Expected Dataset outputDS
+-----+--------------+---------------------------------------------+---------------+-----------+
| Id  | value        | time                                        | aggregateType | value_new |
+-----+--------------+---------------------------------------------+---------------+-----------+
|0001 | [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           | 9.4       |
+-----+--------------+---------------------------------------------+---------------+-----------+
Code I have tried:
inputDS.withColumn("value_new", functions.lit(inputDS.map(new MapFunction<Row, Double>() {
    public double call(Row row) {
        String aggregateType = row.getAs("aggregateType");
        List<long> timeList = row.getList("time");
        List<long> valueList = row.getList("value");
        return getAggregate(aggregateType, timeList, valueList);
    }
}), Encoders.DOUBLE())));
Error
Unsupported literal type class org.apache.spark.sql.Dataset [value:double]
Note: Sorry if I am using the map function wrongly; please suggest a workaround if there is one.
Thanks.
Recommended answer
You get the error because you are trying to create a function literal (lit()) using the result of Dataset.map(), which, as you can see in the docs, is a Dataset. You can see in the API for Dataset.withColumn() that you need an argument that is a Column.
It seems like you need to create a user-defined function. Take a look at How do I call a UDF on a Spark DataFrame using JAVA?
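Building on that link, below is a minimal sketch (not the original answer's code) of how the UDF approach could look in Java. It assumes Spark 2.x or later, that value is an array of doubles and time an array of longs as in the sample rows, and it inlines a trivial "Sum" aggregation where your own getAggregate method would go; the names getAggregateUdf and AddValueNew are made up for illustration. Note that Spark hands ArrayType columns to a Java UDF as scala.collection.Seq rather than java.util.List.

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.Seq;

public class AddValueNew {

    public static Dataset<Row> addValueNew(SparkSession spark, Dataset<Row> inputDS) {
        // Register a UDF that receives aggregateType plus the two array columns.
        // ArrayType columns arrive in a Java UDF as scala.collection.Seq.
        spark.udf().register("getAggregateUdf",
                (UDF3<String, Seq<Long>, Seq<Double>, Double>) (aggregateType, time, value) -> {
                    // Placeholder aggregation: sum the values when aggregateType is "Sum".
                    // Replace this body with a call to your own getAggregate(...) method.
                    double result = 0.0;
                    if ("Sum".equalsIgnoreCase(aggregateType)) {
                        for (int i = 0; i < value.size(); i++) {
                            result += value.apply(i);
                        }
                    }
                    return result;
                },
                DataTypes.DoubleType);

        // withColumn needs a Column, so invoke the registered UDF with callUDF(...)
        return inputDS.withColumn("value_new",
                callUDF("getAggregateUdf", col("aggregateType"), col("time"), col("value")));
    }
}

Because withColumn expects a Column, wrapping the registered UDF with callUDF(...) is what avoids the "Unsupported literal type" error you got from lit().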