Problem Description
To iterate through the columns of a Spark DataFrame created from a Hive table and update all occurrences of the desired column values, I tried the following code.
import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
val a: DataFrame = spark.sql(s"select * from default.table_a")
val column_names: Array[String] = a.columns
val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))
val func = udf((value: String) => { if (value == "XXXX" || value == "WWWW" || value == "TTTT") "NULL" else value })
val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
When I executed the code in the spark shell, I got the following error.
scala> val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
<console>:35: error: value a is not a member of org.apache.spark.sql.DataFrame
val b = {for (column: String <- required_column_list) { a.withColumn(column , isNull(a(column))) } a}
^
I also tried the following statement but didn't get the required output.
val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
The variable b is created as a Unit instead of a DataFrame.
scala> val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
b: Unit = ()
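From what I understand, a for loop without yield desugars to foreach and returns Unit, and each a.withColumn(...) built inside the body is thrown away because DataFrames are immutable. A rough sketch of the kind of accumulation I was trying to express (reusing the same a, required_columns and func from above, with a mutable reference) would be:
// Sketch only: rebind a var to the DataFrame returned by each withColumn call,
// since withColumn does not modify the original DataFrame in place.
var acc = a
for (column <- required_columns) {
  acc = acc.withColumn(column, func(acc(column)))
}
val b = acc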
Please suggest a better way to iterate through the columns of a DataFrame and update all occurrences of the values in those columns, or correct where I am wrong. Any other solution is also appreciated. Thanks in advance.
Recommended Answer
Instead of a for loop, you should go with foldLeft. And you don't need a udf function; the when built-in function can be used.
val column_names: Array[String] = a.columns
val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))
import org.apache.spark.sql.functions._
val b = required_columns.foldLeft(a) { (tempdf, colName) =>
  tempdf.withColumn(colName,
    when(col(colName) === "XXXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL")
      .otherwise(col(colName)))
}
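To sanity-check the replacement you could print just the affected columns afterwards (a quick sketch; adjust the number of rows to your data):
// Inspect only the *_date columns of the transformed dataframe b.
b.select(required_columns.map(col): _*).show(20, truncate = false)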
I hope the answer is helpful.
In
required_columns.foldLeft(a) { (tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName))) }
required_columns is an array of column names from the a dataframe/dataset with _date as the ending string, and these are the colName inside withColumn.
tempdf is the accumulator dataframe/dataset, which starts out as the original a.
The when function is applied inside withColumn, which replaces all XXXX, WWWW, or TTTT values with NULL.
Finally, foldLeft returns the dataframe with all of the transformations applied and assigns it to b.
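To make the fold concrete, here is a rough manual expansion of what foldLeft does, assuming (purely as an example) that required_columns contains just start_date and end_date:
// Hypothetical expansion for required_columns = Array("start_date", "end_date"):
// the accumulator (tempdf) is threaded through one withColumn call per column name.
val step1 = a.withColumn("start_date",
  when(col("start_date") === "XXXX" || col("start_date") === "WWWW" || col("start_date") === "TTTT", "NULL")
    .otherwise(col("start_date")))
val step2 = step1.withColumn("end_date",
  when(col("end_date") === "XXXX" || col("end_date") === "WWWW" || col("end_date") === "TTTT", "NULL")
    .otherwise(col("end_date")))
// b is the final accumulator, i.e. step2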