Question
sc = SparkContext("local")
rdd = sc.binaryFiles("<path to the binary file>", minPartitions=5).partitionBy(8)
or
sc = SparkContext("local")
rdd = sc.binaryFiles("<path to the binary file>", minPartitions=5).repartition(8)
Using either of the snippets above, I am trying to create 8 partitions in my RDD, with the data distributed evenly across all of them. When I print rdd.getNumPartitions(), the number of partitions shown is indeed 8, but in the Spark UI I observe that although 8 partitions are created, all of the binary file data ends up in a single partition.
Note: the minPartitions argument has no effect. Even with minPartitions=5, the RDD is created with only 1 partition, which is why I tried the partitionBy/repartition calls.
Is this the expected behaviour, or am I missing something?
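For reference, a minimal sketch of how the per-partition distribution can be inspected (the path is a placeholder and the inspection code is my own addition, not part of the original question):

from pyspark import SparkContext

sc = SparkContext("local")

# binaryFiles() returns an RDD of (filename, bytes) pairs
rdd = sc.binaryFiles("<path to the binary file>", minPartitions=5).repartition(8)

# Count the records held by each partition to see how the data is spread out
sizes = rdd.glom().map(len).collect()
print(rdd.getNumPartitions(), sizes)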
Answer
Spark 2.4+: the problem should be fixed; see @Rahul's comment below this answer.
Spark 2.1-2.3: the minPartitions argument of binaryFiles() is ignored. See SPARK-16575 and the commit changes to the setMinPartitions() function. Notice in the commit changes how minPartitions is no longer used in that function at all!
If you are reading multiple binary files with binaryFiles(), the input files are coalesced into partitions based on:
- spark.files.maxPartitionBytes, default 128 MB
- spark.files.openCostInBytes, default 4 MB
- spark.default.parallelism
- the total size of your input
The first three config items are described here. See the commit change above for the actual calculation.
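As a rough illustration only (this is not the Spark source, just a Python sketch that mirrors the setMinPartitions() logic as I read it from that commit), the maximum number of bytes packed into one input partition is computed roughly like this:

def max_split_bytes(file_sizes, max_partition_bytes=128 * 1024 * 1024,
                    open_cost_in_bytes=4 * 1024 * 1024, default_parallelism=8):
    # Each file is "padded" with the open cost, and the total is spread over the cores.
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    # The split size is capped at spark.files.maxPartitionBytes
    # and floored at spark.files.openCostInBytes.
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))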
I had a scenario where I wanted a maximum of 40 MB per input partition, and hence 40 MB per task, to increase parallelism while parsing. (Spark was putting 128 MB into each partition, slowing down my app.) I set spark.files.maxPartitionBytes to 40 MB before calling binaryFiles():
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.files.maxPartitionBytes", 40 * 1024 * 1024) \
    .getOrCreate()
For a single input file, @user9864979's answer is correct: a single file cannot be split into multiple partitions using binaryFiles() alone.
When reading multiple files with Spark 1.6, the minPartitions argument does work, and you have to use it. If you don't, you will hit the SPARK-16575 problem: all of your input files will be read into only two partitions!
You will find that Spark will normally give you fewer input partitions than you request. I had a scenario where I wanted one input partition for every two input binary files; setting minPartitions to "the # of input files * 7 / 10" gave me roughly what I wanted. In another scenario I wanted one input partition for each input file; setting minPartitions to "the # of input files * 2" gave me exactly that.
Spark 1.5 behavior of binaryFiles(): you get one partition for each input file.