问题描述
我创建了一个主题,并放置了一个简单的生产者来在该主题中发布一些消息
I created a topic and i put a simple-producer to publish some message in that topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --streams-file-input
我在 kafka 流中运行下面的简单示例,但我遇到了一个我无法处理的奇怪异常
I am running the below simple example in kafka streams and i got a weird exception which i cannot handle
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.3:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
builder.stream("streams-file-input").to("streams-pipe-output");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L);
streams.close();
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
... 1 more
Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\my-streapplication\0_0\.lock (The system cannot find the path specified)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.lockStateDirectory(ProcessorStateManager.java:125)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:93)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
无论我做了什么,我都得到了这个例外.我正在使用 Ubuntu 在 vmware 中运行 kafka 集群(我使用的版本是 kafka_2.11-0.10.0.0)也许问题是 ram-Cpu?
Whatever i did i got this exception. I am running kafka cluster in vmware with Ubuntu(the version i use is kafka_2.11-0.10.0.0) Maybe the problem is the ram-Cpu?
推荐答案
Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\my-streapplication\0_0\.lock (The system cannot find the path specified)
这意味着您的应用程序状态的父目录 C:\tmp\kafka-streams
不存在.它是 StreamConfig
中的默认目录.我不知道为什么它在 Windows 上创建失败.
it means that the parent directory C:\tmp\kafka-streams
for your application state does not exsist. It is a default directory in StreamConfig
. I don't know why it's created failed on Windows.
您可以将StreamConfig.STATE_DIR_CONFIG
设置为指定目录.
You can set StreamConfig.STATE_DIR_CONFIG
as a specified directory.
这篇关于线程“StreamThread-1"中的异常org.apache.kafka.streams.errors.StreamsException:重新平衡失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!