Writing a custom Kafka serializer

Question

I am sending my own class in a Kafka message; the class holds a number of String fields.

I therefore cannot use the default serializer class or the StringSerializer that comes with the Kafka library.

I guess I need to write my own serializer and pass it to the producer properties?

Recommended answer

EDIT

In newer Kafka clients, implement Serializer rather than Encoder.
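As a rough illustration of what that looks like, here is a minimal sketch. To keep it compilable without the Kafka jar, it declares a local stand-in interface, KafkaStyleSerializer, with the same serialize(topic, data) shape as org.apache.kafka.common.serialization.Serializer; in a real project you would implement Kafka's interface instead. The CustomMessage fields (id, body) and the tab-separated wire format are assumptions made only for this example.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in (an assumption of this sketch) mirroring the serialize(topic, data)
// method of org.apache.kafka.common.serialization.Serializer.
interface KafkaStyleSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical message type: the question only says it holds several Strings.
final class CustomMessage {
    final String id;
    final String body;

    CustomMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }
}

final class CustomMessageSerializer implements KafkaStyleSerializer<CustomMessage> {
    @Override
    public byte[] serialize(String topic, CustomMessage data) {
        if (data == null) {
            return null; // a null message is passed through as null bytes
        }
        // Illustrative wire format: the two fields joined by a tab, encoded as UTF-8.
        return (data.id + "\t" + data.body).getBytes(StandardCharsets.UTF_8);
    }
}
```

With the real interface, the class would be registered through the producer's "value.serializer" property rather than "serializer.class".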

The things required for writing a custom serializer are:

  1. Implement Encoder with your object type specified as the generic parameter
    • Supplying a VerifiableProperties constructor is required

Declaring a custom serializer for a producer

As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.

If you follow Kafka's Producer Example, you will construct ProducerConfig via a Properties object. When building your properties, be sure to include:

props.put("serializer.class", "path.to.your.CustomSerializer");

The value is the fully qualified name of the class you want Kafka to use to serialize messages before appending them to the log.
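A minimal sketch of assembling those properties is shown below. The helper class ProducerProps and its build method are hypothetical names introduced for illustration; "metadata.broker.list" is the broker setting of the old Scala producer, and the broker address you pass in is up to your environment.

```java
import java.util.Properties;

// Hypothetical helper: collects the settings that would back a ProducerConfig.
final class ProducerProps {
    static Properties build(String brokerList, String serializerClassName) {
        Properties props = new Properties();
        // Broker list for the old producer, for example "localhost:9092".
        props.put("metadata.broker.list", brokerList);
        // Fully qualified name of the Encoder implementation.
        props.put("serializer.class", serializerClassName);
        return props;
    }
}
```

The resulting Properties object is what you would hand to the ProducerConfig constructor mentioned above.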

Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing traits in Java is awkward, but the following approach worked for serializing JSON in my project:

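// Imports assume Jackson 2.x and log4j 1.x; adjust them to the versions in your project.
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka's producer expects a constructor taking VerifiableProperties, even if it is unused. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            return objectMapper.writeValueAsString(object).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        // Serialization failed: fall back to an empty payload.
        return new byte[0];
    }
}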

Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:

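package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka's producer expects a constructor taking VerifiableProperties, even if it is unused. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        // Delegate to the message class, which knows its own byte layout.
        return customMessage.toBytes();
    }
}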
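The answer leaves CustomMessage.toBytes() undefined. One possible shape, offered only as a sketch, is to write each String field as a 4-byte length followed by its UTF-8 bytes, which keeps the fields recoverable on the consumer side. The field names id and body are assumptions; the question only says the class holds several Strings.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical message class with two String fields.
final class CustomMessage {
    private final String id;
    private final String body;

    CustomMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }

    // Encodes every field as a big-endian int length followed by the UTF-8 bytes.
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            writeField(out, id);
            writeField(out, body);
        } catch (IOException e) {
            // Writing to an in-memory buffer should not fail; rethrow unchecked if it does.
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    private static void writeField(DataOutputStream out, String value) throws IOException {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }
}
```

A length prefix is used here instead of a separator character so that field values may contain any text. A matching decoder would read the same layout back with DataInputStream.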

That would leave your property config looking like this:

props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
