This post covers how to avoid writing files for empty partitions in a Spark Streaming job.

Problem description

I have a Spark Streaming job which reads data from Kafka partitions (one executor per partition).
I need to save the transformed values to HDFS, but I have to avoid creating empty files.
I tried using isEmpty, but it doesn't help when not all partitions are empty.

P.S. Repartitioning is not an acceptable solution due to performance degradation.
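
For context, the guard that was tried looks roughly like this (a minimal sketch; the stream value, its element type, and the per-batch path are illustrative assumptions, not part of the question). isEmpty only skips batches that are empty as a whole, so a batch in which just some Kafka partitions had data still writes empty part files for the idle ones:

  import org.apache.spark.streaming.dstream.DStream

  // Hypothetical stream of already-transformed string values.
  def naiveSave(stream: DStream[String], basePath: String): Unit =
    stream.foreachRDD { (rdd, time) =>
      // Skips fully empty batches only; partitions without data
      // in a non-empty batch still produce empty part files.
      if (!rdd.isEmpty)
        rdd.saveAsTextFile(s"$basePath/${time.milliseconds}")
    }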

Recommended answer

The code below works for PairRDDs only. The trick is Hadoop's LazyOutputFormat: it creates the underlying output file only when the first record is written, so partitions that remain empty never produce a part file.

Text output:

  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.hadoop.mapreduce.OutputFormat
  import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, TextOutputFormat}

  // ssc is the job's StreamingContext.
  val conf = ssc.sparkContext.hadoopConfiguration
  // Tell LazyOutputFormat which real output format to instantiate on first write.
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[TextOutputFormat[Text, NullWritable]],
    classOf[OutputFormat[Text, NullWritable]])

  kafkaRdd.map(_.value -> NullWritable.get)
    .saveAsNewAPIHadoopFile(basePath,
      classOf[Text],
      classOf[NullWritable],
      classOf[LazyOutputFormat[Text, NullWritable]],
      conf)
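
In the streaming job this save runs once per micro-batch, e.g. from foreachRDD (a sketch under the assumption that stream is the Kafka direct stream of ConsumerRecords and that conf and basePath are in scope):

  stream.foreachRDD { (kafkaRdd, time) =>
    // One output directory per batch; empty partitions write nothing.
    kafkaRdd.map(_.value -> NullWritable.get)
      .saveAsNewAPIHadoopFile(s"$basePath/${time.milliseconds}",
        classOf[Text],
        classOf[NullWritable],
        classOf[LazyOutputFormat[Text, NullWritable]],
        conf)
  }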

Avro output:

  import org.apache.avro.mapred.AvroKey
  import org.apache.avro.mapreduce.AvroKeyOutputFormat
  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.mapreduce.OutputFormat
  import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
  import org.apache.spark.rdd.RDD

  // RDD of Avro-generated MyEvent records, keyed for the Avro output format.
  val avro: RDD[(AvroKey[MyEvent], NullWritable)] = ...
  val conf = ssc.sparkContext.hadoopConfiguration

  // Writer schema for the output key (what AvroJob.setOutputKeySchema would set).
  conf.set("avro.schema.output.key", MyEvent.SCHEMA$.toString)
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[AvroKeyOutputFormat[MyEvent]],
    classOf[OutputFormat[AvroKey[MyEvent], NullWritable]])

  avro.saveAsNewAPIHadoopFile(basePath,
    classOf[AvroKey[MyEvent]],
    classOf[NullWritable],
    classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
    conf)
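
Note that the format class passed to saveAsNewAPIHadoopFile is LazyOutputFormat itself; the real format (TextOutputFormat or AvroKeyOutputFormat) is read back from the mapreduce.output.lazyoutputformat.outputformat entry. This mirrors what LazyOutputFormat.setOutputFormatClass does for a plain MapReduce job, and it is the lazy record writer, created only when the first record arrives, that prevents the empty part files.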
