I created a Scala class as follows:
case class MyObjectWithEventTime(value: MyObject, eventTime: Timestamp)
MyObject is a Java object.
I am trying to use it in a Spark Structured Streaming job like this:
implicit val myObjectEncoder: Encoder[MyObject] = Encoders.bean(classOf[MyObject])

val withEventTime = mystream
  .select(from_json(col("value").cast("string"), schema).alias("value"))
  .withColumn("eventTime", to_timestamp(col("value.timeArrived")))
  .as[MyObjectWithEventTime]
  .groupByKey(row => {... some code here
  })
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
  .filter(col("id").isNotNull)
  .toJSON
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", conf.KafkaProperties.outputTopic)
  .option("checkpointLocation", "/tmp/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()
But I keep getting this error:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.xxx.MyObject
- field (class: "com.xxx.MyObject", name: "value")
- root class: "com.xxx.MyObjectWithEventTime"
Best Answer

Try defining the encoders for both MyObject and MyObjectWithEventTime using the Encoders.javaSerialization[T] method:
implicit val myObjectEncoder: Encoder[MyObject] = Encoders.javaSerialization[MyObject]
implicit val myObjectWithEventEncoder: Encoder[MyObjectWithEventTime] = Encoders.javaSerialization[MyObjectWithEventTime]
Keep in mind that your Java class MyObject must implement Serializable and have public getters and setters for all of its fields.

A similar question on Stack Overflow, "java - No Encoder found error for nested Java class": https://stackoverflow.com/questions/60727292/
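Applied to the stream in the question, the fix amounts to replacing the bean encoder with the two serialization-based encoders before the .as call. A minimal sketch, assuming the same column names (value.timeArrived, eventTime) as in the question:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions.{col, from_json, to_timestamp}

// Encoders.javaSerialization round-trips objects through standard Java
// serialization, so it works for any Serializable class, including a
// Scala case class that wraps a plain Java object.
implicit val myObjectEncoder: Encoder[MyObject] =
  Encoders.javaSerialization[MyObject]
implicit val myObjectWithEventTimeEncoder: Encoder[MyObjectWithEventTime] =
  Encoders.javaSerialization[MyObjectWithEventTime]

val withEventTime = mystream
  .select(from_json(col("value").cast("string"), schema).alias("value"))
  .withColumn("eventTime", to_timestamp(col("value.timeArrived")))
  .as[MyObjectWithEventTime] // now picks up the implicit encoder defined above
```

Note that a serialization-based encoder stores the whole object as opaque bytes rather than as named columns, which is why the implicit must be in scope at the .as call instead of relying on Spark's automatic case-class encoder derivation.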