我的spark应用程序使用自定义hadoop输入格式处理文件(平均大小为20 MB),并将结果存储在HDFS中。
以下是代码段。
Configuration conf = new Configuration();
JavaPairRDD<Text, Text> baseRDD = ctx
.newAPIHadoopFile(input, CustomInputFormat.class,Text.class, Text.class, conf);
JavaRDD<myClass> mapPartitionsRDD = baseRDD
.mapPartitions(new FlatMapFunction<Iterator<Tuple2<Text, Text>>, myClass>() {
//my logic goes here
}
//few more translformations
result.saveAsTextFile(path);
该应用程序为每个文件创建1个任务/分区,并处理并将相应的零件文件存储在HDFS中。
即,对于10,000个输入文件,创建10,000个任务,并将10,000个零件文件存储在HDFS中。
baseRDD上的mapPartitions和map操作都为每个文件创建1个任务。
所以问题
How to set the number of partitions for newAPIHadoopFile?
建议设置
conf.setInt("mapred.max.split.size", 4);
,用于配置任何分区。但是,设置此参数后,将最大程度地利用CPU,即使经过很长时间也不会启动任何阶段。
如果我没有设置此参数,那么应用程序将如上所述成功完成。
如何使用newAPIHadoopFile设置分区数并提高效率?
mapred.max.split.size 选项会怎样?
============
更新:
mapred.max.split.size 选项会怎样?
在我的用例中,文件大小很小,因此此处更改拆分大小选项无关紧要。
这样的更多信息:Behavior of the parameter "mapred.min.split.size" in HDFS
最佳答案
只需使用baseRDD.repartition(<a sane amount>).mapPartitions(...)
即可。这会将结果操作移至更少的分区,尤其是在文件较小的情况下。