JsonParseException when reading data using the Kafka REST API

Problem description

Kafka topic (test3)

$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning

"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs


0

0

Consumer (kafka-rest API on localhost:8082)

  1. Create a consumer with a POST request to http://localhost:8082/consumers/rested

Request body:

 {
   "format": "json",
   "auto.offset.reset": "earliest",
   "auto.commit.enable": "false"
 }

Response body:

{
   "instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",
   "base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"
}

  2. Create a subscription with a POST request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription

With headers:

Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json

and request body:

{
    "topics": [
      "test3"
    ]
}

This returns a 204 No Content response.

  3. Read records by making a GET request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records

With headers:

Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json

This returns the response:

{
    "error_code": 50002,
    "message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}

How can we fix this issue and ensure that we receive the data?

Exception (on Kafka)

The running Kafka REST Proxy server log shows the following exception:

rest-proxy         | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211  341 (io.confluent.rest-utils.requests)
rest-proxy         | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28  (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy         | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         |    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy         |    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy         |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy         |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy         |    at java.lang.Thread.run(Thread.java:748)

Consumer group CLI

I can see the consumer group on the CLI, but it has no active members:

$ kafka-consumer-groups --bootstrap-server broker:9092 --list

which results in:

console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested

However, when I attempt to retrieve the members:

$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members

Consumer group 'rested' has no active members.

Recommended answer

TL;DR

You need to wrap your key in double quotes. This is not because every key needs quotes, but because a JSON parser requires the key to be valid JSON, and a string wrapped in double quotes is valid JSON.
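
As a rough illustration of the rule above, the sketch below uses Python's standard `json` module as a stand-in for the Jackson parser that REST Proxy uses. That substitution is an assumption: the two parsers differ in details, but both reject a bare word such as `key`. The helper name `is_valid_json` is made up for this example.

```python
import json

def is_valid_json(raw: bytes) -> bool:
    """Return True if the raw record bytes parse as a JSON document."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False

# Raw bytes as they would sit in the topic, before any deserialization.
# Python's json is only a stand-in here for the proxy's Jackson parser.
samples = [b'key', b'"key"', b'{"foo":"bar"}', b'["foo","bar"]', b'confluent', b'0']
for raw in samples:
    print(raw, is_valid_json(raw))
```

Bare words such as `confluent` from the test3 topic fail the same check, so unquoted values would presumably hit the same exception as unquoted keys.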

If you really need to process this message, you need to read it in a format other than JSON.
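
One such format is REST Proxy's `binary` embedded format, where keys and values come back base64-encoded instead of being parsed as JSON. The following is a minimal sketch of decoding such a response. It assumes the consumer was created with `"format": "binary"` and the records were fetched with `Accept: application/vnd.kafka.binary.v2+json`; the helper name `decode_binary_records` and the sample response are invented for illustration.

```python
import base64
import json

def decode_binary_records(response_body: str) -> list:
    """Decode base64 keys and values from a binary-format /records response."""
    decoded = []
    for record in json.loads(response_body):
        key = record.get("key")
        value = record.get("value")
        decoded.append({
            "topic": record.get("topic"),
            "partition": record.get("partition"),
            "offset": record.get("offset"),
            # Null keys and values are passed through as None.
            "key": base64.b64decode(key) if key is not None else None,
            "value": base64.b64decode(value) if value is not None else None,
        })
    return decoded

# Hypothetical response body for a record produced as: key&{"foo":"bar"}
sample = json.dumps([{
    "topic": "testjsontopic",
    "key": base64.b64encode(b"key").decode("ascii"),
    "value": base64.b64encode(b'{"foo":"bar"}').decode("ascii"),
    "partition": 0,
    "offset": 1,
}])

for rec in decode_binary_records(sample):
    print(rec["key"], rec["value"])
```

The decoded bytes can then be handled however the application sees fit, for example by attempting JSON parsing on the value only.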

Long answer

You have a record whose key is not wrapped in quotes, which makes the key invalid JSON, so the Jackson JSON parser fails when it tries to parse it. The error message does not make this obvious: when the parser does not see a quote, a square bracket, or a curly brace, it starts to assume the token is a boolean or null.

You can see where it grabs the key and tries to decode it as JSON here:

https://github.com/confluentinc/kafka-rest/blob/a9b7cc527a26fdf09db27d148f2e71bfe3d87a6a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/State

I was able to reproduce your error with the following steps:

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
      http://localhost:8082/consumers/my_json_consumer

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
 http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription


./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>"key"&{"foo":"bar"}

*Ctrl-C

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

At this point I am able to read the record, but when I add a key without the quotes I get the same error as you:

./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>key&{"foo":"bar"}

Now when I call this:

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

I get this error:

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
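
To avoid producing such records in the first place, the key can be JSON-encoded before it is handed to the producer. The sketch below builds the input line for kafka-console-producer, assuming `key.separator` is set to `&` as in the commands above. `producer_line` is a hypothetical helper, not part of any Kafka tooling.

```python
import json

def producer_line(key, value, separator="&"):
    """Build one console-producer input line with JSON-encoded key and value."""
    # Compact separators keep the output identical to hand-written JSON.
    encoded_key = json.dumps(key, separators=(",", ":"))
    encoded_value = json.dumps(value, separators=(",", ":"))
    # Refuse keys that would make the key/value split ambiguous.
    if separator in encoded_key:
        raise ValueError("key contains the separator; choose another separator")
    return encoded_key + separator + encoded_value

print(producer_line("key", {"foo": "bar"}))  # prints "key"&{"foo":"bar"}
```

Because a plain string passes through `json.dumps` as a quoted string, the resulting key is always something the JSON consumer can parse.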

Use this to read your topic's keys as well:

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
