This article describes how to handle a JsonParseException when reading data with the Kafka REST API.

Problem description

KAFKA TOPIC (test3)

$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning

"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs


0

0

Consumer (kafka-rest API on localhost:8082)

  1. Create a consumer with a POST request to http://localhost:8082/consumers/rested

Request Body:

 {
   "format": "json",
   "auto.offset.reset": "earliest",
   "auto.commit.enable": "false"
 }

Response Body:

{
   "instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",
   "base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"
}

  2. Create a subscription using POST to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription

using Headers:

Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json

and Request Body:

{
    "topics": [
      "test3"
    ]
}

returns a Response of 204 No Content.

  3. Read records by making a GET request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records

using the Headers:

Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json

returns the Response:

{
    "error_code": 50002,
    "message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}

How can we fix this issue and ensure that we receive the data?

Exception (on Kafka)

The running Kafka Rest Proxy server log has the following exception:

rest-proxy         | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211  341 (io.confluent.rest-utils.requests)
rest-proxy         | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28  (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy         | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         |    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy         |    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy         |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy         |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy         |    at java.lang.Thread.run(Thread.java:748)

Consumer-Groups CLI

I can view the consumer-group on the CLI but it has no active members:

$ kafka-consumer-groups --bootstrap-server broker:9092 --list

has the result:

console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested

However, when I attempt to retrieve the members:

$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members

Consumer group 'rested' has no active members.

Solution

TL;DR

You need to wrap your key in double quotes. This is not because all keys must be quoted, but because a consumer that uses the JSON format parses the key with a JSON parser, so the key has to be valid JSON, and a string wrapped in double quotes is valid JSON.

If you really need to process this message, you will have to read it in a format other than JSON.
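One way to read in a different format is the REST Proxy's binary embedded format: create the consumer with "format": "binary" and request records with Accept: application/vnd.kafka.binary.v2+json. Keys and values then come back base64-encoded instead of being parsed as JSON. The sketch below only shows the decoding step; the sample response is hypothetical (built by hand in the shape the v2 API returns), and decode_binary_records is an illustrative helper, not part of any library.

```python
import base64
import json

# Hypothetical response body, shaped like what a "binary" format consumer
# returns from .../records: key and value are base64-encoded strings.
sample_response = json.dumps([
    {
        "topic": "testjsontopic",
        "key": base64.b64encode(b"key").decode("ascii"),
        "value": base64.b64encode(b'{"foo":"bar"}').decode("ascii"),
        "partition": 0,
        "offset": 0,
    }
])


def decode_binary_records(body: str) -> list:
    """Decode base64 keys and values into raw bytes, keeping nulls as None."""
    decoded = []
    for record in json.loads(body):
        key = record["key"]
        value = record["value"]
        decoded.append((
            base64.b64decode(key) if key is not None else None,
            base64.b64decode(value) if value is not None else None,
        ))
    return decoded


records = decode_binary_records(sample_response)
for key, value in records:
    # The unquoted key is now just bytes; no JSON parsing is attempted on it.
    print(key, value)
```

Because nothing is parsed as JSON on the proxy side, the unquoted key no longer causes a failure; the caller decides how to interpret the bytes.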

Long Answer

You have a record whose key is not wrapped in quotes, which makes the key invalid JSON, so the Jackson JSON parser fails when it tries to parse it. The error message does not make this obvious: when the parser does not see a quote, a square bracket, or a curly brace, it starts to assume the token is a boolean or null.
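The same rule can be checked offline. The sketch below uses Python's json module as a stand-in for Jackson (an assumption: the two parsers differ in their error messages, but both reject a bare word such as key). The helper names is_valid_json and to_json_key are hypothetical.

```python
import json


def is_valid_json(raw: bytes) -> bool:
    """Return True if the raw key bytes parse as a JSON document."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (json.JSONDecodeError, UnicodeDecodeError):
        return False


def to_json_key(key: str) -> str:
    """Encode a plain string as a JSON string, i.e. wrap it in double quotes."""
    return json.dumps(key)


candidates = [b'"key"', b"key", b'{"foo":"bar"}', b'["foo","bar"]']
validity = {raw: is_valid_json(raw) for raw in candidates}
for raw, ok in validity.items():
    print(raw, "valid" if ok else "invalid")

# Build a console-producer input line using "&" as the key separator.
line = to_json_key("key") + "&" + json.dumps({"foo": "bar"}, separators=(",", ":"))
print(line)
```

Only the bare word key is rejected; the quoted string, the object, and the array are all valid JSON documents.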

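You can see where the REST Proxy takes the key and tries to decode it as JSON in JsonKafkaConsumerState.createConsumerRecord and JsonKafkaConsumerState.deserialize, both of which appear in the stack trace above.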

I was able to reproduce your error as follows:

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
      http://localhost:8082/consumers/my_json_consumer

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
 http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription


./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>"key"&{"foo":"bar"}

*Ctrl-C

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

At this point I am able to read the record. But when I add a record whose key has no quotes, I get the same error as you:

./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>key&{"foo":"bar"}

Now when I make this request:

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

I receive the same JsonParseException as in your question.

You can also use this to print the keys in your topic:

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
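
The three curl calls used in the reproduction can also be sketched in Python with the standard library. This is a minimal sketch, assuming the same REST Proxy address, consumer group, and instance name as the curl commands; the function names are hypothetical, and send_all only works against a running REST Proxy.

```python
import json
import urllib.request

BASE = "http://localhost:8082"
GROUP = "my_json_consumer"
INSTANCE = "my_consumer_instance"


def create_consumer_request(fmt: str = "json") -> urllib.request.Request:
    """POST /consumers/{group}: create a named consumer instance."""
    body = {"name": INSTANCE, "format": fmt, "auto.offset.reset": "latest"}
    return urllib.request.Request(
        f"{BASE}/consumers/{GROUP}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
        method="POST",
    )


def subscribe_request(topics: list) -> urllib.request.Request:
    """POST .../subscription: subscribe the instance to the given topics."""
    return urllib.request.Request(
        f"{BASE}/consumers/{GROUP}/instances/{INSTANCE}/subscription",
        data=json.dumps({"topics": topics}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
        method="POST",
    )


def records_request(fmt: str = "json") -> urllib.request.Request:
    """GET .../records: the Accept header must match the consumer's format."""
    return urllib.request.Request(
        f"{BASE}/consumers/{GROUP}/instances/{INSTANCE}/records",
        headers={"Accept": f"application/vnd.kafka.{fmt}.v2+json"},
        method="GET",
    )


def send_all(requests: list) -> list:
    """Send the requests in order; urlopen raises HTTPError on a 500 reply."""
    results = []
    for request in requests:
        with urllib.request.urlopen(request) as response:
            results.append((response.status, response.read().decode("utf-8")))
    return results


# Build the requests without sending them; pass this list to send_all()
# when a REST Proxy is reachable at BASE.
flow = [
    create_consumer_request(),
    subscribe_request(["testjsontopic"]),
    records_request(),
]
```

With a record whose key is not valid JSON in the topic, the last request is the one expected to fail, since that is where the proxy deserializes the key.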

08-04 15:27