有没有办法使用JavaScript解码主题__consumer_offsets中的数据?
现在我有类似的东西:
consumer.on('data: ', function(data) {
if(data.value) {
console.log('data.value', data.value.toString())
}
})
并打印诸如
data.value:消费者范围,rdkafka-78bfd5c4-2a48-4573-ae6b-5f5957332b0b,rdkafka-78bfd5c4-2a48-4573-ae6b-5f5957332b0brdkafka / 192.168.7.190u0u0__consumer_offsets .__ consumer_offsets2
最佳答案
它看起来不像node-rdkafka
(或者说kafka-node
)包含必要的消息格式化程序,因此缺少自己编写的信息,可能不是直接编写的。
最好的选择可能是从命令行中的child_process
中简单地读取它。
运行的命令如下所示:
kafka-console-consumer --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 \
--topic __consumer_offsets
可以作为子进程生成的代码如下所示:
import { spawn } from 'child_process';
const child = spawn('kafka-console-consumer', ['--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"', '--zookeeper', 'localhost:2181', '--topic', '__consumer_offsets'], { stdio: 'pipe' });
// create a handleData and handleError function
child.stdout.on('data', handleData);
child.stderr.on('data', handleError);
无需编写您自己的消息格式化程序来转换该主题的二进制消息,这可能是您最简单的方法。
以下是有关阅读Kafka消费者补偿的更多信息:https://www.ctheu.com/2017/08/07/looking-at-kafka-s-consumers-offsets/#consuming-consumer-offsets