我有一个源自Google Analytics(分析)的Spark数据框,如下所示:

id     customDimensions (Array<Struct>)
100    [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101    [ {"index": 1, "value": "Mars" }]


我还有一个“自定义维度元数据”数据框,如下所示:

index   name
1       planet
2       continent


我将使用元数据df中的索引,以将自定义维度扩展为列。结果应如下所示:

id     planet     continent
100    Earth      Europe
101    Mars       null




我尝试了以下方法,并且效果很好,但是效果极差。我想知道是否有更好的方法。

# Select the two relevant columns
cd = df.select('id', 'customDimensions')

# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
         .join(metadata, cd.index == metadata.index, 'left')
         .drop(metadata.index))

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))

# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)




假设:


最多有250个自定义维度索引,并且仅通过元数据数据框知道名称
原始数据框还有其他几列我要维护(因此在解决方案末尾加入了)

最佳答案

联接是非常昂贵的操作,因为它会导致数据混排。如果可以,则应避免使用它或对其进行优化。

您的代码中有两个联接。可以完全避免最后一次联接使列退回。可以优化与元数据数据帧的其他联接。由于元数据df只有250行且非常多,因此可以在联接中使用broadcast()提示。这将避免改组较大的数据帧。

我进行了一些建议的代码更改,但由于没有您的数据,因此未经过测试。

# df columns list
df_columns = df.columns

# Explode customDimensions so that each row now has a {index, value}
cd = df.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = cd.join(broadcast(metadata), "index", 'left')

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))


return piv

关于apache-spark - 将结构数组扩展为PySpark中的列,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53670923/

10-11 05:12