o.s.k.r.ReplyingKafkaTemplate: Reply timed out for: ProducerRecord

Question

I am trying to publish messages to Kafka with a ReplyingKafkaTemplate. My main purpose is for a listener to consume the message, modify it, and send the modified message back. I have tried increasing the replyTimeout of the ReplyingKafkaTemplate, but even then I get no response from the subscriber. The producer console shows the warning reproduced after the consumer code.

I have also tried increasing the transaction timeout and the request timeout, but nothing works for me. Any help would be appreciated.

Thanks in advance.

These are my config beans:

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
    return properties;
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return properties;
}

@Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
        KafkaMessageListenerContainer<String, User> container) {

    ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
    replyTemplate.setReplyTimeout(30000);

    return replyTemplate;
}

This is my consumer:

@KafkaListener(topics = "user",containerFactory="kafkaListenerContainerFactory")
@SendTo
public User listen(User user) throws InterruptedException {
    System.out.println("************* message published *************");
    user.setName("myName");
    return user;
}

WARN 8088 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate : Reply timed out for: ProducerRecord(topic=user, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [117, 115, 101, 114]), RecordHeader(key = kafka_correlationId, value = [85, 92, 37, -119, 89, 32, 77, -1, -75, -107, 106, 42, 68, 12, -124, -105]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 107, 97, 102, 107, 97, 46, 109, 111, 100, 101, 108, 46, 85, 115, 101, 114])], isReadOnly = true), key=null, value=com.kafka.model.User@71178924, timestamp=null) with correlationId: [113462832283699872744219122180807230615]
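The header values in the warning are printed as raw byte arrays, which makes them hard to read. As a small diagnostic sketch (the class name HeaderDecoder and its decode helper are made up for illustration and are not part of Spring Kafka), the bytes of kafka_replyTopic and __TypeId__ can be decoded back to text like this:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for reading the byte arrays shown in the log line.
class HeaderDecoder {

    // Converts the signed byte values printed in the log back into a UTF-8 string.
    static String decode(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // kafka_replyTopic header from the warning
        System.out.println(decode(117, 115, 101, 114)); // prints "user"

        // __TypeId__ header from the warning
        System.out.println(decode(99, 111, 109, 46, 107, 97, 102, 107, 97, 46,
                109, 111, 100, 101, 108, 46, 85, 115, 101, 114)); // prints "com.kafka.model.User"
    }
}
```

In this log the reply topic decodes to user, the same name as the topic the request was sent to. That may be worth checking against the topics configured for the reply container, since the listener's @SendTo reply would then land on the request topic again.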

Recommended answer

You may need to enable sharedReplyTopic in your ReplyingKafkaTemplate configuration:

@Bean
public ReplyingKafkaTemplate<String, User, User> replyKafkaTemplate(ProducerFactory<String, User> pf,
        KafkaMessageListenerContainer<String, User> container) {
    ReplyingKafkaTemplate<String, User, User> replyTemplate = new ReplyingKafkaTemplate<>(pf, container);
    replyTemplate.setReplyTimeout(30000);
    replyTemplate.setSharedReplyTopic(true);
    return replyTemplate;
}
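To see why the warning appears at all, it can help to picture request/reply as a map of pending correlation IDs: a request waits until a reply carrying the same ID arrives on the reply topic, and it times out otherwise. The following is a stdlib-only model of that idea, not Spring Kafka's actual implementation; the class ReplyCorrelationSketch and its methods are invented for illustration, and it assumes Java 9 or later for orTimeout. As far as the Spring for Apache Kafka documentation describes it, setSharedReplyTopic(true) mainly changes how replies with unknown correlation IDs are logged, so it is worth also confirming that the reply actually reaches the topic the reply container listens on.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical model of correlation-based request/reply (not Spring's code).
class ReplyCorrelationSketch {

    // Requests that are still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a request; the returned future fails if no reply arrives within timeoutMs.
    CompletableFuture<String> send(String correlationId, long timeoutMs) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .whenComplete((value, error) -> pending.remove(correlationId));
    }

    // Called when a record arrives on the reply topic.
    // Returns false when nobody is waiting for this correlation ID.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown ID, or the request already timed out
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelationSketch template = new ReplyCorrelationSketch();

        // A reply with the matching correlation ID completes the request.
        CompletableFuture<String> answered = template.send("id-1", 1000);
        template.onReply("id-1", "modified user");
        System.out.println(answered.join()); // prints "modified user"

        // No reply ever arrives for this request, so it times out.
        CompletableFuture<String> unanswered = template.send("id-2", 100);
        try {
            unanswered.join();
        } catch (CompletionException e) {
            System.out.println("Reply timed out: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

In this model, raising the timeout only postpones the failure when no matching reply is ever delivered, which matches the behaviour described in the question.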

Here is the full configuration from my complete example:

@Override
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // Security and timeout settings
    props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
    props.put("sasl.mechanism", saslMechanism);
    props.put("request.timeout.ms", requestTimeoutMs);
    props.put("retry.backoff.ms", retryBackoffMs);
    props.put("security.protocol", securityProtocol);
    props.put("sasl.jaas.config", saslJaasConfig);
    return props;
}

@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new StringDeserializer());
}

@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // The reply template is what @SendTo uses to publish the listener's return value.
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

@Override
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer() {
    ContainerProperties containerProperties = new ContainerProperties(customerIndexTopic, customerReplyTopic);
    return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
}

@Override
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
    props.put("sasl.mechanism", saslMechanism);
    props.put("request.timeout.ms", requestTimeoutMs);
    props.put("retry.backoff.ms", retryBackoffMs);
    props.put("security.protocol", securityProtocol);
    props.put("sasl.jaas.config", saslJaasConfig);
    return props;
}

@Override
@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Override
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Override
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate(
        KafkaMessageListenerContainer<String, String> container) {
    ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate =
            new ReplyingKafkaTemplate<>(producerFactory(), container);
    replyingKafkaTemplate.setSharedReplyTopic(true);
    replyingKafkaTemplate.setReplyTimeout(10000);
    return replyingKafkaTemplate;
}


10-30 08:24