本文介绍了Pyspark 在具有数百万条记录的 2 个数据帧之间交叉连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有 2 个数据框 A(3500 万条记录)和 B(30000 条记录)

I have 2 dataframes A(35 Million records) and B(30000 records)

A

|Text |
-------
| pqr  |
-------
| xyz  |
-------

B

|Title |
-------
| a  |
-------
| b  |
-------
| c  |
-------

下面的数据帧 C 是在 A 和 B 之间交叉连接后获得的.

Below dataframe C is obtained after a crossjoin between A and B.

c = A.crossJoin(B, on = [A.text == B.Title)

C

|text | Title |
---------------
| pqr  | a    |
---------------
| pqr  | b    |
---------------
| pqr  | c    |
---------------
| xyz  | a    |
---------------
| xyz  | b    |
---------------
| xyz  | c    |
---------------

以上两列都是字符串类型.

Both the columns above are of type String.

我正在执行以下操作并导致 Spark 错误(作业因阶段失败而中止)

I am performing the below operation and it results in an Spark error(Job aborted due to stage failure)

display(c.withColumn("Contains", when(col('text').contains(col('Title')), 1).otherwise(0)).filter(col('Contains') == 0).distinct())

有关如何进行此连接以避免在结果操作中出现 Spark error() 的任何建议?

Any suggestions on how this join needs to be done to avoid the Spark error() on the resulting operations?

Spark 错误信息

推荐答案

尝试使用 broadcast 加入

from pyspark.sql.functions import broadcast
c = functions.broadcast(A).crossJoin(B)

如果您不需要额外的包含"列,您可以将其过滤为

If you don't need and extra column "Contains" column thne you can just filter it as

display(c.filter(col("text").contains(col("Title"))).distinct())

这篇关于Pyspark 在具有数百万条记录的 2 个数据帧之间交叉连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 08:46