本文介绍了卡夫卡TOPIC_AUTHORIZATION_FAILED的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我实际上正在使用SASL纯文本设置简单的Kafka身份验证,并添加ACL授权。但是当我尝试使用数据时,我遇到了一个问题。

I'm actually working on setting up simple Kafka authentication using SASL Plain Text and add ACL authorization. But I have an issue when I try to consume data.

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 2 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 3 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 4 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 5 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 6 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 7 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 8 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 9 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 10 : {test-topic=TOPIC_AUTHORIZATION_FAILED}

下一步,您可以看到我的配置文件。

Next, you can see my configuration files.

server.properties

server.properties

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

producer.properties

producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=localhost:9092
compression.type=none

consumer.properties

consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group

kafka_server_jaas.conf

kafka_server_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};

环境变量:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"

命令

设置ACL:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation All --group test-consumer-group --topic test-topic

启动Kafka服务器:

start Kafka Server :

./bin/kafka-server-start.sh config/server.properties

开始制作人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties

初始消费者:

bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092

当我尝试启动使用者时,遇到了上述问题。另外,在kafka日志中,我有以下内容:

When I try to start the consumer, I have the issue described above. Also, in the kafka logs, I have this:

[2016-10-22 20:17:14,091] ERROR [KafkaApi-0] Error when handling request {group_id=test-consumer-group} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at scala.Option.getOrElse(Option.scala:121)
    at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657)
    at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)

如何解决此问题?

推荐答案

通过分离jaas客户端和jaas服务器来解决此问题。

Issue fixed by separating jaas client and jaas server.

kafka_server_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};

kafka_client_jaas.conf

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};

在同一终端上,导出jaas服务器conf文件并启动kafka代理:

On the same terminal, export jaas server conf file and start kafka broker:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"
$ ./bin/kafka-server-start.sh config/server.properties

在客户端上,导出客户端jaas conf文件并启动使用者:

On a client terminal, export client jaas conf file and start consumer:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
$ ./bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092

如果您还想生产,请在另一个终端窗口上执行:

If you also want to produce, do this on another terminal window:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties

这篇关于卡夫卡TOPIC_AUTHORIZATION_FAILED的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-11 15:42