Kafka+zookeeper安全认证技术介绍

Kafka 目前支持多种认证方式,生产环境常见应用的SASL有以下几种:

企业在选择认证方式时,需要结合业务特点,考虑到企业数据安全性,又要考虑部署后的性能和部署复杂度等因素。在生产实际部署时,可以根据企业安全政策、网络因素、对性能的考量等因素来正确选择合适的认证方式,确保选型的认证机制符合本企业需求。

环境配置

本方案需要测试环境主机裸金属环境为3台主机,

由于登录Kafka、zookeeper认证会严重依赖服务器主机名,所以需先配置/etc/hosts。
设置每个主机的主机名,并确保每台主机名不重复,执行

hostnamectl set-hostname kafka1  &&  bash
cat >>/etc/hosts <<EOF
x.x.x.15 kafka1
x.x.x.16 kafka2
x.x.x.17 kafka3
EOF

配置SASL/PLAIN+ACL认证

kafka认证配置

目前需要编辑两个文件,一个是server.properties,另一个是kafka_server_jaas.conf。server.properties是kafka的主要配置文件,里面有很多参数可以调整。kafka_server_jaas.conf是sasl认证的配置文件,里面定义了认证的方式和用户信息。这两个文件都在kafka的config目录下。或者把kafka_server_jaas.conf设置在单独路径,但必须在kafka脚本里添加该路径变量。

Kafka的broker端配置:
若要开启SASL机制,需要在broker服务端进行三个方面的设置:
第一:配置“server.properties”的文件内容
第二:编写JAAS配置文件kafka_server_jaas.conf
第三:修改broker启动脚本等各种脚本,并指定JAAS的配置文件kafka_server_jaas.conf存放路径

编写broker服务端server.properties的SASL+ACL配置

分别在3台kafka节点修改,下面参数为SASL相关参数,其他参数正常配置。

vim  server.properties     
listeners=SASL_PLAINTEXT://x.x.x.15:9092  security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer   
super.users=User:sdx

参数解释:

全配置文件示例如下:

broker.id=11
zookeeper.request.timeout=30000
listeners=SASL_PLAINTEXT://x.x.x.15:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:sdx
delete.topic.enable=true
log.index.size.max.bytes=1024000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data1/kafka
log.index.size.max.bytes=1024000
log.index.interval.bytes=4000000
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=x.x.x.15:2181,x.x.x.16:2181,x.x.x.17:2181/kafka
zookeeper.connection.timeout.ms=180000
zookeeper.session.timeout.ms=60000
zookeeper.sync.time.ms =2000
group.initial.rebalance.delay.ms=0

编写broker端JAAS配置文件kafka_server_jaas.conf

创建包含所有客户端连接kafka认证用户信息的JAAS文件,认证用户可根据业务需要任意增加用户数量。
创建存放各种jaas配置文件目录

mkdir  /usr/local/kafka_2.13-2.7.1/else_config

文件内容如下:

vim  /usr/local/kafka_2.13-2.7.1/else_config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="sdx"       #kafka超级管理员,为broker间通信使用
password="sdx123"    #kafka超级管理员密码
user_pj="sdx123"     #客户端连kafka认证使用的用户sdx,密码sdx123
user_usera="usera1"   #客户端连kafka认证使用的用户usera,密码usera1。
user_userb="userb2";    #用户userb,密码userb2。末尾有分号,别漏掉!
};    #末尾有分号,别漏掉
Client {    # Client是broker作为客户端连接zk服务端的认证
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kz"     # broker连接zookeeper认证的用户
    password="kz123"; #broker连接zk认证的用户密码。末尾有分号别漏掉
};    #末尾有分号,别漏掉
注:
1)、user_name语句,是用来配置客户端连接Broker时使用的用户名和密码,也就是新建用户的用户名都以user_开头,等号后面是密码,密码可以是明文且没有复杂度要求。完整语句是user_USERNAME="PASSWORD"2)、规划用户usera给生产者使用,用户userb给消费者使用。
3.1.3	修改broker启动脚本,并指定JAAS的配置文件存放路径
下面黄色标识为新增变量,位置如下
vim   /usr/local/kafka_2.13-2.7.1/bin/kafka-run-class.sh    
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
KAFKA_OPTS='-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/kafka_server_jaas.conf'

编辑认证携带文件

因为开启了安全认证,所以执行命令需要携带含有认证用户信息的认证文件。认证文件仍可放在/usr/local/kafka_2.13-2.7.1/else_config路径下。
编写生产者用户usera的认证文件:

vim   usera_producer.properties     
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="usera" \
        password="usera1";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

编写消费者用户userb的认证文件:
如果后续指定此配置文件无法消费,需要先查出消费者组名称,然后在文件第一行添加group.id参数,并指定消费者组。

vim   userb_consumer.properties 
#group.id=console-consumer-94652
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="userb" \
        password="userb2";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
userb_consumer.properties的使用方法是通过--consumer.config参数携带。
编写生产者用户usera的认证文件:
vim   usera-writer-jaas.conf
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="usera"
   password="usera1";
};
编写消费者用户userb的认证文件:
vim  userb-read-jaas.conf
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="userb"
   password="userb2";
};

分别修改生产者脚本和消费者脚本

vim   /usr/local/kafka_2.13-2.7.1/bin/kafka-console-producer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/usera-writer-jaas.conf"
vim  /usr/local/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/userb-read-jaas.conf"

重启kafka

如果重启kafka会无法启动,会报与zookeeper认证失败。我们继续完善zookeeper认证策略设置。最后重启kakfa。

zookeeper认证配置

配置认证文件

编辑zookeeper服务端认证文件,Zookeeper服务端启用sasl认证,添加下面3行认证参数,其他参数正常设置。

vim  /usr/local/apache-zookeeper-3.6.4-bin/conf/zoo.cfg
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider       #authProvider.1表示开启认证功能,zookeeper配置SASL支持
requireClientAuthScheme=sasl  #开启SASL的认证方式
zookeeper.sasl.client=true    #开启SASL身份验证
Zookeeper配置文件全文如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data1/zookeeper/snapshot
dataLogDir=/data1/zookeeper/datalogdir
4lw.commands.whitelist=*
clientPort=2181
server.11=x.x.x.15:2888:3888
server.22=x.x.x.16:2888:3888
server.33=x.x.x.17:2888:3888
zookeeper.session.timeout.ms=60000
zookeeper.connection.timeout.ms=60000
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
jaas用于定义认证身份信息,我们使用LoginModule实现,举例如org.apache.zookeeper.server.auth.DigestLoginModule,它是基于明文用户、密码的形式进行身份认证。
vim  /usr/local/apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="sdx"     #pj用户是zk集群之间认证使用的
password="sdx123"  #pj用户密码
user_sdx="sdx123"
user_kz="kz123"   #用户kz是客户端broker(或zkCli.sh)与zk之间使用的,密码是kz123要与kafka_server_jaas.conf的Clinet里的用户密码相同。
user_usera="usera1"    #生产用户,供生产者程序认证使用
user_userb="userb2";   #消费用户,供消费者程序认证使用
};

导入kafka认证的相关jar到zookeeper

Zookeeper的认证机制是使用插件,所以需要导入Kafka相关jar包,kafka-clients的相关jar包,在kafka服务下的lib目录中可以找到,因kafka版本不同,相关jar包版本会有所变化。
创建存放kafk的jar包目录

/usr/local/apache-zookeeper-3.6.4-bin/lib

从kafka/lib目录下复制以下5个jar包到每个zookeeper主机的lib目录下,包名分别如下:

kafka-clients-2.3.0.jar   
lz4-java-1.6.0.jar   
slf4j-api-1.7.25.jar   
slf4j-log4j12-1.7.25.jar   
snappy-java-1.1.7.3.jar

修改zookeeper的zkEnv.sh脚本文件

添加jaas配置文件添加到zookeeper的环境变量,添加如下新增变量SERVER_JVMFLAGS。黄色标识为新增变量。

vim  /usr/local/apache-zookeeper-3.6.4-bin/bin/zkEnv.sh
ZOOBINDIR="${ZOOBINDIR:-/usr/bin}"
ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf"
#check to see if the conf dir is given as an optional argument

重启组件

先使用脚本关闭3台kafka进程,再关闭3台zookeeper组件进程,确认进程停止后,最后启动zookeeper进程,再启动3台kakfa进程。

进入zookeeper客户端查看ACL节点

发现会多出与ACL认证授权相关的节点

# /usr/local/apache-zookeeper-3.6.4-bin/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[kafka, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, kafka-acl, kafka-acl-changes, kafka-acl-extended, kafka-acl-extended-changes, latest_producer_id_block, log_dir_event_notification]

kafka命令使用方法举例

创建topic和删除topic

需要使用(必须使用)–zookeeper方式创建:
如果使用–bootstrap-server x.x.x.15:9092方式创建,则kafka日志会报Failed authentication

  1. 不指定用户创建
 ./bin/kafka-topics.sh --zookeeper  x.x.x.15:2181/kafka  --create --topic abc --replication-factor 3 --partitions 3
  1. 指定用户创建
 ./bin/kafka-topics.sh --zookeeper  x.x.x.15:2181/kafka  --create --topic abc --replication-factor 3 --partitions 3 --command-config  /usr/local/kafka_2.13-2.7.1/else_config/usera_producer.properties
  1. 删除topic操作
 ./bin/kafka-topics.sh --zookeeper  x.x.x.15:2181/kafka --delete --topic dsc

查询topic

需要使用–zookeeper方式查询

./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --list
./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --describe

改topic分区数量

需要使用 --zookeeper方式

./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --topic abc  --alter --partitions 4

输出如下

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

ACL授权操作

对生产类用户授权

生产用户(如usera)对某topic授权某操作(如生产、写数据)权限:(acl授权过程也可通过业务端java代码实现,也可通过运维手动授权实现)

# ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --add --allow-principal User:usera --operation Write --topic abc

对消费类用户授权

消费用户(如userb)对某topic、某主机授权某操作(如消费、读数据)权限:(acl授权过程也可通过业务端java代码实现,也可通过运维手动授权实现)

./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --add --allow-principal User:userb --operation Read --topic abc

查看授权情况

./bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.x.15:2181/kkafka
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topic-new, patternType=LITERAL)`: 
 	(principal=User:usera, host=*, operation=WRITE, permissionType=ALLOW)
	(principal=User:userb, host=*, operation=READ, permissionType=ALLOW) 

对消费者组授权

对消费者组进行acl授权:对消费者组console-consumer-48035认证授权消费topic abc

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --allow-principal User:userb  --consumer --topic=abc --group=console-consumer-48035  --add

查看acl授权信息

也可–topic指定topic,–group指定消费组

./bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka

生产和消费

生产

必须使用–bootstrap-server方式生产,必须携带认证文件

./bin/kafka-console-producer.sh --bootstrap-server x.x.x.15:9092  --topic abc --producer.config  /usr/local/kafka_2.13-2.7.1/else_config/usera_producer.properties

消费

必须使用–bootstrap-server方式消费,必须携带认证文件

./bin/kafka-console-consumer.sh --bootstrap-server x.x.x.15:9092 --topic abc --from-beginning --consumer.config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

如果消费报错如 Not authorized to access group: console-consumer-48035,需要对console-consumer-48035消费者组认证。
方式如下:只添加group.id参数即可

vim /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties
group.id=console-consumer-48035
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="userb" \
        password="userb";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

消费者组相关操作

查询消费组–list

查询消费者组(粗查询),需要使用–bootstrap-server

 ./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.15:9092 --list  --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

查询消费组–describe

查询消费者组(细查询–group指定消费组 或–all-groups查所有组),需要使用–bootstrap-server查询。

./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.15:9092 --describe --all-groups   --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

查看消费组消费偏移量

需要使用–bootstrap-server查询。

./bin/kafka-consumer-groups.sh  --describe --bootstrap-server x.x.x.15:9092    --group console-consumer-48035   --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

查看log日志文件内容

查看Log文件基本数据信息

# /usr/local/kafka_2.13-2.7.1/bin/kafka-dump-log.sh --files /data1/kafka/pctest-1/00000000000000000001.log --print-data-log
Dumping /data1/kafka/pctest-1/00000000000000000001.log
Starting offset: 1
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1715322009004 size: 72 magic: 2 compresscodec: NONE crc: 3420315536 isvalid: true
| offset: 1 CreateTime: 1715322009004 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: test

查看Log文件具体数据信息

# /usr/local/kafka_2.13-2.7.1/bin/kafka-dump-log.sh --files /data1/kafka/aaa-0/00000000000000000000.log --print-data-log
Dumping /data1/kafka/aaa-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 0 CreateTime: 1718268803238 size: 200 magic: 2 compresscodec: NONE crc: 710512664 isvalid: true
| offset: 0 CreateTime: 1718268803238 keysize: -1 valuesize: 130 sequence: -1 headerKeys: [] payload: ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ

查看index文件具体内容

./kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.index
offset: 972865 position: 25163202
offset: 973495 position: 25179579
offset: 974125 position: 25195956
offset: 974755 position: 25212333
offset: 975385 position: 25228710
offset: 976015 position: 25245087
offset: 976645 position: 25261464
offset: 977275 position: 25277841

查看timeindex文件具体内容

./kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.timeindex

输出如下:

timestamp: 1691292274425 offset: 475709
timestamp: 1691292274426 offset: 476947
timestamp: 1691292274427 offset: 478255
timestamp: 1691292274428 offset: 479543
timestamp: 1691292274429 offset: 480848
timestamp: 1691292274430 offset: 481767
timestamp: 1691292274431 offset: 483209
timestamp: 1691292274432 offset: 484869
timestamp: 1691292274433 offset: 486408

哪里不懂可以在评论中评论~

10-15 16:37