本文介绍了无法在foreachRDD中序列化SparkContext的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将流式数据从Kafka保存到cassandra.我能够读取和解析数据,但是当我在下面的行中调用以保存数据时,出现了Task not Serializable异常.我的课程正在扩展可序列化,但不确定为什么我会看到此错误,在谷歌搜索3个小时后没有得到太多帮助,某些机构可以提供任何指针吗?

I am trying to save the streaming data to cassandra from Kafka. I am able to read and parse the data but when I call below lines to save the data i am getting a Task not Serializable Exception. My class is extending serializable but not sure why i am seeing this error, didn't get much help ever after googling for 3 hours, can some body give any pointers ?

val collection = sc.parallelize(Seq((obj.id, obj.data)))
collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))` 


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._

import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import java.util.Formatter.DateTime

object StreamProcessor extends Serializable {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(sparkConf)

    val ssc = new StreamingContext(sc, Seconds(2))

    val sqlContext = new SQLContext(sc)

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    val topics = args.toSet

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>

      if (!rdd.isEmpty()) {
        try {

          rdd.foreachPartition { iter =>
            iter.foreach {
              case (key, msg) =>

                val obj = msgParseMaster(msg)

                val collection = sc.parallelize(Seq((obj.id, obj.data)))
                collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))

            }
          }

        }

      }
    }

    ssc.start()
    ssc.awaitTermination()

  }

  import org.json4s._
  import org.json4s.native.JsonMethods._
  case class wordCount(id: Long, data: String) extends serializable
  implicit val formats = DefaultFormats
  def msgParseMaster(msg: String): wordCount = {
    val m = parse(msg).extract[wordCount]
    return m

  }

}

我得到

下面是完整的日志

推荐答案

SparkContext不可序列化,您不能在foreachRDD内使用它,并且从图形的使用中就不需要它.相反,您可以简单地映射每个RDD,解析出相关数据,然后将新的RDD保存到cassandra:

SparkContext isn't serializable, you can't use it inside foreachRDD, and from the use of your graph you don't need it. Instead, you can simply map over each RDD, parse out the relevant data and save that new RDD to cassandra:

stream
  .map { 
    case (_, msg) => 
      val result = msgParseMaster(msg)
      (result.id, result.data)
   }
  .foreachRDD(rdd => if (!rdd.isEmpty)
                       rdd.saveToCassandra("testKS",
                                           "testTable",
                                            SomeColumns("id", "data")))

这篇关于无法在foreachRDD中序列化SparkContext的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-22 08:31