依赖

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

配置

spring:
  kafka:
    bootstrap-servers: 外网ip:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: -1
    consumer:
      group-id: test
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 10
    listener:
      concurrency: 3
      type: batch
      ack-mode: manual

配置类

@Configuration
@EnableKafka
public class KafkaConfig {

    @Autowired
    private KafkaProperties properties;

    /**
     * 创建一个新的消费者工厂
     * 创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下
     * @return
     */
    @Bean
    public ConsumerFactory<Object, Object> kafkaConsumerFactory() {
        Map<String, Object> map =  properties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
        return factory;
    }

    /**
     * 创建一个新的消费者工厂
     * 但是修改为不自动提交
     *
     * @return
     */
    @Bean
    public ConsumerFactory<Object, Object> kafkaManualConsumerFactory() {
        Map<String, Object> map =  properties.buildConsumerProperties();
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
        return factory;
    }


    /**
     * 手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaManualConsumerFactory());
        //设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //带批量功能
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * 监听器工厂 批量消费
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
}

生产者

消息实体类

@AllArgsConstructor
@Data
@NoArgsConstructor
@ToString
public class Message {
    private Long id;
    private String msg;
    private Date time;
}
@Component
@Slf4j
public class KafkaProducer {
    private static final String TOPIC = "pktest";
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @SuppressWarnings("unchecked")
    public void produce(Message message) {
        try {
            ListenableFuture future = kafkaTemplate.send(TOPIC, JSON.toJSONString(message));
            SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(@Nullable SendResult<String, String> result) {
                    log.info("发送消息成功");
                }
            };
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("发送消息失败",ex);
                    produce(message);
                }
            };
            future.addCallback(successCallback,failureCallback);
        } catch (Exception e) {
            log.error("发送消息异常",e);
        }
    }

    @Scheduled(fixedRate = 1000 * 10)
    public void send() {
        Message message = new Message(12L,"helloword",new Date());
        produce(message);
    }
}

消费者

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "pktest", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void consumer(ConsumerRecords<String, String> records, Acknowledgment ack) {
        try {
            records.partitions().stream().flatMap(partition -> records.records(partition).stream())
                    .forEach(record -> {
                        log.info("接收消息: offset = {}, key = {}, value = {} ",
                                record.offset(), record.key(), record.value());
                        ack.acknowledge();
                    });
        } catch (Exception e) {
            log.error("kafka接收消息异常", e);
        }
    }
}
04-25 12:03