Problem Description
I have a pyspark.rdd.PipelinedRDD (Rdd1). When I do Rdd1.collect(), it gives a result like below:
[(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
(1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
(2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
(3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
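For reference, a minimal sketch that reproduces an RDD of this shape (it assumes an existing SparkSession named spark, which is not shown in the question):
data = [
    (10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
    (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
    (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
    (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417}),
]
# parallelize yields an RDD of (CId, {IID: Score}) pairs like Rdd1
Rdd1 = spark.sparkContext.parallelize(data)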
Now I want to convert the pyspark.rdd.PipelinedRDD to a DataFrame without using the collect() method.
My final DataFrame should look like below; df.show() should produce:
+----------+-------+-------------------+
|CId |IID |Score |
+----------+-------+-------------------+
|10 |4 |2.9996439803387602 |
|10 |5 |1.6767412921625855 |
|10 |3 |3.616726727464709 |
|1 |4 |-1.5271512313750577|
|1 |5 |1.9665475696370045 |
|1 |3 |2.016527311459324 |
|2 |4 |4.033642544526678 |
|2 |5 |3.1517805604906313 |
|2 |3 |6.230272144805092 |
|3 |4 |2.9757316477407443 |
|3 |5 |-1.5689126834176417|
|3 |3 |-0.3924680103722977|
+----------+-------+-------------------+
I can achieve this by applying collect() to the RDD, iterating over the result, and finally building the DataFrame.
But now I want to convert the pyspark.rdd.PipelinedRDD (Rdd1) to a DataFrame without using any collect() call.
Please let me know how to achieve this.
Recommended Answer
You want to do two things here: (1) flatten your data, and (2) put it into a DataFrame. One way to do it is as follows.
First, let us flatten the dictionary:
rdd2 = Rdd1.flatMapValues(lambda x: [(k, x[k]) for k in x.keys()])
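Equivalently, since dict.items() already yields (key, value) pairs, a slightly more idiomatic sketch with the same behavior:
rdd2 = Rdd1.flatMapValues(lambda d: d.items())  # each (CId, dict) becomes several (CId, (IID, Score)) pairs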
When collecting the data, you get something like this:
[(10, (3, 3.616726727464709)), (10, (4, 2.9996439803387602)), ...
Then we can format the data and turn it into a DataFrame:
rdd2.map(lambda x: (x[0], x[1][0], x[1][1])) \
    .toDF(["CId", "IID", "Score"]) \
    .show()
Which gives you this:
+---+---+-------------------+
|CId|IID| Score|
+---+---+-------------------+
| 10| 3| 3.616726727464709|
| 10| 4| 2.9996439803387602|
| 10| 5| 1.6767412921625855|
| 1| 3| 2.016527311459324|
| 1| 4|-1.5271512313750577|
| 1| 5| 1.9665475696370045|
| 2| 3| 6.230272144805092|
| 2| 4| 4.033642544526678|
| 2| 5| 3.1517805604906313|
| 3| 3|-0.3924680103722977|
| 3| 4| 2.9757316477407443|
| 3| 5|-1.5689126834176417|
+---+---+-------------------+
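Note that the row order differs from the df.show() requested in the question: ordering after flatMapValues is not guaranteed, so if it matters, sort explicitly, e.g. by appending .orderBy("CId", "IID") before .show().
If you also want to control the column types rather than rely on toDF's inference, here is a sketch using createDataFrame with an explicit schema (it assumes a SparkSession named spark, which is not shown in the question):
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Explicit schema matching the three columns above
schema = StructType([
    StructField("CId", IntegerType(), True),
    StructField("IID", IntegerType(), True),
    StructField("Score", DoubleType(), True),
])
df = spark.createDataFrame(rdd2.map(lambda x: (x[0], x[1][0], x[1][1])), schema)
df.orderBy("CId", "IID").show()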