Problem description
I am working with Kafka and trying to set up a consumer group by following this article. The only difference is that I have created my own abstract class and handler to make the design simpler.
Below is my abstract class:
public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();

    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}
Below is my KafkaConsumerA, which extends the abstract class above:
public class KafkaConsumerA extends Consumer {
    private KafkaConsumer<byte[], DataHolder> consumer;

    public KafkaConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        // exception comes from below line from two of the threads and the remaining one thread works fine.
        consumer = new KafkaConsumer<>(consumerProps);
        List<String> topics = getTopicsBasisOnConsumerName(consumerName);
        try {
            consumer.subscribe(topics);
            // Setup the schema config
            Map<String, Object> config = new HashMap<>();
            config.put("urls", "https://abc.qa.host.com");
            GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
            while (true) {
                ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
                for (ConsumerRecord<byte[], DataHolder> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(
                        (Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
                }
            }
        } catch (WakeupException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
And below is my Handler class:
// looks like something is wrong in this class
public final class ConsumerHandler {
    private final ExecutorService executorServiceProcess;
    private final Consumer consumer;
    private final List<Consumer> consumers = new ArrayList<>();

    public ConsumerHandler(Consumer consumer, int poolSize) {
        this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
        this.consumer = consumer;
        for (int i = 0; i < poolSize; i++) {
            consumers.add(consumer);
            executorServiceProcess.submit(consumer);
        }
    }

    public void shutdown() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (Consumer consumer : consumers) {
                    consumer.shutdown();
                }
                executorServiceProcess.shutdown();
                try {
                    executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
And here is how I start all the consumers in the consumer group from the main class:
public static void main(String[] args) {
    ConsumerHandler handlerA =
        new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
    // run KafkaConsumerB here
    handlerA.shutdown();
    // shutdown KafkaConsumerB here
}
So my plan is to set up a consumer group with three consumers in KafkaConsumerA, all three subscribed to the same topics.
Error:
Whenever I run this, it looks like only one consumer in the consumer group works and the other two don't. I see this exception on the console from those two:
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]
What am I doing wrong here? The getConsumerProps() method returns a Properties object in which client.id and group.id have the same values for all three consumers in that consumer group.
Below are my design details:
- My KafkaConsumerA will have three consumers in a consumer group, and each consumer will work on topicA.
- My KafkaConsumerB (similar to KafkaConsumerA) will have two consumers in a different consumer group, and each of those consumers will work on topicB.
These two consumers, KafkaConsumerA and KafkaConsumerB, will run on the same box, each in its own consumer group and independent of the other.
Recommended answer
Kafka tries to register MBeans for application monitoring and uses the client.id to name them. As you said, the properties are injected through your abstract class, so every consumer in group A gets the same client.id and group.id. These are separate clients, though, so each one should get its own client.id while keeping the same group.id. That registers them as distinct clients in the same consumer group, so they work together without clashing on the MBean registration.
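One way to apply this is to derive a separate Properties object per consumer from the shared base properties. The following is only a minimal sketch: the class name ClientIdExample, the helper names, and the "-<index>" suffix scheme are assumptions made for illustration and are not part of the original code or of the Kafka API. It uses only java.util.Properties, so it runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public final class ClientIdExample {
    private ClientIdExample() {}

    /**
     * Returns a copy of the base properties whose client.id has "-<index>"
     * appended. group.id and every other entry are copied unchanged.
     * (Hypothetical helper; the suffix scheme is an assumption.)
     */
    public static Properties withUniqueClientId(Properties base, int index) {
        Properties copy = new Properties();
        // putAll copies the entries themselves, so the result is a plain
        // property map rather than one that only falls back to defaults.
        copy.putAll(base);
        String baseId = base.getProperty("client.id", "consumer");
        copy.setProperty("client.id", baseId + "-" + index);
        return copy;
    }

    /** Builds one Properties object per consumer thread. */
    public static List<Properties> perConsumerProps(Properties base, int poolSize) {
        List<Properties> result = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            result.add(withUniqueClientId(base, i));
        }
        return result;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("group.id", "groupA");          // shared by the whole group
        base.setProperty("client.id", "KafkaConsumerA"); // made unique per consumer
        for (Properties p : perConsumerProps(base, 3)) {
            System.out.println(p.getProperty("client.id") + " in " + p.getProperty("group.id"));
        }
    }
}
```

In the code from the question, each of these Properties objects would presumably be passed to its own KafkaConsumerA instance. Note that ConsumerHandler currently submits the same Consumer instance to the pool three times, so all three threads share one set of properties and overwrite the same consumer field; creating one instance per thread is likely needed as well for shutdown() to reach every KafkaConsumer.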