本文介绍了为什么我的BroadcastHashJoin是星火比ShuffledHashJoin慢的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我执行加入星火使用 javaHiveContext

大表是1,76Gb并拥有100百万的纪录。

The big table is 1,76Gb and has 100 millions record.

第二个表是273Mb,并具有1000万的纪录。

The second table is 273Mb and has 10 millions record.

我收到了 JavaSchemaRDD ,我叫计数()就可以了:

I get a JavaSchemaRDD and I call count() on it:

String query="select attribute7,count(*) from ft,dt where ft.chiavedt=dt.chiavedt group by attribute7";

JavaSchemaRDD rdd=sqlContext.sql(query);

System.out.println("count="+rdd.count());

如果我强迫一个 broadcastHashJoin(SET spark.sql.autoBroadcastJoinThreshold = 2.9亿) 5节点上使用5执行人与8核心和内存的20Gb它在100执行秒。
如果我不勉强广播它在30秒内执行。

If I force a broadcastHashJoin (SET spark.sql.autoBroadcastJoinThreshold=290000000) and use 5 executor on 5 node with 8 core and 20Gb of memory it is executed in 100 sec.If i don't force broadcast it is executed in 30 sec.

N.B。该表被存储为镶文件。

N.B. the tables are stored as Parquet file.

推荐答案

最有可能的问题的根源是广播的成本。为了让事情变得简单让我们假设你在较小的一个较大的RDD和300MB具有1800MB。假设5遗嘱执行人,并没有previous分区第五的所有数据都应该已经正确的机器上。它左派〜1700MB在标准情况下,加入洗牌。

Most likely the source of the problem is a cost of broadcasting. To make things simple lets assume that you have 1800MB in the larger RDD and 300MB in the smaller one. Assuming 5 executors and no previous partitioning a fifth of all data should be already on the correct machine. It lefts ~1700MB for shuffling in case of standard join.

有关广播加盟小RDD已被转移到所有节点。这意味着各地要传输的数据1500MB。如果添加与驱动程序所需的通信就意味着你要在一个更昂贵的方式​​移动数据的可比数额。广播的数据被收集首款也是唯一一款可以被转发到所有的工人了。

For broadcast join the smaller RDD has to be transfered to all nodes. It means around 1500MB data to be transfered. If you add required communication with driver it means you have to move a comparable amount of data in a much more expensive way. A broadcasted data has to be collected first and only after that can be forwarded to all the workers.

这篇关于为什么我的BroadcastHashJoin是星火比ShuffledHashJoin慢的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-20 15:24