本文介绍了使用 scalapb 在 Spark Streaming 中解码 Proto Buf 消息时出错的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
这是一个 Spark Streaming 应用程序,它使用以 Proto Buf
编码的 Kafka 消息.使用 scalapb
库.我收到以下错误.请帮忙.
This is a Spark Streaming app that consumes Kafka messages encoded in Proto Buf
. Using scalapb
library. I am getting the following error. Please help.
> com.google.protobuf.InvalidProtocolBufferException: While parsing a
> protocol message, the input ended unexpectedly in the middle of a
> field. This could mean either that the input has been truncated or
> that an embedded message misreported its own length. at
> com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:82)
> at
> com.google.protobuf.CodedInputStream.skipRawBytesSlowPath(CodedInputStream.java:1284)
> at
> com.google.protobuf.CodedInputStream.skipRawBytes(CodedInputStream.java:1267)
> at
> com.google.protobuf.CodedInputStream.skipField(CodedInputStream.java:198)
> at com.example.protos.demo.Student.mergeFrom(Student.scala:59) at
> com.example.protos.demo.Student.mergeFrom(Student.scala:11) at
> com.trueaccord.scalapb.LiteParser$.parseFrom(LiteParser.scala:9) at
> com.trueaccord.scalapb.GeneratedMessageCompanion$class.parseFrom(GeneratedMessageCompanion.scala:103)
> at com.example.protos.demo.Student$.parseFrom(Student.scala:88) at
> com.trueaccord.scalapb.GeneratedMessageCompanion$class.parseFrom(GeneratedMessageCompanion.scala:119)
> at com.example.protos.demo.Student$.parseFrom(Student.scala:88) at
> StudentConsumer$.StudentConsumer$$parseLine$1(StudentConsumer.scala:24)
> at StudentConsumer$$anonfun$1.apply(StudentConsumer.scala:30) at
> StudentConsumer$$anonfun$1.apply(StudentConsumer.scala:30) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source) at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at
> org.apache.spark.scheduler.Task.run(Task.scala:86) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
以下是我的代码...
object StudentConsumer {
import com.trueaccord.scalapb.spark._
import org.apache.spark.sql.{ SparkSession}
import com.example.protos.demo._
def main(args : Array[String]) {
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
def parseLine(s: String): Student =
Student.parseFrom(
org.apache.commons.codec.binary.Base64.decodeBase64(s))
val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","student").load()
val ds2 = ds1.selectExpr("CAST(value AS String)").as[String].map(str => parseLine(str))
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
推荐答案
感谢@thesamet 的反馈.
Thanks @thesamet for the feedback.
以下代码有效...
def main(args : Array[String]) {
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","student").load()
val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
这篇关于使用 scalapb 在 Spark Streaming 中解码 Proto Buf 消息时出错的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!