本文介绍了Kafka Streams EOS模式-通知关机的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Kafka Streams应用程序,即使在调试级别,该应用程序也没有任何正确的日志记录就可以关闭-

I have a Kafka Streams application, which shuts down without any proper logging even at the debug level -

2020-12-18 14:25:36:875 +0000 [Thread-7] INFO  o.apache.kafka.streams.KafkaStreams:? - stream-client [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c] State transition from REBALANCING to PENDING_SHUTDOWN
    2020-12-18 14:25:36:973 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] Informed to shut down
    2020-12-18 14:25:36:974 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
    2020-12-18 14:25:36:974 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] DEBUG org.apache.kafka.clients.Metadata:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Updating last seen epoch from 0 to 0 for partition input-event-stream-client-pandprat-estestes5-0
    2020-12-18 14:25:37:075 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] DEBUG org.apache.kafka.clients.Metadata:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Updated cluster metadata updateVersion 3 to MetadataCache{cluster=Cluster(id = ibD7yxLZQQSg24kQTlFnZA, nodes = [b-9.XXXXX.ap-southeast-1.amazonaws.com:9092 (id: 9 rack: apse1-az3), b-7.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 7 rack: apse1-az1), b-8.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 8 rack: apse1-az2), b-5.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 5 rack: apse1-az3), b-4.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 4 rack: apse1-az2), b-6.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 6 rack: apse1-az1), b-1.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 1 rack: apse1-az3), b-3.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 3 rack: apse1-az2), b-2.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 2 rack: apse1-az1)], partitions = [Partition(topic = input-event-stream-client-pandprat-estestes5, partition = 0, leader = 9, replicas = [9,2,3], isr = [9,2,3], offlineReplicas = [])], controller = b-3.XXXXXX-msk-temp.k1lph1.c2.kafka.ap-southeast-1.amazonaws.com:9092 (id: 3 rack: apse1-az2))}
    2020-12-18 14:25:37:172 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Revoking previously assigned partitions []
    2020-12-18 14:25:37:172 +0000 [kafka-coordinator-heartbeat-thread | XXXXXX-estestes5-null] DEBUG o.a.k.c.c.i.AbstractCoordinator:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Heartbeat thread started

Kafka版本-2.3.1经纪人版本-2.2.1没有异常被抛出.还可以看到类似的情况,其中应用程序也从RUNNING移到PENDING_SHUTDOWN.请参阅下面的日志-

Kafka Version - 2.3.1Broker Version - 2.2.1There is no exception thrown.A similar scenario is also seen, where the application moves from RUNNING to PENDING_SHUTDOWN as well. See logs below -

Jan 7, 2021 @ 15:03:12.253  2021-01-07 09:33:12:252 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Informed to shut down

    Jan 7, 2021 @ 15:03:12.253  2021-01-07 09:33:12:252 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN

    Jan 7, 2021 @ 15:03:06.252  2021-01-07 09:33:06:252 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING

    Jan 7, 2021 @ 15:03:06.157  2021-01-07 09:33:06:157 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  c.a.t.e.EventSequenceProcessor:? - EventSequencer Processor getting initialized with bufferFlushInterval : 100, maxBufferSize : 10000, useExternalKnowledgeTime : true, forwardingLimit: 6000, forwardingIntervalInMillis: 6000

    Jan 7, 2021 @ 15:03:06.155  2021-01-07 09:33:06:154 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Transitioning stream task 0_0 to running

    Jan 7, 2021 @ 15:03:05.183  2021-01-07 09:33:05:183 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Transitioning stream task 0_0 to restoring

    Jan 7, 2021 @ 15:03:05.180  2021-01-07 09:33:05:179 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Registering state store event-seq-state-store to its state manager

    Jan 7, 2021 @ 15:03:05.168  2021-01-07 09:33:05:167 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Initializing stream tasks [0_0]

    Jan 7, 2021 @ 15:03:05.163  2021-01-07 09:33:05:162 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] partition assignment took 604 ms.

    Jan 7, 2021 @ 15:03:05.163      current active tasks: [0_0]

    Jan 7, 2021 @ 15:03:05.163      previous active tasks: []

    Jan 7, 2021 @ 15:03:05.163      current standby tasks: []

    Jan 7, 2021 @ 15:03:05.163
    Jan 7, 2021 @ 15:03:04.953  2021-01-07 09:33:04:952 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Register global stores []

    Jan 7, 2021 @ 15:03:04.653  2021-01-07 09:33:04:652 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Created state store manager for task 0_0 with the acquired state dir lock

    Jan 7, 2021 @ 15:03:04.653  2021-01-07 09:33:04:653 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating producer client for task 0_0

    Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] at state PARTITIONS_REVOKED: partitions [input-XXXXXXXX-tf-1test0107-0] assigned at the end of consumer rebalance.

    Jan 7, 2021 @ 15:03:04.559      current suspended active tasks: []

    Jan 7, 2021 @ 15:03:04.559      current suspended standby tasks: []

    Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

    Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Adding assigned tasks as active: {0_0=[input-XXXXXXXX-tf-1test0107-0]}

    Jan 7, 2021 @ 15:03:04.559
    Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating tasks based on assignment.

    Jan 7, 2021 @ 15:03:04.386  2021-01-07 09:33:04:386 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Assigned tasks to clients as {60f8a4be-b576-4ed0-9615-b91021cd76e0=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.

    Jan 7, 2021 @ 15:03:04.385  2021-01-07 09:33:04:385 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Assigning tasks [0_0] to clients {60f8a4be-b576-4ed0-9615-b91021cd76e0=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]} with number of replicas 0

    Jan 7, 2021 @ 15:03:04.385  2021-01-07 09:33:04:384 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Completed validating internal topics {XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog=InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog, topicConfigs={}), numPartitions=1)} in partition assignor.

    Jan 7, 2021 @ 15:03:04.385  2021-01-07 09:33:04:384 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Created state changelog topics [InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog, topicConfigs={}), numPartitions=1)] from the parsed topology.

    Jan 7, 2021 @ 15:03:04.058  2021-01-07 09:33:04:058 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Topic XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog is unknown or not found, hence not existed yet.

    Jan 7, 2021 @ 15:03:04.058  2021-01-07 09:33:04:058 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Going to create topic XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog with 1 partitions and config {cleanup.policy=compact}.

    Jan 7, 2021 @ 15:03:04.049  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Starting to validate internal topics {XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog=InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog, topicConfigs={}), numPartitions=1)} in partition assignor.

    Jan 7, 2021 @ 15:03:04.049  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Trying to check if topics [XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog] have been created with expected number of partitions.

    Jan 7, 2021 @ 15:03:03.955  2021-01-07 09:33:03:954 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Starting to validate internal topics {} in partition assignor.

    Jan 7, 2021 @ 15:03:03.955  2021-01-07 09:33:03:954 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Completed validating internal topics {} in partition assignor.

    Jan 7, 2021 @ 15:03:03.955  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Created repartition topics [] from the parsed topology.

    Jan 7, 2021 @ 15:03:03.953  2021-01-07 09:33:03:953 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Constructed client metadata {60f8a4be-b576-4ed0-9615-b91021cd76e0=ClientMetadata{hostInfo=null, consumers=[XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer-e7ce11d1-7b5a-478f-8aaa-1e46df67bbf3], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member subscriptions.

    Jan 7, 2021 @ 15:03:03.952  2021-01-07 09:33:03:952 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Current minimum supported version remains at 4, last seen supported version was 4

    Jan 7, 2021 @ 15:03:00.355  2021-01-07 09:33:00:354 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopologyBuilder:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] found [input-XXXXXXXX-tf-1test0107] topics possibly matching regex

    Jan 7, 2021 @ 15:03:00.355  2021-01-07 09:33:00:354 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopologyBuilder:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] updating builder with SubscriptionUpdates{updatedTopicSubscriptions=[input-XXXXXXXX-tf-1test0107]} topic(s) with possible matching regex subscription(s)

    Jan 7, 2021 @ 15:03:00.353      current assigned active tasks: []

    Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from STARTING to PARTITIONS_REVOKED

    Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Suspending all active tasks [] and standby tasks []

    Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] at state STARTING: partitions [] revoked at the beginning of consumer rebalance.

    Jan 7, 2021 @ 15:03:00.353      suspended standby tasks: []

    Jan 7, 2021 @ 15:03:00.353      current assigned standby tasks: []

    Jan 7, 2021 @ 15:03:00.353
    Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:353 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] partition revocation took 1 ms.

    Jan 7, 2021 @ 15:03:00.353      suspended active tasks: []

    Jan 7, 2021 @ 15:02:58.752  Event Sequencer Server started, listening on 2301

    Jan 7, 2021 @ 15:02:57.151  2021-01-07 09:32:57:150 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from CREATED to STARTING

    Jan 7, 2021 @ 15:02:57.151  2021-01-07 09:32:57:150 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Starting

    Jan 7, 2021 @ 15:02:57.151  2021-01-07 09:32:57:151 +0000 [main] INFO  c.a.t.c.IngestionTopicConsumer:? - StreamThread Metadata : ThreadMetadata{threadName=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1, threadState=STARTING, activeTasks=[], standbyTasks=[], consumerClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer, restoreConsumerClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-restore-consumer, producerClientIds=[], adminClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-admin}

    Jan 7, 2021 @ 15:02:57.051  2021-01-07 09:32:57:050 +0000 [main] WARN  o.a.k.c.consumer.ConsumerConfig:? - The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.

    Jan 7, 2021 @ 15:02:57.051  2021-01-07 09:32:57:050 +0000 [main] WARN  o.a.k.c.consumer.ConsumerConfig:? - The configuration 'admin.retries' was supplied but isn't a known config.

    Jan 7, 2021 @ 15:02:56.760  2021-01-07 09:32:56:760 +0000 [main] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Configs:

    Jan 7, 2021 @ 15:02:56.760
    Jan 7, 2021 @ 15:02:56.753  2021-01-07 09:32:56:752 +0000 [main] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating consumer client

    Jan 7, 2021 @ 15:02:55.857  2021-01-07 09:32:55:856 +0000 [main] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating restore consumer client

    Jan 7, 2021 @ 15:02:55.350  2021-01-07 09:32:55:349 +0000 [main] INFO  c.a.t.c.IngestionTopicConsumer:? - Initializing Ingestion Topic Consumer

    Jan 7, 2021 @ 15:02:55.350  2021-01-07 09:32:55:349 +0000 [main] INFO  com.arcesium.trinity.EventSequencer:? - Initializing Ingestion Topic Consumer

    Jan 7, 2021 @ 15:02:53.551      Source: input-topic (topics: [input-XXXXXXXX-tf-1test0107])

    Jan 7, 2021 @ 15:02:53.551      Processor: event-sequencer (stores: [event-seq-state-store])

    Jan 7, 2021 @ 15:02:53.551      Sink: output-event-topic (topic: output-XXXXXXXX-tf-1test0107)

    Jan 7, 2021 @ 15:02:53.551        <-- event-sequencer

    Jan 7, 2021 @ 15:02:53.551
    Jan 7, 2021 @ 15:02:53.551        <-- input-topic

    Jan 7, 2021 @ 15:02:53.551
    Jan 7, 2021 @ 15:02:53.551  2021-01-07 09:32:53:460 +0000 [main] INFO  c.a.t.config.EventSequencerConfig:? - Topology initialized: Topologies:

    Jan 7, 2021 @ 15:02:53.551     Sub-topology: 0

    Jan 7, 2021 @ 15:02:53.551        --> event-sequencer

    Jan 7, 2021 @ 15:02:53.551        --> output-event-topic

    Jan 7, 2021 @ 15:02:30.854  Listening for transport dt_socket at address: 2311

我还看到每当触发重新平衡时就会发生关机.请查看下面的日志-

I'm also seeing shutdown happening whenever a rebalance is triggered. Please see logs below -

Jan 7, 2021 @ 08:43:13.555  2021-01-07 03:13:13:554 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] Informed to shut down

    Jan 7, 2021 @ 08:43:13.555  2021-01-07 03:13:13:554 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN

    Jan 7, 2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamTask:? - task [0_0] Committing

    Jan 7, 2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.i.RecordCollectorImpl:? - task [0_0] Flushing producer

    Jan 7, 2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Flushing all stores registered in the state manager

    Jan 7, 2021 @ 08:42:50.008      current assigned active tasks: [0_0]

    Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] Suspending all active tasks [0_0] and standby tasks []

    Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamTask:? - task [0_0] Suspending

    Jan 7, 2021 @ 08:42:50.008      current assigned standby tasks: []

    Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED

    Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] at state RUNNING: partitions [input-XXXXXXX-tzobnwpj-0] revoked at the beginning of consumer rebalance.

    Jan 7, 2021 @ 08:42:50.008
    Jan 7, 2021 @ 08:42:49.842  2021-01-07 03:12:49:842 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0

    Jan 7, 2021 @ 08:42:49.652  2021-01-07 03:12:49:652 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0

    Jan 7, 2021 @ 08:42:49.651  2021-01-07 03:12:49:650 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0

    Jan 7, 2021 @ 08:42:49.649  2021-01-07 03:12:49:649 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0

有人知道为什么会发生吗?

Anyone has any idea why it could happen?

推荐答案

日志行通知关闭" 表明 shutdown 方法StreamThread 被调用.只能在2个地方调用它:-

The log line "Informed to shut down" suggests that the shutdown method of StreamThread was called.This can be called only from 2 places:-

一个- KafkaStream close 方法-用于实际上完全关闭Kafka流(最终关闭所有 StreamThreads )但是您的调试日志并不表示完整的Kafka流已关闭.如果是这样的话,您本可以在日志中找到

One - KafkaStream close method - which is meant for actually closing the Kafka stream completely ( eventually closing all StreamThreads)But your debug logs do not indicate that the complete Kafka stream is being closed. Had this been the case you would have got below in your logs

 log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);

两个- RebalanceListener - onPartitionsAssigned 方法

if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
                log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
                streamThread.shutdown();
                return;
            }

这可能意味着,由于 INCOMPLETE_SOURCE_TOPIC_METADATA ,您的 StreamThread 正在收到关闭请求.这也可能是暂时性问题,也可能是由于元数据不完整(例如主题名称不存在或拼写错误等)而导致的永久性失败

This probably implies that because of INCOMPLETE_SOURCE_TOPIC_METADATA your StreamThread is receiving a shutdown request.This could be a transient issue as well or a permanent failure because of incomplete metadata ( like non-existent or misspelled topic name etc.)

这篇关于Kafka Streams EOS模式-通知关机的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 00:53