本文介绍了如何使用Scala实现卡夫卡消费者deserialisation?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的卡夫卡消费者的code以下行。

I have the following line in my kafka consumer's code.

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)

如何反序列化此流线到原来的对象? Serialisability是通过扩展类serialisable在卡夫卡生产者实施。我使用Scala实现这一点火花。

How to deserialize this stream "lines" into original object? Serialisability was implemented in the kafka producer by extending class to serialisable. I am implementing this in spark using scala.

推荐答案

您需要实现自定义的并共同提供期望的类型信息与德codeR到createStream功能。

You need to implement a custom Decoder and provide the expected type information together with the decoder to the createStream function.

KafkaUtils.createStream [关键字类型,值类型,KeyDe codeR,ValueDe codeR](...)

例如,如果您使用的是字符串为键和 CustomContainer 的价值,你的流创作的样子这样的:

For example, if you are using String as key and CustomContainer as value, your stream creation will look like this:

val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)

既然你是enconding消息卡夫卡为新KeyedMessage [字符串,字符串] ,权德codeR是一个字符串去codeR像这样的:

Given that you are enconding the messages to kafka as new KeyedMessage[String,String], the right decoder is a string decoder like this:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)

这会给你一个 DSTREAM [字符串,字符串] 为您处理的依据。

that will give you a DStream[String,String] as basis for your processing.

如果您要发送/接收需要实现一个卡夫卡特定对象类型的并的它。
幸运的是, PcapPacket 已经实现了,你需要做到这一点的方法:

If you want to send/receive a specific object type you need to implement a Kafka Encoder and Decoder for it.Luckily for you, PcapPacket already implements the methods that you require to do that:


  • PcapPacket - >字节[]:<一href=\"http://sourceforge.net/p/jnetpcap/$c$c/HEAD/tree/jnetpcap/releases/jnetpcap-1.3/release-1.3.0-1/src/java1.5/org/jnetpcap/packet/PcapPacket.java#l939\"相对=nofollow>公众诠释transferStateAndDataTo(字节[]缓冲区)

字节[] - > PcapPacket:<一href=\"http://sourceforge.net/p/jnetpcap/$c$c/HEAD/tree/jnetpcap/releases/jnetpcap-1.3/release-1.3.0-1/src/java1.5/org/jnetpcap/packet/PcapPacket.java#l400\"相对=nofollow>公共PcapPacket(字节[]缓冲区)

byte[] -> PcapPacket: public PcapPacket(byte[] buffer)

剩下的就是样板code落实卡夫卡所需要的恩codeR /德codeR接口。

The rest is boilerplate code to implement the Encoder/Decoder interfaces required by Kafka.

这篇关于如何使用Scala实现卡夫卡消费者deserialisation?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 16:59