本文介绍了Pyspark:如何通过合并 spark 中的值来展平嵌套数组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有 10000 个具有不同 id 的 json,每个都有 10000 个名称.如何通过在 pyspark 中通过 int 或 str 合并值来展平嵌套数组?
I have 10000 jsons with different ids each has 10000 names. How to flatten nested arrays by merging values by int or str in pyspark?
我添加了列 name_10000_xvz
来解释更好的数据结构.我也更新了注释、输入 df、所需的输出 df 和输入的 json 文件.
I have added column name_10000_xvz
to explain better data structure. I have updated Notes, Input df, required output df and input json files as well.
注意事项:
- 输入数据帧有超过 10000 列 name_1_a、name_1000_xx,因此列(数组)名称不能硬编码,因为它需要写入 10000 个名称
id
、date
、val
在所有列和所有 json 中始终具有相同的命名约定- 数组大小可能会有所不同,但
date
、val
始终存在,因此可以对其进行硬编码 date
在每个数组中可以不同,例如 name_1_a 从 2001 开始,但是 name_10000_xvz for id == 1 从 2000 开始,finnish 从 2004 开始,但是对于 id == 2 开始于 1990 和以 2004 年结束
- Input dataframe has more than 10000 columns name_1_a, name_1000_xx so column(array) names can not be hardcoded as it will requires to write 10000 names
id
,date
,val
has always the same naming convention across all columns and all jsons- array size can vary but
date
,val
are always there so they can be hardcoded date
can be different in each array, for example name_1_a starts with 2001, but name_10000_xvz for id == 1 starts with 2000 and finnish with 2004, however for id == 2 starts with 1990 and finish with 2004
输入df:
root
|-- id: long (nullable = true)
|-- name_10000_xvz: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_1_a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_1_b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_2_a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|id |name_10000_xvz |name_1_a |name_1_b |name_2_a |
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|2 |[{1990, 39}, {2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}, {2004, 34}]|[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
|1 |[{2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}] |[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
所需的输出 df:
+---+---------+----------+-----------+---------+----------------+
|id | date | name_1_a | name_1_b |name_2_a | name_10000_xvz |
+---+---------+----------+-----------+---------+----------------+
|1 | 2000 | 0 | 0 | 0 | 30 |
|1 | 2001 | 1 | 4 | 21 | 31 |
|1 | 2002 | 2 | 5 | 22 | 32 |
|1 | 2003 | 3 | 6 | 23 | 33 |
|2 | 1990 | 0 | 0 | 0 | 39 |
|2 | 2000 | 0 | 0 | 0 | 30 |
|2 | 2001 | 1 | 4 | 21 | 31 |
|2 | 2002 | 2 | 5 | 22 | 32 |
|2 | 2003 | 3 | 6 | 23 | 33 |
|2 | 2004 | 0 | 0 | 0 | 34 |
+---+---------+----------+-----------+---------+----------------+
要重现输入 df:
df = spark.read.json(sc.parallelize([
"""{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
"""{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}}"""
]))
有用的链接:
- 如何展平数据框在 PySpark 中使用动态嵌套结构/数组
- https://docs.databricks.com/_static/notebooks/higher-order-functions.html
推荐答案
UPDATE
正如 @werner 所提到的,有必要转换所有结构以将列名附加到其中.
UPDATE
As @werner has mentioned, it's necessary to transform all structs to append the column name into it.
import pyspark.sql.functions as f
names = [column for column in df.columns if column.startswith('name_')]
expressions = []
for name in names:
expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))
flatten_df = (df
.withColumn('flatten', f.flatten(f.array(*expressions)))
.selectExpr('id', 'inline(flatten)'))
output_df = (flatten_df
.groupBy('id', 'date')
.pivot('name', names)
.agg(f.first('val')))
output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1 |2000|30 |null |null |null |
|1 |2001|31 |1 |4 |21 |
|1 |2002|32 |2 |5 |22 |
|1 |2003|33 |3 |6 |23 |
|2 |1990|39 |null |null |null |
|2 |2000|30 |null |null |null |
|2 |2001|31 |1 |4 |21 |
|2 |2002|32 |2 |5 |22 |
|2 |2003|33 |3 |6 |23 |
|2 |2004|34 |null |null |null |
+---+----+--------------+--------+--------+--------+
旧
假设:
date
值始终是所有列的相同值name_1_a, name_1_b, name_2_a
它们的大小相等
date
value is always the same value all columnsname_1_a, name_1_b, name_2_a
their sizes are equals
import pyspark.sql.functions as f
output_df = (df
.withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> ' \
'STRUCT(name_1_a[i].date AS date, ' \
' name_1_a[i].val AS name_1_a, ' \
' name_1_b[i].val AS name_1_b, ' \
' name_2_a[i].val AS name_2_a))'))
.selectExpr('id', 'inline(flatten)'))
output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1 |2001|1 |4 |21 |
|1 |2002|2 |5 |22 |
|1 |2003|3 |6 |23 |
|2 |2001|1 |4 |21 |
|2 |2002|2 |5 |22 |
|2 |2003|3 |6 |23 |
+---+----+--------+--------+--------+
这篇关于Pyspark:如何通过合并 spark 中的值来展平嵌套数组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!