本文介绍了Spark:通过在两个数据帧上添加行索引/数字来合并2个数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

问:有没有办法合并两个数据框或将一个数据框的列复制到PySpark中的另一列?

Q: Is there is any way to merge two dataframes or copy a column of a dataframe to another in PySpark?

例如,我有两个数据框:

For example, I have two Dataframes:

DF1
C1                    C2
23397414             20875.7353
5213970              20497.5582
41323308             20935.7956
123276113            18884.0477
76456078             18389.9269

第二个数据帧

DF2
C3                       C4
2008-02-04               262.00
2008-02-05               257.25
2008-02-06               262.75
2008-02-07               237.00
2008-02-08               231.00

然后我要像这样将DF2的C3添加到DF1:

Then i want to add C3 of DF2 to DF1 like this:

New DF
    C1                    C2          C3
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

我希望这个例子很清楚.

I hope this example was clear.

推荐答案

rownum +窗口函数,即解决方案1或zipWithIndex.map,即解决方案2在这种情况下应该有所帮助.

rownum + window function i.e solution 1 or zipWithIndex.map i.e solution 2 should help in this case.

然后我建议您将行号作为其他列名添加到Dataframe说df1.

Then I would suggest you to add rownumber as additional column name to Dataframe say df1.

  DF1
    C1                    C2                 columnindex
    23397414             20875.7353            1
    5213970              20497.5582            2
    41323308             20935.7956            3
    123276113            18884.0477            4
    76456078             18389.9269            5

第二个数据帧

DF2
C3                       C4             columnindex
2008-02-04               262.00            1
2008-02-05               257.25            2
2008-02-06               262.75            3
2008-02-07               237.00            4
2008-02-08               231.00            5

现在..进行df1和df2的内部联接,仅此而已... 您将获得低于产量的结果

Now .. do inner join of df1 and df2 that's all... you will get below ouput

类似的东西

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()

df1 = ....  // as showed above df1

df2 = ....  // as shown above df2


df11 =  df1.withColumn("columnindex", rowNumber().over(w))
  df22 =  df2.withColumn("columnindex", rowNumber().over(w))

newDF = df11.join(df22, df11.columnindex == df22.columnindex, 'inner').drop(df22.columnindex)
newDF.show()



New DF
    C1                    C2          C3
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

解决方案2:在Scala中的另一种好方法(可能是最好的:)),您可以将其翻译为pyspark:

/**
* Add Column Index to dataframe
*/
def addColumnIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add Column index
  df.rdd.zipWithIndex.map{case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex)},
  // Create schema
  StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)

// Add index now...
val df1WithIndex = addColumnIndex(df1)
val df2WithIndex = addColumnIndex(df2)

 // Now time to join ...
val newone = df1WithIndex
  .join(df2WithIndex , Seq("columnindex"))
  .drop("columnindex")

这篇关于Spark:通过在两个数据帧上添加行索引/数字来合并2个数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-24 17:37