我有Spark Streaming作业,该作业从kafka分区(one executor per partition)中读取数据。
我需要将转换后的值保存到HDFS,但需要避免创建空文件。
我尝试使用isEmpty,但这在并非所有分区都为空的情况下无济于事。
附言由于性能下降,重新分配是 Not Acceptable 解决方案。
最佳答案
该代码仅适用于PairRDD。
文字代码:
val conf = ssc.sparkContext.hadoopConfiguration
conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
classOf[TextOutputFormat[Text, NullWritable]]
classOf[OutputFormat[Text, NullWritable]])
kafkaRdd.map(_.value -> NullWritable.get)
.saveAsNewAPIHadoopFile(basePath,
classOf[Text],
classOf[NullWritable],
classOf[LazyOutputFormat[Text, NullWritable]],
conf)
avro的代码:
val avro: RDD[(AvroKey[MyEvent], NullWritable)]) = ....
val conf = ssc.sparkContext.hadoopConfiguration
conf.set("avro.schema.output.key", MyEvent.SCHEMA$.toString)
conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
classOf[AvroKeyOutputFormat[MyEvent]],
classOf[OutputFormat[AvroKey[MyEvent], NullWritable]])
avro.saveAsNewAPIHadoopFile(basePath,
classOf[AvroKey[MyEvent]],
classOf[NullWritable],
classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
conf)