本文介绍了是否可以在不包括其他列的值的情况下分解一列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的方案中,我分解一个数组列,以便每行有一条记录,这样我就可以执行联接,然后将这些分解的列重新组合在一起

+--------------+-------+------------------------+
|     body     |  ID   |     array_column       |
+--------------+-------+------------------------+
| (large data) | guid1 |     (entry1,entry2)    |
+--------------+-------+------------------------+
| (large data) | guid2 | (entry3,entry4,entry5) |
+--------------+-------+------------------------+

->;

+--------------+-------+-----------------+
|     body     |  ID   |  array_column   |
+--------------+-------+-----------------+
| (large data) | guid1 |      entry1     |
+--------------+-------+-----------------+
|     null     | guid1 |      entry2     |
+--------------+-------+-----------------+
| (large data) | guid2 |      entry3     |
+--------------+-------+-----------------+
|     null     | guid2 |      entry4     |
+--------------+-------+-----------------+
|     null     | guid2 |      entry5     |
+--------------+-------+-----------------+

->;

+--------------+-------+---------------------------------------------------+
|     body     |  ID   |                   array_column                    |
+--------------+-------+---------------------------------------------------+
| (large data) | guid1 |            (entry1_enriched,entry2_enriched)      |
+--------------+-------+---------------------------------------------------+
| (large data) | guid2 | (entry3_enriched,entry4_enriched,entry5_enriched) |
+--------------+-------+---------------------------------------------------+
请注意,在分解之后,主体只存在于其中一个分解中,否则它将填充空值。这就是我希望发生的事情。现在,它被每一行中的大主体填满了,这给我们带来了内存问题。我曾考虑拆分表、删除列,并在稍后将其与自身重新联接,但因为我是在对流数据进行操作,所以这不是一个真正的选择。

附注:我不关心重新联接(此部分正在工作),除非有一种简单的方法可以用哑值填充某些分解行的列,以最大限度地减少空间消耗

推荐答案

RDD.flatMap可以使用。函数(在下面的代码中称为flatten)从输入数据中获取一行,然后在结果行列表上返回迭代器。在此列表中,第一行包含large data,而对于所有其他行,此列设置为None

df= ...

def flatten(row):
    body=row[0]
    id=row[1]
    entries=row[2]
    return iter([(body, id, entries[0])] + [(None, id, e) for e in entries[1:]])

flattend_rdd=df.rdd.flatMap(flatten)

spark.createDataFrame(flattend_rdd, df.columns).show()

输出:

+--------------+-----+------------+
|          body|   ID|array_column|
+--------------+-----+------------+
|(large data 1)|guid1|      entry1|
|          null|guid1|      entry2|
|(large data 2)|guid2|      entry3|
|          null|guid2|      entry4|
|          null|guid2|      entry5|
+--------------+-----+------------+

这篇关于是否可以在不包括其他列的值的情况下分解一列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

05-20 15:08