Problem description
Kafka topic (test3)
$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning
"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs
0
0
Consumer (kafka-rest API on localhost:8082)
- Create a consumer with a POST request to http://localhost:8082/consumers/rested
Request body:
{
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false"
}
Response body:
{
"instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",
"base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"
}
- Create a subscription with a POST request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription
with headers:
Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json
and request body:
{
"topics": [
"test3"
]
}
This returns a 204 No Content response.
- Read records by sending a GET request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records
with headers:
Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json
This returns the response:
{
"error_code": 50002,
"message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}
How can we fix this issue and ensure that we receive the data?
Exception (on Kafka)
The running Kafka REST Proxy server log shows the following exception:
rest-proxy | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211 341 (io.confluent.rest-utils.requests)
rest-proxy | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28 (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy | at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy | at java.lang.Thread.run(Thread.java:748)
Consumer group CLI
I can see the consumer group on the CLI, but it has no active members:
$ kafka-consumer-groups --bootstrap-server broker:9092 --list
which returns:
console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested
However, when I attempt to retrieve the members:
$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members
Consumer group 'rested' has no active members.
Recommended answer
TL;DR
You need to wrap your key in double quotes. This is not because all Kafka keys need to be quoted, but because a JSON parser requires the key to be valid JSON, and a string wrapped in double quotes is valid JSON.
If you really need to process this message, you will need to read it in a format other than JSON.
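One way to read such a record is the REST Proxy's binary embedded format, which returns keys and values base64-encoded instead of parsing them. The sketch below assumes the consumer was created with "format": "binary" and that records are fetched with Accept: application/vnd.kafka.binary.v2+json; the helper name decode_binary_records and the sample response are illustrative, not taken from the question.

```python
import base64
import json


def decode_binary_records(response_body):
    """Decode base64 keys and values from a binary-format /records response.

    Hypothetical helper: it only post-processes the JSON array returned by
    the REST Proxy and does not talk to Kafka itself.
    """
    decoded = []
    for record in json.loads(response_body):
        key = record.get("key")
        value = record.get("value")
        decoded.append({
            "topic": record.get("topic"),
            "partition": record.get("partition"),
            "offset": record.get("offset"),
            # Keys and values may be null; errors="replace" avoids raising
            # on bytes that are not valid UTF-8.
            "key": None if key is None
            else base64.b64decode(key).decode("utf-8", errors="replace"),
            "value": None if value is None
            else base64.b64decode(value).decode("utf-8", errors="replace"),
        })
    return decoded


# Hand-built sample mimicking a binary-format response for an unquoted key.
sample = json.dumps([{
    "topic": "test3",
    "key": base64.b64encode(b"key").decode("ascii"),
    "value": base64.b64encode(b'{"foo":"bar"}').decode("ascii"),
    "partition": 0,
    "offset": 0,
}])

for rec in decode_binary_records(sample):
    print(rec["key"], rec["value"])
```

With the raw strings in hand, the client can decide per record whether the key or value should be parsed as JSON or kept as plain text.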
Long answer
You have a record whose key is not quoted, which makes the key invalid JSON, so the Jackson JSON parser fails when it tries to parse it. This isn't obvious from the error message, but when the parser sees a bare word rather than a quote, a square bracket, or a curly brace, it assumes the token must be a boolean or null.
You can see where it grabs the key and tries to decode it as JSON in the stack trace above (JsonKafkaConsumerState.createConsumerRecord calling JsonKafkaConsumerState.deserialize).
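The parsing rule described above can be checked without a Kafka cluster. The following sketch uses Python's standard json module as a stand-in for Jackson (an assumption: the two parsers word their errors differently, but both reject a bare word that is not true, false, or null). The helper name is_valid_json is illustrative.

```python
import json


def is_valid_json(raw):
    """Return True if the raw record bytes parse as a JSON document."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
        return False


# A quoted key is a JSON string; a bare word is not valid JSON at all.
for raw in (b'"key"', b'key', b'{"foo":"bar"}', b'null'):
    print(raw, is_valid_json(raw))
```

Only the unquoted key fails the check, which matches the record that breaks the JSON-format consumer.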
I was able to reproduce your error using the following steps:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
http://localhost:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>"key"&{"foo":"bar"}
*Ctrl-C
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
At this point I can read the record, but when I add a record whose key has no quotes, I get the same error as you:
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>key&{"foo":"bar"}
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
You can also use the following command to print the keys in your topic:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
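To avoid producing unquoted keys in the first place, the input lines for kafka-console-producer can be generated so that both key and value are JSON-encoded. This is a minimal sketch assuming the same parse.key=true and key.separator="&" settings used in the reproduction; the function name console_producer_line is hypothetical.

```python
import json


def console_producer_line(key, value, separator="&"):
    """Build one input line whose key and value are both valid JSON."""
    # json.dumps turns a plain string into a double-quoted JSON string.
    return json.dumps(key) + separator + json.dumps(value)


print(console_producer_line("key", {"foo": "bar"}))
# prints: "key"&{"foo": "bar"}
```

The separator should be a character that cannot occur in the encoded key; otherwise the producer may split the line in the wrong place.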