我有一个类似于下面的数据框
new_df = spark.createDataFrame([
([{'product_code': '12', 'color': 'red'}, {'product_code': '212', 'color': 'white'}], 7),
([{'product_code': '1112', 'color': 'black'}], 8),
([{'product_code': '212', 'color': 'blue'}], 3)
], ["items", "frequency"])
我需要创建一个类似于下面的数据框架,以便可以轻松地保存到csv中:(相同列表数据的规则编号相同)
+-------------------------------------------
# |rule | product_code |color |
# +-------------------------------------------
# |1 | 12 | red |
# |1 | 212 | white|
# |2 | 1122 | black|
# |3 | 212 | blue |
# +--------------------------------------------
最佳答案
您可以添加addmonotonically_increasing_id
作为标识符,然后添加:
from pyspark.sql.functions import explode, monotonically_increasing_id, col
(new_df
.withColumn("rule", monotonically_increasing_id())
.withColumn("items", explode("items"))
.select(
"rule",
col("items")["product_code"].alias("product_code"),
col("items")["color"].alias("color"))
.show())
# +-----------+------------+-----+
# | rule|product_code|color|
# +-----------+------------+-----+
# | 8589934592| 12| red|
# | 8589934592| 212|white|
# |17179869184| 1112|black|
# |25769803776| 212| blue|
# +-----------+------------+-----+
使用
explode
可以获得连续的id,但需要在Python RDD之间进行昂贵的转换。