本文介绍了无待处理的回复:ConsumerRecord的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用ReplyingKafkaTemplate,并且间歇性地继续看到以下消息.

I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.

这将来自下面的代码

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
  if (this.sharedReplyTopic) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug(missingCorrelationLogMessage(record, correlationId));
    }
  }
  else if (this.logger.isErrorEnabled()) {
    this.logger.error(missingCorrelationLogMessage(record, correlationId));
  }
}

但只是偶尔发生

我还如下将共享的ReplyTopic设置为false,并试图强制更长的超时时间

I have also set the shared replyTopic to false as below and attempted to force a longer timeout

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
        replyKafkaTemplate.setSharedReplyTopic(false);
        replyKafkaTemplate.setReplyTimeout(10000);
        return replyKafkaTemplate;

我的容器如下

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(1000);
    factory.getContainerProperties().setIdleEventInterval(10000L);
    factory.setConcurrency(3);
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

推荐答案

如果是断断续续的,则很可能回复时间太长.消息似乎很清楚

If it's intermittent, it's most likely the reply took too long to arrive. The message seems quite clear

每个客户端实例必须使用其自己的回复主题或专用分区.

Each client side instance must use it's own reply topic or dedicated partition.

编辑

如果收到的消息的相关性ID与this.futures中的当前条目不匹配(待答复),则会得到日志.这只能在以下情况下发生:

You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances:

  1. 请求超时(在这种情况下,将有一个对应的WARN日志).
  2. 对模板进行stop()操作(在这种情况下,将清除this.futures).
  3. 由于某些原因(不会发生)重新发送已经处理过的回复.
  4. 将密钥添加到this.futures之前已收到答复.(因为它是在send()保存记录之前插入的,因此不会发生.)
  5. 服务器端针对同一请求发送2个或更多回复.
  6. 其他一些应用程序正在将数据发送到相同的答复主题.如果您可以使用DEBUG日志重现它,那将会有所帮助,因为然后我们还将相关密钥也记录在发送中.

这篇关于无待处理的回复:ConsumerRecord的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-21 06:51