Problem description
KAFKA TOPIC (test3)
$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning
"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs
0
0
Consumer (kafka-rest API on localhost:8082)
- Create a consumer with a POST request to http://localhost:8082/consumers/rested
Request Body:
{
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false"
}
Response Body:
{
"instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",
"base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"
}
- Create a subscription using a POST to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription
using Headers:
Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json
and Request Body:
{
"topics": [
"test3"
]
}
returns a Response of 204 No Content.
- Read records by making a GET request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records
using the Headers:
Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json
returns the Response:
{
"error_code": 50002,
"message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}
How can we fix this issue and ensure that we receive the data?
Exception (on Kafka)
The log of the running Kafka REST Proxy server shows the following exception:
rest-proxy | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211 341 (io.confluent.rest-utils.requests)
rest-proxy | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28 (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy | at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy | at java.lang.Thread.run(Thread.java:748)
Consumer-Groups CLI
I can view the consumer-group on the CLI but it has no active members:
$ kafka-consumer-groups --bootstrap-server broker:9092 --list
has the result:
console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested
However, when I attempt to retrieve the members:
$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members
Consumer group 'rested' has no active members.
TL;DR
You need to wrap your key in double quotes. This is not because every Kafka key needs quotes, but because a consumer created with the JSON format runs the key through a JSON parser, so the key must be valid JSON, and a string wrapped in double quotes is valid JSON.
If you really need to process this message, you will need to read it in a format other than JSON.
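One way to read such a record anyway is the REST Proxy's binary embedded format, which returns keys and values base64-encoded instead of parsing them as JSON. The Python sketch below assumes the consumer was created with "format": "binary" and the records were fetched with Accept: application/vnd.kafka.binary.v2+json. The sample response body and the helper names (decode_field, try_json, decode_records) are made up for illustration; this is not captured output.

```python
import base64
import json

# Hypothetical response body from GET .../records on a "binary" consumer.
# Keys and values are base64-encoded; a record without a key has key null.
sample_response = """
[
  {"topic": "test3", "key": "a2V5", "value": "eyJmb28iOiJiYXIifQ==", "partition": 0, "offset": 0},
  {"topic": "test3", "key": null, "value": "Y29uZmx1ZW50", "partition": 0, "offset": 1}
]
"""


def decode_field(encoded):
    """Base64-decode a key or value; a null field stays None."""
    if encoded is None:
        return None
    return base64.b64decode(encoded).decode("utf-8")


def try_json(text):
    """Parse text as JSON when possible, otherwise return the raw string."""
    if text is None:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def decode_records(body):
    """Decode every record in a binary-format response body."""
    return [
        {
            "offset": record["offset"],
            "key": try_json(decode_field(record["key"])),
            "value": try_json(decode_field(record["value"])),
        }
        for record in json.loads(body)
    ]


decoded = decode_records(sample_response)
for record in decoded:
    print(record)
```

Decoding on the client side like this means an unquoted key such as key simply comes back as a plain string rather than failing the whole read.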
Long Answer
You have a record whose key is not wrapped in quotes, which makes the key (not the value) invalid JSON, so the Jackson JSON parser fails when it tries to parse it. This isn't obvious from the error message: when the parser doesn't see a quote, a square bracket, or a curly brace, it starts to assume the token is a boolean or null.
You can see where it grabs the key and tries to decode it as JSON in the stack trace: JsonKafkaConsumerState.createConsumerRecord calls JsonKafkaConsumerState.deserialize.
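To make the rule concrete, here is a small Python sketch that checks which byte strings parse as JSON. Python's json module stands in for Jackson here, and the two parsers may differ on edge cases, so treat this as an approximation. The samples are taken from the question and the reproduction; is_valid_json is a hypothetical helper name.

```python
import json


def is_valid_json(raw: bytes) -> bool:
    """Return True if the raw bytes decode as UTF-8 and parse as JSON."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


samples = [
    b'"key"',          # quoted string: valid JSON
    b'key',            # bare word: not valid JSON
    b'{"foo":"bar"}',  # object
    b'["foo","bar"]',  # array
    b'confluent',      # bare word again
    b'0',              # a bare number is valid JSON
]

results = {sample.decode("utf-8"): is_valid_json(sample) for sample in samples}
for text, ok in results.items():
    print(f"{text!r}: {'valid' if ok else 'invalid'} JSON")
```

Only the bare words fail, which matches the behaviour described above: the unquoted key is rejected, while the quoted string, object, array, and number all parse.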
I was able to reproduce your error with the following steps:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
http://localhost:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>"key"&{"foo":"bar"}
*Ctrl-C
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
At this point I am able to read the record, but when I add a key without the quotes, I get the same error as you:
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>key&{"foo":"bar"}
Now when I fetch the records again:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
I receive the same error as in the question.
You can also use this command to print the keys in your topic:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
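To avoid producing unquoted keys in the first place, the input lines for kafka-console-producer can be generated rather than typed by hand. The Python sketch below builds one line in the key&value layout used in the reproduction above (parse.key=true, key.separator="&"), JSON-encoding both parts. producer_line is a hypothetical helper, not part of any Kafka tooling.

```python
import json


def producer_line(key, value, separator="&"):
    """Build one console-producer input line with a JSON-encoded key and value."""
    encoded_key = json.dumps(key)
    # Compact separators keep the value on one line without extra spaces.
    encoded_value = json.dumps(value, separators=(",", ":"))
    # The separator marks where the key ends, so it must not appear in the key.
    if separator in encoded_key:
        raise ValueError("key contains the separator; choose another key.separator")
    return f"{encoded_key}{separator}{encoded_value}"


line = producer_line("key", {"foo": "bar"})
print(line)  # "key"&{"foo":"bar"}
```

Piping lines like this into the producer keeps every key valid JSON, so a consumer created with the JSON format can read them back.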