This article looks at the puzzle of spark.sql.shuffle.partitions and its 200 default partitions; hopefully it is a useful reference for anyone running into the same question.

Problem description

In many posts there is a statement, in some form or another like the one shown below, prompted by some question about shuffling or partitioning due to a JOIN, an aggregation, or the like:

... In general whenever you do a Spark SQL aggregation or join which shuffles data, the number of resulting partitions = 200. This is set by spark.sql.shuffle.partitions. ...

So, my question is:

  • Do we mean that if we have set partitioning at 765 for a DF, for example,
    • that the processing occurs against 765 partitions, but that the output is coalesced / re-partitioned to the standard 200 - is that what the word resulting refers to here?
    • or that the processing is done using 200 partitions, after coalescing / re-partitioning to 200 partitions before the JOIN or aggregation?

I ask as I have never seen a clear viewpoint on this.

I did the following test:

      // genned a DS of some 20M short rows
      df0.count
      val ds1 = df0.repartition(765)
      ds1.count
      val ds2 = df0.repartition(765)
      ds2.count
      
      sqlContext.setConf("spark.sql.shuffle.partitions", "765")
      // The line above was not included on the 1st run and was included on the 2nd run.
      
      ds1.rdd.partitions.size
      ds2.rdd.partitions.size
      
      val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer") 
      joined.rdd.partitions.size
      joined.count
      joined.rdd.partitions.size
      

On the 1st test - without defining sqlContext.setConf("spark.sql.shuffle.partitions", "765") - the processing and the resulting number of partitions was 200, even though SO post 45704156 states it may not apply to DFs (and this is a DS).

On the 2nd test - with sqlContext.setConf("spark.sql.shuffle.partitions", "765") defined - the processing and the resulting number of partitions was 765, even though SO post 45704156 states it may not apply to DFs (and this is a DS).
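
For reference (not part of the original test), a quick way to confirm which value was actually in effect for a given run is to read the setting back and to look at the Exchange (shuffle) step in the physical plan; this assumes the same spark-shell session and the joined Dataset from the code above:

      sqlContext.getConf("spark.sql.shuffle.partitions")   // "200" on the 1st run, "765" on the 2nd
      joined.explain()   // the Exchange (shuffle) nodes in the plan hash-partition both sides using this value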

Recommended answer

spark.sql.shuffle.partitions is the parameter which decides the number of partitions used while doing shuffles like joins or aggregations, i.e. wherever data moves across the nodes. The other part, spark.default.parallelism, will be calculated on the basis of your data size and the maximum block size, which in HDFS is 128 MB. So if your job does not do any shuffle it will consider the default parallelism value, or, if you are using RDDs, you can set it on your own. When shuffling happens it will take 200.
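
To make the spark.default.parallelism side concrete, here is a minimal sketch (for a spark-shell session; not from the original answer): an RDD shuffle that does not specify a partition count never consults spark.sql.shuffle.partitions, it falls back to spark.default.parallelism or to the parent RDD's partition count:

      // RDD-level shuffle: reduceByKey without an explicit numPartitions argument
      val pairs = sc.parallelize(1 to 1000, 8).map(x => (x % 10, x))
      pairs.reduceByKey(_ + _).getNumPartitions   // spark.default.parallelism if set, otherwise the parent's 8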

      val df = sc.parallelize(List(1, 2, 3, 4, 5), 4).toDF()
      df.count()   // this will use 4 partitions

      val df1 = df
      df1.except(df).count   // will generate 200 partitions, having 2 stages
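
To tie this back to the question, a possible follow-up in the same session (a sketch, not part of the original answer) is to change the setting and re-check the shuffled output; the same except now produces that many partitions:

      sqlContext.setConf("spark.sql.shuffle.partitions", "765")
      df1.except(df).rdd.getNumPartitions   // now 765 instead of the default 200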

That is it for this look at the puzzle of spark.sql.shuffle.partitions and its 200 default partitions; hopefully the recommended answer above helps.
