Problem Description
Q: Is there any way to merge two DataFrames, or to copy a column of one DataFrame to another, in PySpark?
For example, I have two DataFrames:
DF1
C1 C2
23397414 20875.7353
5213970 20497.5582
41323308 20935.7956
123276113 18884.0477
76456078 18389.9269
DF2
C3 C4
2008-02-04 262.00
2008-02-05 257.25
2008-02-06 262.75
2008-02-07 237.00
2008-02-08 231.00
Then I want to add C3 of DF2 to DF1, like this:
New DF
C1 C2 C3
23397414 20875.7353 2008-02-04
5213970 20497.5582 2008-02-05
41323308 20935.7956 2008-02-06
123276113 18884.0477 2008-02-07
76456078 18389.9269 2008-02-08
I hope this example was clear.
Recommended Answer
A row number via a window function (Solution 1) or zipWithIndex.map (Solution 2) should help in this case.
I would suggest first adding a row number as an additional column, columnindex, to each DataFrame, say df1:
DF1
C1 C2 columnindex
23397414 20875.7353 1
5213970 20497.5582 2
41323308 20935.7956 3
123276113 18884.0477 4
76456078 18389.9269 5
DF2
C3 C4 columnindex
2008-02-04 262.00 1
2008-02-05 257.25 2
2008-02-06 262.75 3
2008-02-07 237.00 4
2008-02-08 231.00 5
Now just do an inner join of df1 and df2 on columnindex, that's all; you will get the output shown below.
Something like this:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lit

# Note: a window with neither partitioning nor a real ordering key pulls
# all rows into a single partition, so this is only suitable for small data.
w = Window.orderBy(lit(1))

df1 = ...  # as shown above
df2 = ...  # as shown above

df11 = df1.withColumn("columnindex", row_number().over(w))
df22 = df2.withColumn("columnindex", row_number().over(w))

newDF = df11.join(df22, df11.columnindex == df22.columnindex, 'inner').drop(df22.columnindex)
newDF.show()
New DF
C1 C2 C3
23397414 20875.7353 2008-02-04
5213970 20497.5582 2008-02-05
41323308 20935.7956 2008-02-06
123276113 18884.0477 2008-02-07
76456078 18389.9269 2008-02-08
Solution 2: Another good way (probably the best :)) in Scala, which you can translate to PySpark (a sketch of that translation follows the Scala code):
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

/**
 * Add a column index to a DataFrame.
 */
def addColumnIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Append the index produced by zipWithIndex to each row
  df.rdd.zipWithIndex.map { case (row, columnindex) =>
    Row.fromSeq(row.toSeq :+ columnindex)
  },
  // Extend the schema with the new column
  StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)

// Add the index to both DataFrames ...
val df1WithIndex = addColumnIndex(df1)
val df2WithIndex = addColumnIndex(df2)

// ... and join on it, dropping the helper column afterwards
val newone = df1WithIndex
  .join(df2WithIndex, Seq("columnindex"))
  .drop("columnindex")