Problem Description
Given a DataFrame:
+---+-----------+---------+-------+------------+
| id|model_score|tx_amount|isValid|    greeting|
+---+-----------+---------+-------+------------+
| 1| 0.2| 23.78| true| hello_world|
| 2| 0.6| 12.41| false|byebye_world|
+---+-----------+---------+-------+------------+
I want to explode these columns into rows under a single column named "col_value", using the types from the input DataFrame.
df.dtypes
[('id', 'int'), ('model_score', 'double'), ('tx_amount', 'double'), ('isValid', 'boolean'), ('greeting', 'string')]
Expected output:
+---+------------+--------+---------+----------+-------+---------+
| id| col_value|is_score|is_amount|is_boolean|is_text|col_name |
+---+------------+--------+---------+----------+-------+---------+
| 1| 0.2| Y| N| N| N|score |
| 1| 23.78| N| Y| N| N|tx_amount|
| 1| true| N| N| Y| N|isValid |
| 1| hello_world| N| N| N| Y|greeting |
| 2| 0.6| Y| N| N| N|score |
| 2| 12.41| N| Y| N| N|tx_amount|
| 2| false| N| N| Y| N|isValid |
| 2|byebye_world| N| N| N| Y|greeting |
+---+------------+--------+---------+----------+-------+---------+
What I have so far:
df.withColumn("cols", F.explode(F.arrays_zip(F.array("score", "tx_amount", "isValid", "greeting")))) \
.select("id", F.col("cols.*")) \
...
But it gives an error about types when I try to zip the cols to use in the explode:
pyspark.sql.utils.AnalysisException: "cannot resolve 'array(`id`, `model_score`, `tx_amount`, `isValid`, `greeting`)' due to data type mismatch: input to function array should all be the same type, but it's [int, double, double, boolean, string]
How can I do this when the types of the input columns can all be different?
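The root cause: Spark's F.array requires every element to share one type, so mixed int/double/boolean/string inputs must first be cast to a common type. A minimal sketch of the constraint, assuming the schema above (not part of the original post):

from pyspark.sql import functions as F

# Mixed types fail inside F.array; casting everything to string first
# gives the array a single, legal element type.
df.select(
    F.array([F.col(c).cast("string") for c in df.columns[1:]]).alias("vals")
).show(truncate=False)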
Recommended Answer
Sample DataFrame:
df.show()
df.printSchema()
+---+-----------+---------+-------+------------+
| id|model_score|tx_amount|isValid| greeting|
+---+-----------+---------+-------+------------+
| 1| 0.2| 23.78| true| hello_world|
| 2| 0.6| 12.41| false|byebye_world|
+---+-----------+---------+-------+------------+
root
|-- id: integer (nullable = true)
|-- model_score: double (nullable = true)
|-- tx_amount: double (nullable = true)
|-- isValid: boolean (nullable = true)
|-- greeting: string (nullable = true)
I tried to keep it dynamic for any input columns. It takes each column's type from df.dtypes[1:]; id is skipped (hence the [1:]) because it is not included in col_value. F.array only accepts elements of the same type, which is why all columns are cast to string before the logic is applied. It should work for your use case, and you can build your Y/N columns from here.
import pyspark.sql.functions as F

# Cast every column to string so they can share one array, then pair each
# value with its original type and name as [value, type, name] before exploding.
df.select([F.col(c).cast("string") for c in df.columns]) \
    .withColumn("cols", F.explode(F.arrays_zip(F.array([F.array(x[0], F.lit(x[1]), F.lit(x[0]))
                                                        for x in df.dtypes[1:]])))) \
    .select("id", F.col("cols.*")) \
    .withColumn("col_value", F.element_at("0", 1)) \
    .withColumn("col_type", F.element_at("0", 2)) \
    .withColumn("col_name", F.element_at("0", 3)) \
    .drop("0").show()
+---+------------+--------+-----------+
| id| col_value|col_type| col_name|
+---+------------+--------+-----------+
| 1| 0.2| double|model_score|
| 1| 23.78| double| tx_amount|
| 1| true| boolean| isValid|
| 1| hello_world| string| greeting|
| 2| 0.6| double|model_score|
| 2| 12.41| double| tx_amount|
| 2| false| boolean| isValid|
| 2|byebye_world| string| greeting|
+---+------------+--------+-----------+
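From here the Y/N flag columns can be derived with F.when on col_type and col_name. A minimal sketch, assuming the result above is bound to a variable named exploded and that the flag-to-column mapping follows the expected output (both are my assumptions, not part of the original answer):

from pyspark.sql import functions as F

# 'exploded' is the result of the transformation above (hypothetical name).
# is_score keys on col_name because model_score and tx_amount are both doubles.
exploded \
    .withColumn("is_score", F.when(F.col("col_name") == "model_score", "Y").otherwise("N")) \
    .withColumn("is_amount", F.when(F.col("col_name") == "tx_amount", "Y").otherwise("N")) \
    .withColumn("is_boolean", F.when(F.col("col_type") == "boolean", "Y").otherwise("N")) \
    .withColumn("is_text", F.when(F.col("col_type") == "string", "Y").otherwise("N")) \
    .show()

Since every value was cast to string for the zip, col_type also lets you cast col_value back to its original type downstream if needed.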