I am very new to Spark and I want to explode my df in such a way that it creates a new column with the split values, and that column also carries the order (index) of each value within its row.

Code:

```scala
import spark.implicits._

val df = Seq(
  "40000.0~0~0~",
  "0~40000.0~",
  "0~",
  "1000.0~0~0~",
  "1333.3333333333333~0~0~0~0",
  "66666.66666666667~0~0~"
).toDF("VALUES")

df.show(false)
```

Input DF:

```
+--------------------------+
|VALUES                    |
+--------------------------+
|40000.0~0~0~              |
|0~40000.0~                |
|0~                        |
|1000.0~0~0~               |
|1333.3333333333333~0~0~0~0|
|66666.66666666667~0~0~    |
+--------------------------+
```

Desired output DF:

```
+------+------------------+-----+
|row_id|col               |order|
+------+------------------+-----+
|1     |40000.0           |1    |
|1     |0                 |2    |
|1     |0                 |3    |
|1     |                  |4    |  <== don't want this row (empty/null value)
|2     |0                 |1    |
|2     |40000.0           |2    |
|2     |                  |3    |  <== don't want this row (empty/null value)
|3     |0                 |1    |
|3     |                  |2    |  <== don't want this row (empty/null value)
|4     |1000.0            |1    |
|4     |0                 |2    |
|4     |0                 |3    |
|4     |                  |4    |  <== don't want this row (empty/null value)
|5     |1333.3333333333333|1    |
|5     |0                 |2    |
|5     |0                 |3    |
|5     |0                 |4    |
|5     |0                 |5    |
|6     |66666.66666666667 |1    |
|6     |0                 |2    |
|6     |0                 |3    |
|6     |                  |4    |  <== don't want this row (empty/null value)
+------+------------------+-----+
```

As marked above, I don't want the rows with an empty or null value. How can this be done in Scala/Spark?

Answer

Use a window function to add row_id, then use posexplode and filter out the empty values.

Example:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val w = Window.orderBy(col("id"))

df.withColumn("id", monotonically_increasing_id())
  .withColumn("row_id", row_number().over(w))
  .selectExpr("row_id", "posexplode(split(values,'~')) as (pos, val)")
  .withColumn("order", col("pos") + 1)
  .drop("pos")
  .filter(length(col("val")) =!= 0)
  .show()

// or filter out the empty strings in SQL with the higher-order
// function `filter` (Spark 2.4+) before exploding
df.withColumn("id", monotonically_increasing_id())
  .withColumn("row_id", row_number().over(w))
  .withColumn("arr", expr("filter(split(values,'~'), x -> x != '')"))
  .selectExpr("row_id", "posexplode(arr) as (pos, val)")
  .withColumn("order", col("pos") + 1)
  .drop("pos")
  .show()

//+------+------------------+-----+
//|row_id|               val|order|
//+------+------------------+-----+
//|     1|           40000.0|    1|
//|     1|                 0|    2|
//|     1|                 0|    3|
//|     2|                 0|    1|
//|     2|           40000.0|    2|
//|     3|                 0|    1|
//|     4|            1000.0|    1|
//|     4|                 0|    2|
//|     4|                 0|    3|
//|     5|1333.3333333333333|    1|
//|     5|                 0|    2|
//|     5|                 0|    3|
//|     5|                 0|    4|
//|     5|                 0|    5|
//|     6| 66666.66666666667|    1|
//|     6|                 0|    2|
//|     6|                 0|    3|
//+------+------------------+-----+
```

From Spark 2.4+ we can use the array_remove function to filter out the "" values:

```scala
df.withColumn("id", monotonically_increasing_id())
  .withColumn("row_id", row_number().over(w))
  .selectExpr("row_id", "posexplode(array_remove(split(values,'~'), '')) as (pos, val)")
  .withColumn("order", col("pos") + 1)
  .drop("pos")
  .show()

//+------+------------------+-----+
//|row_id|               val|order|
//+------+------------------+-----+
//|     1|           40000.0|    1|
//|     1|                 0|    2|
//|     1|                 0|    3|
//|     2|                 0|    1|
//|     2|           40000.0|    2|
//|     3|                 0|    1|
//|     4|            1000.0|    1|
//|     4|                 0|    2|
//|     4|                 0|    3|
//|     5|1333.3333333333333|    1|
//|     5|                 0|    2|
//|     5|                 0|    3|
//|     5|                 0|    4|
//|     5|                 0|    5|
//|     6| 66666.66666666667|    1|
//|     6|                 0|    2|
//|     6|                 0|    3|
//+------+------------------+-----+
```
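For reference, the same pipeline can also be written with the typed DataFrame API instead of selectExpr strings. This is a minimal sketch assuming Spark 2.4+ (for array_remove); note that posexplode produces columns named pos and col by default, so the value column is renamed to val to match the output above:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.orderBy(col("id"))

val result = df
  .withColumn("id", monotonically_increasing_id())            // ordering key for the window
  .withColumn("row_id", row_number().over(w))                 // consecutive 1-based row ids
  .select(
    col("row_id"),
    posexplode(array_remove(split(col("VALUES"), "~"), ""))   // drop "" entries, then explode with position
  )
  .withColumnRenamed("col", "val")                            // posexplode yields (pos, col) by default
  .withColumn("order", col("pos") + 1)                        // convert 0-based pos to 1-based order
  .drop("pos")

result.show(false)
```

One thing to keep in mind with all of these variants: Window.orderBy without a partitionBy pulls all rows into a single partition (Spark logs a warning about this), which is fine for small data like this example but can hurt performance at scale.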