问题描述
我使用 Spark 2.1.
I use Spark 2.1.
我正在尝试使用 Spark Structured Streaming 从 Kafka 读取记录,反序列化它们并在之后应用聚合.
I am trying to read records from Kafka using Spark Structured Streaming, deserialize them and apply aggregations afterwards.
我有以下代码:
SparkSession spark = SparkSession
.builder()
.appName("Statistics")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load();
df.selectExpr("CAST(value AS STRING)")
我想要的是将 value
字段反序列化到我的对象中,而不是将其转换为 String
.
What I want is to deserialize the value
field into my object instead of casting as String
.
我有一个自定义的解串器.
I have a custom deserializer for this.
public StatisticsRecord deserialize(String s, byte[] bytes)
如何在 Java 中执行此操作?
How can I do this in Java?
我找到的唯一相关链接是这个 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html,但这是针对 Scala 的.
The only relevant link I have found is this https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html, but this is for Scala.
推荐答案
定义 JSON 消息的架构.
Define schema for your JSON messages.
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("Id", DataTypes.IntegerType, false),
DataTypes.createStructField("Name", DataTypes.StringType, false),
DataTypes.createStructField("DOB", DataTypes.DateType, false) });
现在阅读如下消息.MessageData 是用于 JSON 消息的 JavaBean.
Now read Messages like below. MessageData is JavaBean for your JSON message.
Dataset<MessageData> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(MessageData.class));
这篇关于如何使用 Java 中的结构化流从 Kafka 反序列化记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!