有没有办法使用JavaScript解码主题__consumer_offsets中的数据?

现在我有类似的东西:

consumer.on('data: ', function(data) {
  if(data.value) {
    console.log('data.value', data.value.toString())
  }
})


并打印诸如
data.value:消费者范围,rdkafka-78bfd5c4-2a48-4573-ae6b-5f5957332b0b,rdkafka-78bfd5c4-2a48-4573-ae6b-5f5957332b0brdkafka / 192.168.7.190u0u0__consumer_offsets .__ consumer_offsets2

最佳答案

它看起来不像node-rdkafka(或者说kafka-node)包含必要的消息格式化程序,因此缺少自己编写的信息,可能不是直接编写的。

最好的选择可能是从命令行中的child_process中简单地读取它。

运行的命令如下所示:

kafka-console-consumer --consumer.config /tmp/consumer.config \
  --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
  --zookeeper localhost:2181 \
  --topic __consumer_offsets


可以作为子进程生成的代码如下所示:

import { spawn } from 'child_process';

const child = spawn('kafka-console-consumer', ['--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"', '--zookeeper', 'localhost:2181', '--topic', '__consumer_offsets'], { stdio: 'pipe' });

// create a handleData and handleError function
child.stdout.on('data', handleData);
child.stderr.on('data', handleError);


无需编写您自己的消息格式化程序来转换该主题的二进制消息,这可能是您最简单的方法。

以下是有关阅读Kafka消费者补偿的更多信息:https://www.ctheu.com/2017/08/07/looking-at-kafka-s-consumers-offsets/#consuming-consumer-offsets

09-13 00:31