我写了一个节点卡夫卡消费者。在极少数情况下,我会使用一个组ID启动kafka客户端,该组ID在某些偏移量可用时可用,但现在不再可用-导致调用“ offsetOutOfRange”事件。
在这种情况下,建议的行为是什么?记录错误并退出?有没有办法恢复?我一直想从上次提交的偏移量开始运行Zookeeper(如果存在且可用)。
client = new kafka.Client(ZOOKEEPER_URLS),
consumer = new Consumer(client, [], {
groupId: GROUP_ID,
fromOffset: true
});
consumer.on('offsetOutOfRange', function (topic) {
applicationLogger.error('Kafka consumer is trying to read from offset which is out of range', topic);
process.exit(1);
});
最佳答案
我想知道为什么在node-kafka-consumer中未实现此功能,但是在其他客户端中处理偏移超出范围错误的默认行为是发出OffsetRequest以获得最早或最新的可用偏移,然后设置使用者偏移达到新的价值,并继续获取。
这是完全可恢复的情况,您只需要指定要恢复到的偏移量-最早或最新可用。
关于node.js - 我应该在offsetOutOfRange上停止我的node-kafka-consumer吗?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38693546/