Problem Description
I have the following line in my Kafka consumer's code:
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
How can I deserialize this stream "lines" into the original objects? Serializability was implemented in the Kafka producer by extending the class to Serializable. I am implementing this in Spark using Scala.
Recommended Answer
You need to implement a custom Decoder and provide the expected type information, together with the decoder, to the createStream function:

KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder](...)
For example, if you are using String as the key and CustomContainer as the value, your stream creation will look like this:
val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)
Given that you are encoding the messages to Kafka as new KeyedMessage[String, String], the right decoder is a string decoder, like this:
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)
That will give you a DStream[(String, String)] as the basis for your processing.
If you want to send/receive a specific object type, you need to implement a Kafka Encoder and Decoder for it. Luckily for you, PcapPacket already implements the methods that you need to do that:
- PcapPacket -> byte[]: public int transferStateAndDataTo(byte[] buffer) (http://sourceforge.net/p/jnetpcap/code/HEAD/tree/jnetpcap/releases/jnetpcap-1.3/release-1.3.0-1/src/java1.5/org/jnetpcap/packet/PcapPacket.java#l939)
- byte[] -> PcapPacket: public PcapPacket(byte[] buffer) (http://sourceforge.net/p/jnetpcap/code/HEAD/tree/jnetpcap/releases/jnetpcap-1.3/release-1.3.0-1/src/java1.5/org/jnetpcap/packet/PcapPacket.java#l400)
The rest is boilerplate code to implement the Encoder/Decoder interfaces required by Kafka.
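A minimal sketch of what that boilerplate could look like. The Encoder and Decoder traits below are local stand-ins mirroring the shape of the old Kafka 0.8 kafka.serializer.{Encoder, Decoder} traits so the sketch compiles on its own; real Kafka decoders additionally take a VerifiableProperties constructor argument. CustomContainer is a hypothetical payload class standing in for PcapPacket: a real PcapPacketDecoder would call new PcapPacket(buffer) in fromBytes, and the encoder would use packet.transferStateAndDataTo(buffer) instead of the plain Java serialization shown here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Local stand-ins for kafka.serializer.{Encoder, Decoder} (Kafka 0.8 API shape).
trait Encoder[T] { def toBytes(t: T): Array[Byte] }
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

// Hypothetical value type; stands in for PcapPacket in this sketch.
case class CustomContainer(id: Int, payload: String)

class CustomContainerEncoder extends Encoder[CustomContainer] {
  // Java serialization for illustration only; with PcapPacket you would
  // size a buffer and call transferStateAndDataTo(buffer) instead.
  def toBytes(c: CustomContainer): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(c)
    oos.close()
    bos.toByteArray
  }
}

class CustomContainerDecoder extends Decoder[CustomContainer] {
  // With PcapPacket this body would simply be: new PcapPacket(bytes)
  def fromBytes(bytes: Array[Byte]): CustomContainer = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    ois.readObject().asInstanceOf[CustomContainer]
  }
}
```

With the real Kafka traits in place of the stand-ins, CustomContainerDecoder is what you would pass as the ValueDecoder type parameter to createStream.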