I have a DataFrame similar to the following

new_df = spark.createDataFrame([
([{'product_code': '12', 'color': 'red'}, {'product_code': '212', 'color': 'white'}], 7),
([{'product_code': '1112', 'color': 'black'}], 8),
([{'product_code': '212', 'color': 'blue'}], 3)
], ["items", "frequency"])

I need to create a DataFrame like the one below so that it can easily be saved to CSV (rows that come from the same list share the same rule number):
# +----+------------+-----+
# |rule|product_code|color|
# +----+------------+-----+
# |   1|          12|  red|
# |   1|         212|white|
# |   2|        1112|black|
# |   3|         212| blue|
# +----+------------+-----+

Best answer

You can add monotonically_increasing_id as an identifier and then explode:

from pyspark.sql.functions import explode, monotonically_increasing_id, col

(new_df
    # one id per input row, so every element of the same list gets the same rule number
    .withColumn("rule", monotonically_increasing_id())
    # one output row per element of the items array
    .withColumn("items", explode("items"))
    .select(
        "rule",
        col("items")["product_code"].alias("product_code"),
        col("items")["color"].alias("color"))
    .show())

# +-----------+------------+-----+
# |       rule|product_code|color|
# +-----------+------------+-----+
# | 8589934592|          12|  red|
# | 8589934592|         212|white|
# |17179869184|        1112|black|
# |25769803776|         212| blue|
# +-----------+------------+-----+
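Since every column of the flattened result is an atomic type, it can be written straight to CSV. A minimal sketch, assuming the query above is bound to a variable flat_df (i.e. the same chain without .show()) and using an illustrative output path:

# header and mode are standard DataFrameWriter options; the path is only an example
flat_df.write.csv("/tmp/rules_csv", header=True, mode="overwrite")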

Consecutive ids are possible, but they require an expensive conversion to and from a Python RDD.
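For reference, a minimal sketch of that RDD-based approach, using zipWithIndex to assign consecutive 1-based rule numbers (the variable names are illustrative, and the round trip through the RDD API is the expensive part):

from pyspark.sql.functions import explode, col

consecutive = (new_df.rdd
    .zipWithIndex()                                           # (Row, 0-based index)
    .map(lambda pair: (pair[1] + 1, pair[0]["items"]))        # (1-based rule, items list)
    .toDF(["rule", "items"])
    .withColumn("items", explode("items"))
    .select(
        "rule",
        col("items")["product_code"].alias("product_code"),
        col("items")["color"].alias("color")))

consecutive.show()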
