我有一个源自Google Analytics(分析)的Spark数据框,如下所示:
id customDimensions (Array<Struct>)
100 [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101 [ {"index": 1, "value": "Mars" }]
我还有一个“自定义维度元数据”数据框,如下所示:
index name
1 planet
2 continent
我将使用元数据df中的索引,以将自定义维度扩展为列。结果应如下所示:
id planet continent
100 Earth Europe
101 Mars null
我尝试了以下方法,并且效果很好,但是效果极差。我想知道是否有更好的方法。
# Select the two relevant columns
cd = df.select('id', 'customDimensions')
# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))
# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')
# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
.join(metadata, cd.index == metadata.index, 'left')
.drop(metadata.index))
# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))
# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)
假设:
最多有250个自定义维度索引,并且仅通过元数据数据框知道名称
原始数据框还有其他几列我要维护(因此在解决方案末尾加入了)
最佳答案
联接是非常昂贵的操作,因为它会导致数据混排。如果可以,则应避免使用它或对其进行优化。
您的代码中有两个联接。可以完全避免最后一次联接使列退回。可以优化与元数据数据帧的其他联接。由于元数据df只有250行且非常多,因此可以在联接中使用broadcast()
提示。这将避免改组较大的数据帧。
我进行了一些建议的代码更改,但由于没有您的数据,因此未经过测试。
# df columns list
df_columns = df.columns
# Explode customDimensions so that each row now has a {index, value}
cd = df.withColumn('customDimensions', F.explode(cd.customDimensions))
# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')
# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = cd.join(broadcast(metadata), "index", 'left')
# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))
return piv
关于apache-spark - 将结构数组扩展为PySpark中的列,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53670923/