本文介绍了o.s.k.r.ReplyingKafkaTemplate:回复超时:ProducerRecord的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用带有replyingKafkaTemplate"的 kafka 消费者发布一些消息.我的主要目的是订阅消息,修改它并发回修改后的消息.我试过增加replykafkaTemplate 的replyTimeout.但即使如此,我也没有收到订阅者的回复.Producer 控制台显示如下.

I am trying to publish some message using kafka consumer with "replyingKafkaTemplate". My main prupose is to subscribe the message, modify it and sending back the modified message.I have tried increasing replyTimeout of replykafkaTemplate. But even that I am getting no response from subscriber. Producer console is showing following.

我尝试增加事务超时、请求超时.但没有什么对我有用.任何帮助将不胜感激.

I have tried increasing transaction timeout, request timeout. But nothing works for me. Any help would be appreciated.

提前致谢

这些是我的配置 bean :

These are my config beans :

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
    return properties;
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return properties;
}

 @Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
        KafkaMessageListenerContainer<String, User> container) {

    ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
    replyTemplate.setReplyTimeout(30000);

    return replyTemplate;
}

这是我的消费者:

@KafkaListener(topics = "user",containerFactory="kafkaListenerContainerFactory")
@SendTo
public User listen(User user) throws InterruptedException {
    System.out.println("************* message published *************");
    user.setName("myName");
  return user;
}

WARN 8088 --- [TaskScheduler-1] oskrReplyingKafkaTemplate:回复超时:ProducerRecord(topic=user, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [117, 115, 101, 114]), RecordHeader(key = kafka_correlationId, value = [85, 92, 37, -119, 89, 32, 77, -1, -75, -107, 106, 42, 68, 12, -124, -105]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 107, 97, 102, 107, 97, 46, 109, 111, 100, 101, 6,108,85, 115, 101, 114])], isReadOnly = true), key=null, value=com.kafka.model.User@71178924, timestamp=null) 与相关性 ID: [11346283228369987274421912218068072]

推荐答案

您可能需要在配置 ReplyingKafkaTemplate 中添加 sharedReplyTopic :

You may be need to added sharedReplyTopic in your config ReplyingKafkaTemplate :

@Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
    KafkaMessageListenerContainer<String, User> container) {
ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyTemplate.setReplyTimeout(30000);
replyTemplate.setSharedReplyTopic(true);
return replyTemplate;

}

这是我的完整示例完整配置:

And here is my full example full config:

@Override
@Bean
public Map<String, Object> consumerConfigs() {

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put("ssl.endpoint.identification.algorithm", ``sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);

return props;
}

@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
    new StringDeserializer());
}

@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(kafkaTemplate);
return factory;
}

@Override
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer() {
ContainerProperties containerProperties = new ContainerProperties(customerIndexTopic,customerReplyTopic);
return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
 }
 @Override
 @Bean
 public Map<String, Object> producerConfigs() {

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);

return props;
}

@Override
@Bean
public ProducerFactory<String, String> producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Override
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());
}

@Override
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(
  KafkaMessageListenerContainer<String, String> container) {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory(),container);
replyingKafkaTemplate.setSharedReplyTopic(true);
replyingKafkaTemplate.setReplyTimeout(10000);
return replyingKafkaTemplate;
}

这篇关于o.s.k.r.ReplyingKafkaTemplate:回复超时:ProducerRecord的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-13 20:31