我在具有大量String数据类型的Kafka消息中使用自己的类。

因此,我不能使用默认的序列化程序类或Kafka库随附的StringSerializer

我想我需要编写自己的序列化器并将其提供给生产者属性?

最佳答案

编辑
在较新的Kafka Clients中,实现Serializer而不是Encoder

编写自定义序列化程序所需的操作是:

  • 使用为泛型指定的对象实现Encoder
  • 需要提供VerifiableProperties构造函数

  • 覆盖toBytes(...)方法,确保返回字节数组
  • 将序列化程序类注入(inject)ProducerConfig

  • 为生产者声明自定义序列化器
    正如您在问题中指出的那样,Kafka提供了一种为生产者声明特定序列化器的方法。序列化程序类在ProducerConfig实例中设置,并且该实例用于构造所需的Producer类。
    如果遵循Kafka's Producer Example,则将通过ProducerConfig对象构造Properties。构建属性文件时,请确保包括:
    props.put("serializer.class", "path.to.your.CustomSerializer");
    
    有了类的路径,您希望Kafka在将消息附加到日志之前用于序列化消息。
    创建Kafka可以理解的自定义序列化程序
    编写Kafka可以正确解释的自定义序列化程序需要实现Kafka提供的Encoder[T] scala类。 Implementing traits in java is weird,但是以下方法可用于在我的项目中序列化JSON:
    public class JsonEncoder implements Encoder<Object> {
        private static final Logger logger = Logger.getLogger(JsonEncoder.class);
        // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
        private static final ObjectMapper objectMapper = new ObjectMapper();
    
        public JsonEncoder(VerifiableProperties verifiableProperties) {
            /* This constructor must be present for successful compile. */
        }
    
        @Override
        public byte[] toBytes(Object object) {
            try {
                return objectMapper.writeValueAsString(object).getBytes();
            } catch (JsonProcessingException e) {
                logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            }
            return "".getBytes();
        }
    }
    
    您的问题听起来像是您对附加到日志中的所有消息使用一个对象(我们称其为CustomMessage)。如果是这样,您的序列化器可能看起来像这样:
    package com.project.serializer;
    
    public class CustomMessageEncoder implements Encoder<CustomMessage> {
        public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
            /* This constructor must be present for successful compile. */
        }
    
        @Override
        public byte[] toBytes(CustomMessage customMessage) {
            return customMessage.toBytes();
        }
    }
    
    这将使您的属性配置看起来像这样:
    props.put("serializer.class", "path.to.your.CustomSerializer");
    

    10-07 18:58
    查看更多