kafka 0.11.0.0版本丰富了kafka-consumer-groups脚本的功能,用户可以直接使用该脚本很方便地为已有的consumer group重新设置位移。

前提必须consumer group必须是inactive的,即不能是处于正在工作中的状态(即要停止正在消费它的程序或应用)。否则会出现以下报错:

先查看消费进度:

bin/kafka-consumer-groups.sh --bootstrap-server 172.16.10.91:9092,172.16.10.92:9092,172.16.10.93:9092 --describe --group logstash

注:LAG列为未消费的记录,如果有很多,说明消费延迟很严重。 

重设消费位移:

bin/kafka-consumer-groups.sh --bootstrap-server 172.16.10.91:9092,172.16.10.92:9092,172.16.10.93:9092 --group logstash --reset-offsets --topic logstash-log-dev --to-latest --execute

注:出现以上描述,则说明重设成功。 

参数说明:

  • 确定topic作用域——当前有3种作用域指定方式:--all-topics(为consumer group下所有topic的所有分区调整位移),--topic t1 --topic t2(为指定的若干个topic的所有分区调整位移),--topic t1:0,1,2(为指定的topic分区调整位移)

  • 确定位移重设策略——当前支持8种设置规则:

    • --to-earliest:把位移调整到分区当前最小位移

    • --to-latest:把位移调整到分区当前最新位移

    • --to-current:把位移调整到分区当前位移

    • --to-offset <offset>: 把位移调整到指定位移处

    • --shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动

    • --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

    • --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S

    • --from-file <file>:从CSV文件中读取调整策略

  • 确定执行方案——当前支持3种方案:

    • 什么参数都不加:只是打印出位移调整方案,不具体执行

    • --execute:执行真正的位移调整

    • --export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

01-16 23:02