是否可以将Java对象作为Kafka主题中的值发送,以及如何在spark中使用它?

我目前正在做apache-spark教程,想知道是否可以发送String以外的东西。本教程有这个例子

producer.send(new ProducerRecord<String, String>(topic, something_string));


可以做这样的事情吗?

Car car = new Car(brand, year, color);
producer.send(new ProducerRecord<String, Car>(topic, car));


以及以后如何在Spark中使用它?

目前我正在这样做:

String car = brand + "," + year + "," + color;
producer.send(new ProducerRecord<String, String>(topic, car));


我将所有内容都放在逗号分隔的字符串中。

问题2:此刻我以这种方式食用它。

Dataset<String> words = df
.selectExpr("CAST (value AS STRING)")
.as(Encoders.STRING());


我在哪里得到字符串:
    "brand,year,color"

如何将其拆分并放在单独的列中?

最佳答案

您的帖子实际上有两个问题,您可以将它们拆分为单独的帖子。对于第一个问题,请参考this post;中心概念是您必须编写自定义序列化程序。

第二,这个概念在原理上仍然相同,但是这次您必须在Spark端编写一个自定义解串器(解码器)。参考this Spark documentation,它演示了如何从Kafka创建流。但是,请不要使用'KafkaUtil'类,请参考javadoc。它具有使用Kafka解码器类创建流的方法。

07-24 19:18
查看更多