This article looks at how to use the SparkContext Hadoop configuration inside RDD methods/closures such as foreachPartition. The question and answer below should be a useful reference for anyone who runs into the same problem.

Problem description


I am using Spark to read a bunch of files, processing them and then saving them all as Sequence files. What I wanted was to have 1 sequence file per partition, so I did this:

SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
                .setMaster("local[2]")
                .set("spark.streaming.stopGracefullyOnShutdown", "true");
        final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
        //JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));

        JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
        if(!imageByteRDD.isEmpty())
            imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

                @Override
                public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
                        throws Exception {
                  [°°°SOME STUFF°°°]
                  SequenceFile.Writer writer = SequenceFile.createWriter(
                                     jsc.hadoopConfiguration(),
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context?

Previously, I created a Configuration for each partition, and it works, but I'm sure there is a much more "sparky way".


Does anybody know how to use the Hadoop Configuration Object inside the RDD closures?
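
As an aside on the "sparky way" mentioned above: if the goal is just one sequence file per partition, another route is to let Spark write the files itself via JavaPairRDD.saveAsNewAPIHadoopFile, which takes the driver's Hadoop configuration as a plain argument and produces one output file per partition. A rough sketch of that idea (destinationPath and the lambda-style mapToPair are illustrative, not from the original code):

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Convert each (path, stream) pair into Writable key/value types, then let Spark
// write the sequence files itself; one part file is produced per partition.
JavaPairRDD<Text, BytesWritable> writables = imageByteRDD.mapToPair(
        t -> new Tuple2<Text, BytesWritable>(new Text(t._1()), new BytesWritable(t._2().toArray())));

writables.saveAsNewAPIHadoopFile(
        destinationPath,            // hypothetical output directory
        Text.class,
        BytesWritable.class,
        SequenceFileOutputFormat.class,
        jsc.hadoopConfiguration()); // the driver-side configuration is passed here directly

Since the configuration never has to cross into a user closure, the serialization question does not come up at all.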

Recommended answer


Looks like it cannot be done (the Hadoop Configuration object is not serializable, so it cannot simply be captured by a closure that runs on the executors), so here is the code I used:

final String hdfsNameNodePath = "hdfs://quickstart.cloudera:8080";

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
        if(!imageByteRDD.isEmpty())
            imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

                @Override
                public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
                        throws Exception {

                    Configuration conf = new Configuration();
                    conf.set("fs.defaultFS", hdfsNameNodePath);
                    //the string above should be passed as argument
                    SequenceFile.Writer writer = SequenceFile.createWriter(
                                         conf,
                                         SequenceFile.Writer.file([***ETCETERA...
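
Regarding the comment that the namenode URI "should be passed as argument": one way to avoid hard-coding it is to read the needed entries from jsc.hadoopConfiguration() on the driver, capture them in a plain serializable map, and rebuild the Configuration inside each partition. A rough sketch, assuming fs.defaultFS is the only key the writer needs (the variable names are illustrative):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.input.PortableDataStream;
import scala.Tuple2;

// Copy the settings the writer needs out of the driver-side configuration into a
// plain HashMap; only this (serializable) map is captured by the closure below.
final Map<String, String> confEntries = new HashMap<>();
confEntries.put("fs.defaultFS", jsc.hadoopConfiguration().get("fs.defaultFS"));
// ...add any other keys the SequenceFile writer needs...

imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, PortableDataStream>>>() {

    @Override
    public void call(Iterator<Tuple2<String, PortableDataStream>> partition) throws Exception {
        // Rebuild a Configuration on the executor from the captured entries.
        Configuration conf = new Configuration();
        for (Map.Entry<String, String> entry : confEntries.entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
        // ...create the SequenceFile.Writer with this conf, as in the answer above...
    }
});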

That concludes this look at using the SparkContext Hadoop configuration in RDD methods/closures such as foreachPartition; hopefully the recommended answer above is helpful.
