This article covers how to handle the exception thrown when accessing Kafka offsets from an RDD; it should be a useful reference for anyone who runs into the same problem.

Problem Description

I have a Spark consumer which streams from Kafka. I am trying to manage offsets for exactly-once semantics.

However, while accessing the offsets it throws the following exception:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD无法转换为 org.apache.spark.streaming.kafka.HasOffsetRanges"

The part of the code that does this is as below:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { rdd =>
    // Cast to HasOffsetRanges to capture the Kafka offsets for this batch.
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .foreachRDD(rdd => { })

Here dataStream is a direct stream (DStream[String]) created using the KafkaUtils API, something like:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

Could somebody help me understand what I am doing wrong here? transform is the first method in the chain of methods performed on the DStream, as mentioned in the official documentation as well.

Thanks.

Recommended Answer

Your problem is here:

.map(_._2)

This creates a MappedDStream (whose RDDs are MapPartitionsRDDs, hence the ClassCastException) instead of the DirectKafkaInputDStream created by KafkaUtils.createDirectStream. Only the KafkaRDDs produced directly by the input stream implement HasOffsetRanges, so the cast fails once map has run.

You need to move the map to after the transform:

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t))

kafkaStream
  .transform { rdd =>
    // The first operation on the direct stream still sees KafkaRDDs,
    // so this cast succeeds.
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .map(_._2)
  .foreachRDD(rdd => { /* process the payloads here */ })
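
For completeness, here is a minimal sketch of how the captured ranges might then be consumed inside foreachRDD, reusing the kafkaStream defined above. The println calls are placeholders: in a real exactly-once pipeline you would persist each OffsetRange atomically together with the processed results (for example, in the same database transaction):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Captured on the driver; foreachRDD's body also runs on the driver,
// so reading offsetRanges there is safe.
var offsetRanges = Array[OffsetRange]()

kafkaStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .map(_._2)
  .foreachRDD { rdd =>
    // Process the message payloads (runs on the executors).
    rdd.foreach(println)
    // Then record how far this batch read in each topic-partition.
    offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
    }
  }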

That concludes this article on the exception thrown when accessing Kafka offsets from an RDD. We hope the recommended answer helps, and thank you for your support!
