是否可以将Java对象作为Kafka主题中的值发送,以及如何在spark中使用它?
我目前正在做apache-spark教程,想知道是否可以发送String以外的东西。本教程有这个例子
producer.send(new ProducerRecord<String, String>(topic, something_string));
可以做这样的事情吗?
Car car = new Car(brand, year, color);
producer.send(new ProducerRecord<String, Car>(topic, car));
以及以后如何在Spark中使用它?
目前我正在这样做:
String car = brand + "," + year + "," + color;
producer.send(new ProducerRecord<String, String>(topic, car));
我将所有内容都放在逗号分隔的字符串中。
问题2:此刻我以这种方式食用它。
Dataset<String> words = df
.selectExpr("CAST (value AS STRING)")
.as(Encoders.STRING());
我在哪里得到字符串:
"brand,year,color"
如何将其拆分并放在单独的列中?
最佳答案
您的帖子实际上有两个问题,您可以将它们拆分为单独的帖子。对于第一个问题,请参考this post;中心概念是您必须编写自定义序列化程序。
第二,这个概念在原理上仍然相同,但是这次您必须在Spark端编写一个自定义解串器(解码器)。参考this Spark documentation,它演示了如何从Kafka创建流。但是,请不要使用'KafkaUtil'类,请参考javadoc。它具有使用Kafka解码器类创建流的方法。