This article discusses a broadcast join that does not happen when joining DataFrames in Spark 1.6.

Problem Description

Below is the sample code I am running. When this Spark job runs, the DataFrame joins happen using SortMergeJoin instead of BroadcastJoin.

def joinedDf(sqlContext: SQLContext,
             txnTable: DataFrame,
             countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = {
  txnTable.as("df1").join(
    countriesDfBroadcast.value.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
}

joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")

The broadcast join does not happen even when I specify a broadcast() hint in the join statement.

The optimizer is hash-partitioning the DataFrames, which is causing data skew.

Has anyone seen this behavior?

I am running this on YARN using Spark 1.6, with HiveContext as the SQLContext. The Spark job runs on 200 executors. The data size of txnTable is 240 GB, and the data size of countriesDf is 5 MB.
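One way to confirm which strategy the planner actually picked (a diagnostic sketch, reusing the txnTable and countriesDf names from the question) is to print the physical plan before writing any output:

```scala
// Print the physical plan: a broadcast join appears as
// BroadcastHashJoin, the fallback as SortMergeJoin.
txnTable.as("df1")
  .join(
    countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
    $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
  .explain()
```

This requires a running Spark 1.6 session with both DataFrames defined; it only inspects the plan and triggers no job.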

Recommended Answer

Both the way you broadcast the DataFrame and the way you access it are incorrect.


  • Standard broadcasts cannot be used to handle distributed data structures. If you want to perform a broadcast join on a DataFrame, you should use the broadcast function, which marks the given DataFrame for broadcasting:

import org.apache.spark.sql.functions.broadcast

val countriesDf: DataFrame = ???

// Mark the small DataFrame for broadcasting. This is a planner hint,
// not an eager broadcast, so it is safe to apply at definition time.
val tmp: DataFrame = broadcast(
  countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
)

// tmp is already marked, so no further broadcast() wrapper is needed here.
txnTable.as("df1").join(
  tmp, $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")

Internally it will collect tmp without converting it from the internal representation, and broadcast it afterwards.

join arguments are eagerly evaluated. Even if it were possible to use SparkContext.broadcast with a distributed data structure, the broadcast value would be evaluated locally before join is called. That is why your function works at all but does not perform a broadcast join.
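As an aside (an assumption about the asker's setup, not part of the answer above): Spark can also broadcast a join side automatically when its estimated size falls below spark.sql.autoBroadcastJoinThreshold (10 MB by default). A 5 MB countriesDf would normally qualify, but tables read through HiveContext without computed statistics may not have a usable size estimate, so the threshold can be set explicitly:

```scala
// Config fragment: raise the auto-broadcast threshold to 50 MB so the
// planner broadcasts any join side it estimates below that size.
// (50 MB is a hypothetical value; tune it to the small table's size.)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)
```

Setting the threshold to -1 disables automatic broadcasting entirely, which is useful when ruling it out during debugging.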

