本文介绍了访问DStream的集合的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试访问已过滤的DStream的集合,如在此问题的解决方案中一样:

I am trying to access a collection of filtered DStreams obtained like in the solution to this question: Spark Streaming - Best way to Split Input Stream based on filter Param

我如下创建集合:

val statuCodes = Set("200","500", "404")
    spanTagStream.cache()
    val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))

我尝试通过以下方式访问statusCodeStreams:

I try to access statusCodeStreams in the following way:

for(streamTuple <- statusCodeStreams){
      streamTuple._2.foreachRDD(rdd =>
  rdd.foreachPartition(
      partitionOfRecords =>
        {
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String,String](props)

            partitionOfRecords.foreach
            {
                 x=>{
                 /* Code Writing to Kafka using streamTuple._1 as the topic-String */
                 }
            }
      })
   )
}

执行此操作时,出现以下错误:java.io.NotSerializableException:Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects

When executing this I receive the following error:java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects

我如何访问流以可序列化的方式写入Kafka?

How do I access the Streams to write to Kafka in a serializable way?

推荐答案

如异常所示,闭包正在捕获DStream定义.一个简单的选择是声明此DStream瞬态:

As the exception indicates, the DStream definition is being captured by the closure.A simple option is to declare this DStream transient:

@transient val spamTagStream = //KafkaUtils.create...

@transient标记要从某些对象的对象图的Java序列化中删除的某些对象.这种情况的关键是在闭包中使用了一些与DStream(在本例中为statusCodeStreams)相同范围的声明的val.从封包内部对该val的实际引用是outer.statusCodeStreams,从而导致序列化过程将outer的所有上下文拉"到该封包中.使用@transient,我们将DStream(以及StreamingContext)声明标记为不可序列化,并且避免了序列化问题.根据代码结构(如果在一个main函数中都是线性的(不好的做法,顺便说一句),可能有必要将 ALL DStream声明和StreamingContext实例标记为@transient.

@transient flags certain objects to be removed from the Java serialization of the object graph of some object. The key of this scenario is that some val declared in the same scope as the DStream (statusCodeStreams in this case) is used within the closure. The actual reference of that val from within the closure is outer.statusCodeStreams, causing that the serialization process to "pull" all context of outer into the closure. With @transient we mark the DStream (and also the StreamingContext) declarations as non-serializable and we avoid the serialization issue. Depending on the code structure (if it's all linear in one main function (bad practice, btw) it might be necessary to mark ALL DStream declarations + the StreamingContext instance as @transient.

如果初始过滤的唯一目的是路由"内容以分离Kafka主题,则可能有必要在foreachRDD中移动过滤.这样可以简化程序结构.

If the only intent of the initial filtering is to 'route' the content to separate Kafka topics, it might be worth moving the filtering within the foreachRDD. That would make for a simpler program structure.

spamTagStream.foreachRDD{ rdd =>
    rdd.cache()
    statuCodes.map{code =>
        val matchingCodes = rdd.filter(...)
        matchingCodes.foreachPartition{write to kafka}
    }
    rdd.unpersist(true)
}

这篇关于访问DStream的集合的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 13:49
查看更多