本文介绍了由以下原因引起的错误：com.fasterxml.jackson.core.JsonParseException：无法识别的令牌 'ÿ'：预期为 JSON 字符串、数字、数组、对象或令牌 'null' 的处理方法，对大家解决问题具有一定的参考价值，需要的朋友们下面随着小编来一起学习吧！
问题描述
我使用的是Spring Boot Apache Kafka示例,并出现以下错误.
I am using Spring Boot Apache Kafka example and getting below error.
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"�contentType
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3557)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2652)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:857)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357)
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1704)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1282)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:438)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
这是代码-
application.properties
application.properties
spring.cloud.stream.function.definition=pageViewEventSupplier;process;pageCount
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=analytics-pvin
spring.cloud.stream.kafka.streams.binder.functions.pageCount.applicationId=analytics-pcin
spring.kafka.consumer.bootstrap-servers=localhost:9092
#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
#
# page views out
spring.cloud.stream.bindings.pageViewEventSupplier-out-0.destination=pvs
#
# page views in
spring.cloud.stream.bindings.process-in-0.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.process-out-0.destination=pcs
#
# page counts in
spring.cloud.stream.bindings.pageCount-in-0.destination=pcs
AnalyticsApplication.java
AnalyticsApplication.java
package com.example.analytics;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class AnalyticsApplication {

    /** Name of the Kafka Streams materialized view (state store) holding per-page counts. */
    private static final String PAGE_COUNT_MV = "pcmv";

    public static void main(String[] args) {
        SpringApplication.run(AnalyticsApplication.class, args);
    }

    /**
     * Produces random {@link PageViewEvent}s. The supplier bean is bound to the
     * "pvs" topic via spring.cloud.stream.bindings.pageViewEventSupplier-out-0.
     */
    @Component
    public static class PageViewEventSource {

        @Bean
        public Supplier<PageViewEvent> pageViewEventSupplier() {
            List<String> names = Arrays.asList("mfisher", "dyser", "schacko", "abilan", "ozhurakousky", "grussell");
            List<String> pages = Arrays.asList("blog", "sitemap", "initializr", "news", "colophon", "about");
            // Hoisted: one RNG for the supplier instead of two new Random() per event.
            Random random = new Random();
            return () -> {
                String rPage = pages.get(random.nextInt(pages.size()));
                // BUG FIX: original read pages.get(new Random().nextInt(names.size())),
                // which produced a *page* as the user name (and would throw
                // IndexOutOfBoundsException if the lists ever had different sizes).
                String rName = names.get(random.nextInt(names.size()));
                return new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
            };
        }
    }

    /**
     * Filters page views longer than 10, counts them per page into the
     * {@code pcmv} materialized view, and streams the running counts out.
     */
    @Component
    public static class PageViewEventProcessor {

        @Bean
        public Function<KStream<String, PageViewEvent>, KStream<String, Long>> process() {
            return e -> e
                    .filter((key, value) -> value.getDuration() > 10)
                    // Re-key by page; the value "0" is a placeholder — only the key matters for count().
                    .map((key, value) -> new KeyValue<>(value.getPage(), "0"))
                    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                    .count(Materialized.as(AnalyticsApplication.PAGE_COUNT_MV))
                    .toStream();
        }
    }

    /** Logs every page-count update arriving on the "pcs" topic. */
    @Component
    public static class PageCountSink {

        private final Log log = LogFactory.getLog(getClass());

        @Bean
        public Consumer<KTable<String, Long>> pageCount() {
            return counts -> counts
                    .toStream()
                    .foreach((key, value) -> log.info(key + "=" + value));
        }
    }

    /** Exposes the {@code pcmv} state store contents over HTTP at GET /counts. */
    @RestController
    public static class CountRestController {

        private final InteractiveQueryService interactiveQueryService;

        public CountRestController(InteractiveQueryService registry) {
            this.interactiveQueryService = registry;
        }

        /**
         * Snapshots the page-count store into a map.
         *
         * @return page name -> current count
         */
        @GetMapping("/counts")
        Map<String, Long> counts() {
            Map<String, Long> counts = new HashMap<>();
            ReadOnlyKeyValueStore<String, Long> queryableStoreType =
                    this.interactiveQueryService.getQueryableStore(AnalyticsApplication.PAGE_COUNT_MV, QueryableStoreTypes.keyValueStore());
            KeyValueIterator<String, Long> all = queryableStoreType.all();
            while (all.hasNext()) {
                KeyValue<String, Long> value = all.next();
                counts.put(value.key, value.value);
            }
            return counts;
        }
    }
}
// Event payload exchanged over Kafka as JSON.
// Lombok @Data generates the getters/setters/equals/hashCode/toString that the
// JSON (de)serializer relies on; @NoArgsConstructor is required by Jackson for
// deserialization, @AllArgsConstructor is used by the supplier in
// AnalyticsApplication.PageViewEventSource.
@Data
@AllArgsConstructor
@NoArgsConstructor
class PageViewEvent {
// userId: who viewed; page: what was viewed (also the grouping key downstream).
private String userId, page;
// View duration; PageViewEventProcessor keeps only events with duration > 10.
private long duration;
}
推荐答案
简单地表示该主题包含的数据格式不是有效的JSON.
Simply means that the topic contains data that is not in valid JSON format.
在调试器中或使用控制台使用者检查内容.
Examine the content in a debugger or using the console consumer.
这篇关于由以下原因引起的错误：com.fasterxml.jackson.core.JsonParseException：无法识别的令牌 'ÿ'：预期为 JSON 字符串、数字、数组、对象或令牌 'null' 的文章就介绍到这了，希望我们推荐的答案对大家有所帮助，也希望大家多多支持！