本文介绍了你如何使用 Python 在 Spark 中执行两个 RDD 表的基本连接?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

您将如何使用 Python 在 Spark 中执行基本连接?在 R 中,您可以使用 merg() 来执行此操作.在 spark 上使用 python 的语法是什么:

How would you perform basic joins in Spark using python? In R you could use merg() to do this. What is the syntax using python on spark for:

  1. 内连接
  2. 左外连接
  3. 交叉连接

有两个表 (RDD),每个表中有一个具有公共键的列.

With two tables (RDD) with a single column in each that has a common key.

RDD(1):(key,U)
RDD(2):(key,V)

我认为内部联接是这样的:

I think an inner join is something like this:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

是吗?我在互联网上搜索过,但找不到一个很好的连接示例.提前致谢.

Is that right? I have searched the internet and can't find a good example of joins. Thanks in advance.

推荐答案

可以使用 PairRDDFunctions 或 Spark Data Frames 来完成.由于数据框操作受益于 Catalyst Optimizer 第二个选项值得考虑.

It can be done either using PairRDDFunctions or Spark Data Frames. Since data frame operations benefit from Catalyst Optimizer the second option is worth considering.

假设您的数据如下所示:

Assuming your data looks as follows:

rdd1 =  sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 =  sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

使用 PairRDD:

内连接:

rdd1.join(rdd2)

左外连接:

rdd1.leftOuterJoin(rdd2)

笛卡尔积(不需要RDD[(T, U)]):

rdd1.cartesian(rdd2)

广播加入(不需要RDD[(T, U)]):

最后是 cogroup,它没有直接的 SQL 等价物,但在某些情况下很有用:

Finally there is cogroup which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)

cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

使用 Spark 数据帧

您可以使用 SQL DSL 或使用 sqlContext.sql 执行原始 SQL.

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')

内连接:

# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')

左外连接:

df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')

交叉联接(Spark 中需要显式交叉联接或配置更改.2.0 - spark.sql.crossJoin.enabled for Spark 2.x):

Cross join (explicit cross join or configuration changes are required in Spark. 2.0 - spark.sql.crossJoin.enabled for Spark 2.x):

df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')

从 1.6(Scala 中的 1.5)开始,每一个都可以与 broadcast 功能结合:

Since 1.6 (1.5 in Scala) each of these can be combined with broadcast function:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), df1.k == df2.k)

执行广播加入.另请参阅为什么我的 BroadcastHashJoin 比 Spark 中的 ShuffledHashJoin 慢

to perform broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark

这篇关于你如何使用 Python 在 Spark 中执行两个 RDD 表的基本连接?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 08:32