Problem description
I am trying to consume from the __consumer_offsets topic, as it seemed this might be the easiest way to retrieve Kafka metrics about consumers, such as message lag. The ideal way is to access them via JMX, but I wanted to try this first, and the messages that come back appear to be encrypted or otherwise unreadable. I tried adding the StringDeserializer property as well. Does anyone have any suggestions on how to correct this? Again, the reference to this being a duplicate of another question is not helpful, as it does not address my issue, which is reading the messages as strings in Java. I have also updated the code to try a ConsumerRecord using the kafka-clients consumer.
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");
while (it.hasNext()) {
try {
String mesg = new String(it.next().message());
System.out.println( mesg);
Code changes:
try {
    // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics", false);
    consumerProps.put("group.id", "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
    kconsumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.offset() + ": " + record.value());
        }
    } finally {
        kconsumer.close();
    }
} catch (Exception e) {
    e.printStackTrace();
}
And here is a snapshot of what the messages look like; the relevant output is at the bottom of the image:
Answer

While it's possible to read directly from the __consumer_offsets topic, this is not the recommended or easiest method.
If you can use Kafka 2.0, the best approach is to use the AdminClient API to describe groups (a sketch follows this list):
- listConsumerGroupOffsets(): to find all offsets for a specific group
- describeConsumerGroups(): to find details about the members of a group
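For example, here is a minimal sketch using the Kafka 2.0 AdminClient; the bootstrap address and the group id "test" are placeholders, and note that computing lag additionally requires the partitions' log-end offsets (for instance via KafkaConsumer#endOffsets), which is only hinted at in a comment:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets for every partition the group "test" has consumed.
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("test").partitionsToOffsetAndMetadata().get();
            // Lag per partition = log-end offset (e.g. from KafkaConsumer#endOffsets) - committed offset.
            offsets.forEach((tp, om) -> System.out.println(tp + " committed offset: " + om.offset()));

            // Details about the group's current members.
            ConsumerGroupDescription group = admin.describeConsumerGroups(Collections.singletonList("test"))
                    .describedGroups().get("test").get();
            group.members().forEach(m ->
                    System.out.println(m.consumerId() + " (" + m.clientId() + ") on host " + m.host()));
        }
    }
}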
In case you absolutely want to read directly from __consumer_offsets, you need to decode the records to make them human readable. This can be done using the GroupMetadataManager class:
- GroupMetadataManager.readMessageKey() can be used to decode the message key and retrieve the topic-partition the entry refers to. This can return two types of objects; for consumer positions, you are only interested in OffsetKey objects.
- GroupMetadataManager.readOffsetMessageValue() can be used to decode the message values (for keys that were OffsetKey) and find the offset information.
This answer from the question you linked contains skeleton code to perform all that.
Also note that you should not deserialize the records as strings; instead keep them as raw bytes so that these methods can decode them correctly.
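As a rough sketch of that decoding (not the linked answer's exact code), assuming a Kafka 2.0 classpath that includes the broker-side kafka_2.x "core" artifact (GroupMetadataManager is not part of kafka-clients); the bootstrap address and group id are placeholders:

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import kafka.common.OffsetAndMetadata;
import kafka.coordinator.group.BaseKey;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.group.GroupTopicPartition;
import kafka.coordinator.group.OffsetKey;

public class OffsetsTopicDecoder {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offsets-topic-reader");    // placeholder
        props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");
        // Keep keys and values as raw bytes so GroupMetadataManager can decode them.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("__consumer_offsets"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                    // Only OffsetKey entries hold committed consumer positions;
                    // null values are tombstones for expired offsets.
                    if (key instanceof OffsetKey && record.value() != null) {
                        GroupTopicPartition gtp = (GroupTopicPartition) key.key();
                        OffsetAndMetadata value =
                                GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                        System.out.println(gtp.group() + " " + gtp.topicPartition() + " -> " + value);
                    }
                }
            }
        }
    }
}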