嗨,我正在处理一种稍微困难的文件格式,我正在尝试对其进行清理以备将来处理。我一直在使用 Pyspark 将数据处理成数据帧。
该文件类似于以下内容:
AA 1234 ZXYW
BB A 890
CC B 321
AA 1234 LMNO
BB D 123
CC E 321
AA 1234 ZXYW
CC E 456
每个“AA”记录定义了一个或多个逻辑组的开始,每行上的数据都是固定长度的,其中包含我想要提取的编码信息。至少有 20-30 种不同的记录类型。它们总是在每行的开头用两个字母的代码标识。每个组中可以有 1 个或多个不同的记录类型(即并非每个组都存在所有记录类型)
作为第一阶段,我设法以这种格式将记录组合在一起:
+----------------+---------------------------------+
| index| result|
+----------------+---------------------------------+
| 1|[AA 1234 ZXYV,BB A 890,CC B 321]|
| 2|[AA 1234 LMNO,BB D 123,CC E 321]|
| 3|[AA 1234 ZXYV,CC B 321] |
+----------------+---------------------------------+
作为第二阶段,我真的想将数据放入数据框中的以下列:
+----------------+---------------------------------+-------------+--------+--------+
| index| result| AA| BB| CC|
+----------------+---------------------------------+-------------+--------+--------+
| 1|[AA 1234 ZXYV,BB A 890,CC B 321]|AA 1234 ZXYV|BB A 890|CC B 321|
| 2|[AA 1234 LMNO,BB D 123,CC E 321]|AA 1234 LMNO|BB D 123|CC E 321|
| 3|[AA 1234 ZXYV,CC B 321] |AA 1234 ZXYV| Null|CC B 321|
+----------------+---------------------------------+-------------+--------+--------+
因为在那时提取我需要的信息应该是微不足道的。
有没有人对我如何做到这一点有任何建议?
非常感谢。
最佳答案
您可以使用 flatMap
和 pivot
来实现这一点。从第一阶段的结果开始:
rdd = sc.parallelize([(1,['AA 1234 ZXYV','BB A 890','CC B 321']),
(2,['AA 1234 LMNO','BB D 123','CC E 321']),
(3,['AA 1234 ZXYV','CC B 321'])])
df = rdd.toDF(['index', 'result'])
您可以先使用
flatMap
将数组分解为多行,然后将两个字母的标识符提取到单独的列中。df_flattened = df.rdd.flatMap(lambda x: [(x[0],y, y[0:2],y[3::]) for y in x[1]])\
.toDF(['index','result', 'identifier','identifiertype'])
并使用
pivot
将两个字母标识符更改为列名:df_result = df_flattened.groupby(df_flattened.index,)\
.pivot("identifier")\
.agg(first("identifiertype"))\
.join(df,'index')
我添加了连接以获取
result
列关于python - PySpark 根据名称将列表分解为多列,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47552045/