I have a complex Kafka Streams application with 2 fully stateful flows in the same stream:


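It uses the Execution topic as a source, enhances the message and republishes it back to the same Execution topic.
It joins another topic, WorkerTaskResult, adds the result to the Execution and publishes it back to the Execution topic.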


The main goal is to provide a workflow system.

The detailed logic is:


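An Execution is a list of TaskRuns
The Execution looks at the current state of all its TaskRuns and finds the next one to execute
If anything is found, the Execution alters its TaskRunsList, adds the next one and publishes it back to Kafka; it also sends the task to be done to another queue (WorkerTask)
The WorkerTask is processed outside of Kafka Streams, and the result is published back to another queue (WorkerTaskResult) by a simple Kafka Consumer & Producer
The WorkerTaskResult alters the current TaskRun in the current Execution and changes its status (mostly RUNNING / SUCCEED / FAILED), and the Execution is also published back to the Execution queue (with Kafka Streams)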
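To make the steps above concrete, here is a minimal sketch of the Execution state as plain Java objects. All names (State, TaskRun, Execution, nextTaskId, withNewTaskRun, withResult) are hypothetical and are not the application's real classes; it also assumes a flat, sequential flow without parent tasks. The point it illustrates is that every step produces a whole new Execution snapshot, which is what gets republished to the Execution topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the Execution described above; NOT the real classes
// of the application. Assumes a flat, sequential flow (no parent/flowable tasks).
enum State { CREATED, RUNNING, SUCCESS, FAILED }

final class TaskRun {
    final String id;
    final String taskId;
    final State state;

    TaskRun(String id, String taskId, State state) {
        this.id = id;
        this.taskId = taskId;
        this.state = state;
    }
}

final class Execution {
    final List<TaskRun> taskRunList;

    Execution(List<TaskRun> taskRunList) {
        this.taskRunList = List.copyOf(taskRunList); // immutable snapshot
    }

    // "Find the next one to execute": the first task of the flow without a TaskRun yet,
    // but only once every existing TaskRun has succeeded.
    Optional<String> nextTaskId(List<String> flowTaskIds) {
        boolean allSucceeded = taskRunList.stream().allMatch(t -> t.state == State.SUCCESS);
        if (!allSucceeded) {
            return Optional.empty();
        }
        return flowTaskIds.stream()
                .filter(id -> taskRunList.stream().noneMatch(t -> t.taskId.equals(id)))
                .findFirst();
    }

    // "Alter the TaskRunsList and add the next one": returns a new Execution snapshot.
    Execution withNewTaskRun(String runId, String taskId) {
        List<TaskRun> copy = new ArrayList<>(taskRunList);
        copy.add(new TaskRun(runId, taskId, State.CREATED));
        return new Execution(copy);
    }

    // "WorkerTaskResult alters the current TaskRun": replace the TaskRun with the same id.
    Execution withResult(TaskRun result) {
        List<TaskRun> copy = new ArrayList<>();
        for (TaskRun t : taskRunList) {
            copy.add(t.id.equals(result.id) ? result : t);
        }
        return new Execution(copy);
    }
}
```

Because each step returns a full snapshot rather than a delta, any snapshot computed from an older Execution can overwrite a newer one once it is written back to the topic.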


As you can see, the Execution (with its TaskRun list) is the state of the current application.

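The stream works well when all the messages are sequential (no concurrency: only one change to the TaskRun list can happen at a time). When the workflow becomes parallel (concurrent WorkerTaskResults can be joined), my Execution state seems to be overwritten, producing a kind of rollback.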

Example of log output:

2020-04-20 08:05:44,830 INFO  reamThread-1 afkaExecutor Stream in with 3264792750: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=CREATED) # >>>>> t1 is created
  ]
)
2020-04-20 08:05:44,881 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> worker sends running state
2020-04-20 08:05:44,882 INFO  reamThread-1 afkaExecutor Stream out  with 1805535461 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> t1 saves the running state
  ]
)
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS) # >>>>> worker sends success
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor Stream out  with 578845055 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS)
  ]
)
2020-04-20 08:05:45,153 INFO  reamThread-1 afkaExecutor Stream in with 1805535461: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> OUT OF ORDER AND ROLLBACK TO PREVIOUS VERSION
  ]
)
2020-04-20 08:05:45,157 INFO  reamThread-1 afkaExecutor Stream out  with 1889889916 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ]
)
2020-04-20 08:05:45,209 WARN  reamThread-1 KTableSource Detected out-of-order KTable update for execution at offset 10, partition 2.
2020-04-20 08:05:45,313 INFO  reamThread-1 afkaExecutor Stream in with 1889889916: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ]
)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor Stream out  with 3651399223 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
  ]
)


On the console I get the warning: Detected out-of-order KTable update for execution at offset 10, partition 7.

The full source code is here.

I have also tried many different approaches, like this one here:


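Put Execution and WorkerTaskResult on the same topic, to be sure that only 1 message is processed at a time
And keep the last Execution in a StateStore myself (in order to join WorkerTaskResult and Execution)
But this sounds like I am reimplementing a KTable, and it doesn't work any better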


or this one here:


Mostly the same as the previous one (keeping the last Execution in a StateStore myself)
But with 2 KStreams joined KStream-to-KStream (removing the KTable).
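Both attempts above rely on the same idea: route every change for an execution through one place and apply it to the latest stored value. The following is a plain-Java sketch of that idea, not Kafka Streams code; SerializedExecutor, Event and the map-based store are hypothetical stand-ins for the shared topic and the self-managed StateStore. It assumes a partition is consumed by a single thread, so events for one key are applied strictly one after another.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model (not the Kafka Streams API) of "Execution and WorkerTaskResult
// on the same topic + last Execution kept in a self-managed store". One partition is read by
// one thread, so events for a key are applied one by one to the latest stored value.
final class SerializedExecutor {
    // A record of the shared topic: here only a result (taskRunId -> new state) for one execution.
    static final class Event {
        final String executionId;
        final String taskRunId;
        final String state;

        Event(String executionId, String taskRunId, String state) {
            this.executionId = executionId;
            this.taskRunId = taskRunId;
            this.state = state;
        }
    }

    // Stand-in for the state store: executionId -> (taskRunId -> state).
    private final Map<String, Map<String, String>> store = new HashMap<>();

    // Read-modify-write against the latest stored value, never against an older copy.
    void process(Event event) {
        Map<String, String> current = store.computeIfAbsent(event.executionId, k -> new HashMap<>());
        current.put(event.taskRunId, event.state);
    }

    Map<String, String> get(String executionId) {
        return store.getOrDefault(executionId, Map.of());
    }

    // Simulates consuming the shared topic partition in offset order.
    void consume(List<Event> partition) {
        for (Event e : partition) {
            process(e);
        }
    }
}
```

Presumably this only protects the state if it is the single writer: as soon as another path still writes full Execution snapshots to the topic, an older snapshot can again replace the stored value.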


My questions are:


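Is this pattern (which is not a DAG flow, since we sink into the same topic) supported by KafkaStreams?
What is a good way to design this stream to be concurrency-safe?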


Any clue is really appreciated, I have been completely stuck for several days, thanks

EDIT 1:
Here is some additional information:


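Only the KStream app publishes new events to Execution; no outside application publishes to that topic, except for the very first event (i.e. the creation of the execution).
There is a WorkerApp (an outside application, a simple consumer/producer) that consumes from WorkerTask (the work to be done) and publishes the result to WorkerTaskResult (mostly the current state of the application).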


Here is a simplified version of the actual stream:

Builder
  -> Stream 1
     - from KStream<WorkerTaskResult>
     - join KTable<Execution>
     - to Execution topic
  -> Stream 2
     - from KTable<Execution> (same as previous)
     - multiple output
       - to WorkerTaskResult topic (if found an end)
       - to Execution & to WorkerTask topic (if found a next task)
       - to Execution topic (if detect an Execution end)


The KStream is mostly a stateful executor application: it finds the next WorkerTask to execute and evaluates whether the flow is finished, so the application can:


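create a new TaskRun
change the state of the current TaskRun


by joining the WorkerTaskResult
by evaluating the whole execution and finding failed tasks (based on dependencies)

change the state of the Execution and publish the final state SUCCESS or FAILED, which breaks the "infinite loop"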
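The "evaluate the whole execution" step and the way it ends the loop can be sketched as below. This is a hypothetical simplification: RunState and ExecutionEvaluator are illustrative names, and it assumes any FAILED TaskRun fails the whole execution (the real dependency-based evaluation is not modelled).

```java
import java.util.List;

// Hypothetical sketch of "evaluate the whole execution". Simplifying assumption: any FAILED
// TaskRun fails the whole execution (no dependency graph, no error handling is modelled).
enum RunState { CREATED, RUNNING, SUCCESS, FAILED }

final class ExecutionEvaluator {
    // Derives the Execution state from the states of its TaskRuns.
    static RunState evaluate(List<RunState> taskRunStates, boolean hasNextTask) {
        if (taskRunStates.contains(RunState.FAILED)) {
            return RunState.FAILED;
        }
        boolean allSucceeded = taskRunStates.stream().allMatch(s -> s == RunState.SUCCESS);
        if (allSucceeded && !hasNextTask) {
            return RunState.SUCCESS;
        }
        return RunState.RUNNING;
    }

    // The final state is published once; an Execution that is already terminal is not
    // processed again, so nothing more is written back to the Execution topic for it.
    static boolean shouldProcess(RunState currentExecutionState) {
        return currentExecutionState != RunState.SUCCESS && currentExecutionState != RunState.FAILED;
    }
}
```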


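What is really unclear to me in this actual version is what Detected out-of-order KTable update means in the real world.
Does this mean that a KTable must have a single producer per partition and per key in order to keep the order on the topic?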

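EDIT 2:
In the meantime, I found a new way to think about the stream application that seems to work. Unit tests pass and there is no more Detected out-of-order.
Here is the simplified new flow: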

Builder
  - from KTable<Execution>
  - leftJoin KTable<WorkerTaskResult>
  - Branch
    - If Join > to Execution topic
    - If not joined > continue the flow
      - Multiple outputs (same as previous)
        - to WorkerTaskResult topic (if found an end)
        - to Execution & to WorkerTask topic (if found a next task)
        - to Execution topic (if detect an Execution end)


What I think makes sense:


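WorkerTaskResult is now a KTable, so I only keep the latest version of the result
I have only one path (no more 2 paths) that outputs to Execution (I think this is the most important part of solving the out-of-order issue)
The whole thing seems to produce only one output per input (1 new value on Execution produces 1 new value on the Execution topic)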
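The "one path, one output per input" reasoning can be checked on a small model of the Edit 2 flow. This is a plain-Java sketch, not Kafka Streams code; SinglePathFlow, Output and the string-encoded result are hypothetical. One assumption is made explicit in the code: because WorkerTaskResult is now a KTable, the latest result is joined again on every later Execution update, so the "If Join" branch should only fire when the result actually changes the TaskRun.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the Edit 2 flow (not Kafka Streams code). Input: the current
// Execution (taskId -> state) joined with the latest WorkerTaskResult {taskId, state},
// or null when the leftJoin found nothing. Exactly one branch runs, and each branch
// writes at most one record to the Execution topic.
final class SinglePathFlow {
    static final class Output {
        final String topic;
        final String value;

        Output(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    static List<Output> step(Map<String, String> execution, String[] result, List<String> flow) {
        List<Output> out = new ArrayList<>();
        // Branch "If Join": only when the result changes the TaskRun; otherwise the same
        // result, still present in the KTable, would be re-applied on every update.
        boolean joined = result != null && !result[1].equals(execution.get(result[0]));
        if (joined) {
            Map<String, String> updated = new TreeMap<>(execution);
            updated.put(result[0], result[1]);
            out.add(new Output("execution", updated.toString()));
            return out;
        }
        // Branch "If not joined": executor logic, reduced here to "start the next task".
        boolean allSucceeded = execution.values().stream().allMatch("SUCCESS"::equals);
        for (String taskId : flow) {
            if (!execution.containsKey(taskId)) {
                if (allSucceeded) {
                    Map<String, String> updated = new TreeMap<>(execution);
                    updated.put(taskId, "CREATED");
                    out.add(new Output("execution", updated.toString()));
                    out.add(new Output("workertask", taskId));
                }
                break;
            }
        }
        // End-of-execution detection is omitted in this sketch.
        return out;
    }

    static long executionWrites(List<Output> outputs) {
        return outputs.stream().filter(o -> o.topic.equals("execution")).count();
    }
}
```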


Here is the new topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [kestra_execution])
      --> KTABLE-SOURCE-0000000001
    Processor: KTABLE-SOURCE-0000000001 (stores: [execution])
      --> KTABLE-TOSTREAM-0000000002, KTABLE-JOINTHIS-0000000007
      <-- KSTREAM-SOURCE-0000000000
    Source: KSTREAM-SOURCE-0000000004 (topics: [kestra_workertaskresult])
      --> KTABLE-SOURCE-0000000005
    Processor: KTABLE-SOURCE-0000000005 (stores: [workertaskresult])
      --> KTABLE-JOINOTHER-0000000008
      <-- KSTREAM-SOURCE-0000000004
    Processor: KTABLE-JOINOTHER-0000000008 (stores: [execution])
      --> KTABLE-MERGE-0000000006
      <-- KTABLE-SOURCE-0000000005
    Processor: KTABLE-JOINTHIS-0000000007 (stores: [workertaskresult])
      --> KTABLE-MERGE-0000000006
      <-- KTABLE-SOURCE-0000000001
    Processor: KTABLE-MERGE-0000000006 (stores: [])
      --> KTABLE-TOSTREAM-0000000009
      <-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008
    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])
      --> KSTREAM-FILTER-0000000010, KSTREAM-FILTER-0000000015
      <-- KTABLE-MERGE-0000000006
    Processor: KSTREAM-FILTER-0000000015 (stores: [])
      --> KSTREAM-MAPVALUES-0000000016
      <-- KTABLE-TOSTREAM-0000000009
    Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
      --> KSTREAM-MAPVALUES-0000000017
      <-- KSTREAM-FILTER-0000000015
    Processor: KSTREAM-MAPVALUES-0000000017 (stores: [])
      --> KSTREAM-FLATMAPVALUES-0000000018, KSTREAM-FILTER-0000000024, KSTREAM-FILTER-0000000019, KSTREAM-MAPVALUES-0000000067
      <-- KSTREAM-MAPVALUES-0000000016
    Processor: KSTREAM-FLATMAPVALUES-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000042, KSTREAM-FILTER-0000000055, KSTREAM-FILTER-0000000030
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000042 (stores: [])
      --> KSTREAM-MAPVALUES-0000000043
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-FILTER-0000000030 (stores: [])
      --> KSTREAM-MAPVALUES-0000000031
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-FILTER-0000000055 (stores: [])
      --> KSTREAM-MAPVALUES-0000000056
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-MAPVALUES-0000000043 (stores: [])
      --> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000050
      <-- KSTREAM-FILTER-0000000042
    Processor: KSTREAM-MAPVALUES-0000000031 (stores: [])
      --> KSTREAM-FILTER-0000000032, KSTREAM-FILTER-0000000038
      <-- KSTREAM-FILTER-0000000030
    Processor: KSTREAM-MAPVALUES-0000000056 (stores: [])
      --> KSTREAM-FILTER-0000000063, KSTREAM-FILTER-0000000057
      <-- KSTREAM-FILTER-0000000055
    Processor: KSTREAM-FILTER-0000000024 (stores: [])
      --> KSTREAM-MAPVALUES-0000000025
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000032 (stores: [])
      --> KSTREAM-MAPVALUES-0000000033
      <-- KSTREAM-MAPVALUES-0000000031
    Processor: KSTREAM-FILTER-0000000044 (stores: [])
      --> KSTREAM-MAPVALUES-0000000045
      <-- KSTREAM-MAPVALUES-0000000043
    Processor: KSTREAM-FILTER-0000000057 (stores: [])
      --> KSTREAM-MAPVALUES-0000000058
      <-- KSTREAM-MAPVALUES-0000000056
    Processor: KSTREAM-FILTER-0000000010 (stores: [])
      --> KSTREAM-MAPVALUES-0000000011
      <-- KTABLE-TOSTREAM-0000000009
    Processor: KSTREAM-FILTER-0000000019 (stores: [])
      --> KSTREAM-MAPVALUES-0000000020
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000050 (stores: [])
      --> KSTREAM-MAPVALUES-0000000051
      <-- KSTREAM-MAPVALUES-0000000043
    Processor: KSTREAM-MAPVALUES-0000000025 (stores: [])
      --> KSTREAM-FILTER-0000000026
      <-- KSTREAM-FILTER-0000000024
    Processor: KSTREAM-MAPVALUES-0000000033 (stores: [])
      --> KSTREAM-MAPVALUES-0000000034
      <-- KSTREAM-FILTER-0000000032
    Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
      --> KSTREAM-MAPVALUES-0000000046
      <-- KSTREAM-FILTER-0000000044
    Processor: KSTREAM-MAPVALUES-0000000058 (stores: [])
      --> KSTREAM-MAPVALUES-0000000059
      <-- KSTREAM-FILTER-0000000057
    Processor: KSTREAM-FILTER-0000000026 (stores: [])
      --> KSTREAM-FILTER-0000000027
      <-- KSTREAM-MAPVALUES-0000000025
    Processor: KSTREAM-FILTER-0000000038 (stores: [])
      --> KSTREAM-MAPVALUES-0000000039
      <-- KSTREAM-MAPVALUES-0000000031
    Processor: KSTREAM-FILTER-0000000063 (stores: [])
      --> KSTREAM-MAPVALUES-0000000064
      <-- KSTREAM-MAPVALUES-0000000056
    Processor: KSTREAM-MAPVALUES-0000000011 (stores: [])
      --> KSTREAM-FILTER-0000000012
      <-- KSTREAM-FILTER-0000000010
    Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
      --> KSTREAM-FILTER-0000000021
      <-- KSTREAM-FILTER-0000000019
    Processor: KSTREAM-MAPVALUES-0000000034 (stores: [])
      --> KSTREAM-FILTER-0000000035
      <-- KSTREAM-MAPVALUES-0000000033
    Processor: KSTREAM-MAPVALUES-0000000046 (stores: [])
      --> KSTREAM-FILTER-0000000047
      <-- KSTREAM-MAPVALUES-0000000045
    Processor: KSTREAM-MAPVALUES-0000000051 (stores: [])
      --> KSTREAM-FILTER-0000000052
      <-- KSTREAM-FILTER-0000000050
    Processor: KSTREAM-MAPVALUES-0000000059 (stores: [])
      --> KSTREAM-FILTER-0000000060
      <-- KSTREAM-MAPVALUES-0000000058
    Processor: KSTREAM-MAPVALUES-0000000067 (stores: [])
      --> KSTREAM-FILTER-0000000068
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000012 (stores: [])
      --> KSTREAM-PEEK-0000000013
      <-- KSTREAM-MAPVALUES-0000000011
    Processor: KSTREAM-FILTER-0000000021 (stores: [])
      --> KSTREAM-PEEK-0000000022
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-FILTER-0000000027 (stores: [])
      --> KSTREAM-PEEK-0000000028
      <-- KSTREAM-FILTER-0000000026
    Processor: KSTREAM-FILTER-0000000035 (stores: [])
      --> KSTREAM-PEEK-0000000036
      <-- KSTREAM-MAPVALUES-0000000034
    Processor: KSTREAM-FILTER-0000000047 (stores: [])
      --> KSTREAM-PEEK-0000000048
      <-- KSTREAM-MAPVALUES-0000000046
    Processor: KSTREAM-FILTER-0000000052 (stores: [])
      --> KSTREAM-PEEK-0000000053
      <-- KSTREAM-MAPVALUES-0000000051
    Processor: KSTREAM-FILTER-0000000060 (stores: [])
      --> KSTREAM-PEEK-0000000061
      <-- KSTREAM-MAPVALUES-0000000059
    Processor: KSTREAM-FILTER-0000000068 (stores: [])
      --> KSTREAM-PEEK-0000000069
      <-- KSTREAM-MAPVALUES-0000000067
    Processor: KSTREAM-MAPVALUES-0000000039 (stores: [])
      --> KSTREAM-FILTER-0000000040
      <-- KSTREAM-FILTER-0000000038
    Processor: KSTREAM-MAPVALUES-0000000064 (stores: [])
      --> KSTREAM-TRANSFORM-0000000065
      <-- KSTREAM-FILTER-0000000063
    Processor: KSTREAM-FILTER-0000000040 (stores: [])
      --> KSTREAM-SINK-0000000041
      <-- KSTREAM-MAPVALUES-0000000039
    Processor: KSTREAM-PEEK-0000000013 (stores: [])
      --> KSTREAM-SINK-0000000014
      <-- KSTREAM-FILTER-0000000012
    Processor: KSTREAM-PEEK-0000000022 (stores: [])
      --> KSTREAM-SINK-0000000023
      <-- KSTREAM-FILTER-0000000021
    Processor: KSTREAM-PEEK-0000000028 (stores: [])
      --> KSTREAM-SINK-0000000029
      <-- KSTREAM-FILTER-0000000027
    Processor: KSTREAM-PEEK-0000000036 (stores: [])
      --> KSTREAM-SINK-0000000037
      <-- KSTREAM-FILTER-0000000035
    Processor: KSTREAM-PEEK-0000000048 (stores: [])
      --> KSTREAM-SINK-0000000049
      <-- KSTREAM-FILTER-0000000047
    Processor: KSTREAM-PEEK-0000000053 (stores: [])
      --> KSTREAM-SINK-0000000054
      <-- KSTREAM-FILTER-0000000052
    Processor: KSTREAM-PEEK-0000000061 (stores: [])
      --> KSTREAM-SINK-0000000062
      <-- KSTREAM-FILTER-0000000060
    Processor: KSTREAM-PEEK-0000000069 (stores: [])
      --> KSTREAM-SINK-0000000070
      <-- KSTREAM-FILTER-0000000068
    Processor: KSTREAM-TRANSFORM-0000000065 (stores: [workertask_deduplication])
      --> KSTREAM-SINK-0000000066
      <-- KSTREAM-MAPVALUES-0000000064
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> log-executionStream
      <-- KTABLE-SOURCE-0000000001
    Sink: KSTREAM-SINK-0000000014 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000013
    Sink: KSTREAM-SINK-0000000023 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000022
    Sink: KSTREAM-SINK-0000000029 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000028
    Sink: KSTREAM-SINK-0000000037 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000036
    Sink: KSTREAM-SINK-0000000041 (topic: kestra_workertaskresult)
      <-- KSTREAM-FILTER-0000000040
    Sink: KSTREAM-SINK-0000000049 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000048
    Sink: KSTREAM-SINK-0000000054 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000053
    Sink: KSTREAM-SINK-0000000062 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000061
    Sink: KSTREAM-SINK-0000000066 (topic: kestra_workertask)
      <-- KSTREAM-TRANSFORM-0000000065
    Sink: KSTREAM-SINK-0000000070 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000069
    Processor: log-executionStream (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000002


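For now, it is still not clear to me whether this solution is resilient to any concurrency, or whether I could get out-of-order updates again at some other time (meaning the Execution rolls back to a previous version, which leads to the same task being executed multiple times).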

Best answer

Is this pattern (which is not a DAG flow, since we sink into the same topic) supported by KafkaStreams?


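In general, yes. You only need to make sure that you don't end up in an "infinite loop", i.e., at some point an input record should "terminate" and not produce anything to the output topic any longer. In your case, an Execution should eventually stop creating new Tasks via the feedback loop.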
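The termination requirement can be illustrated with a tiny simulation of a topic that is both source and sink. It is a sketch under stated assumptions, not Kafka Streams code: FeedbackLoop, TOTAL_TASKS and the TERMINAL marker are hypothetical. Each output is appended back to the topic, and the loop only drains because the processor eventually returns nothing for a terminal record.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical simulation (not Kafka Streams code) of a topic that is both source and sink.
final class FeedbackLoop {
    static final int TOTAL_TASKS = 3;
    static final int TERMINAL = -1; // marker for "execution finished"

    // Input: number of finished tasks of an execution, or TERMINAL.
    static Optional<Integer> process(int finished) {
        if (finished == TERMINAL) {
            return Optional.empty();           // the record "terminates": nothing written back
        }
        if (finished == TOTAL_TASKS) {
            return Optional.of(TERMINAL);      // publish the final state exactly once
        }
        return Optional.of(finished + 1);      // otherwise schedule the next task
    }

    // Feeds every output back into the topic and counts how many records were written back.
    static int drain(int initial, int maxRecords) {
        Deque<Integer> topic = new ArrayDeque<>();
        topic.add(initial);
        int written = 0;
        while (!topic.isEmpty()) {
            Optional<Integer> out = process(topic.poll());
            if (out.isPresent()) {
                written++;
                if (written > maxRecords) {
                    throw new IllegalStateException("feedback loop does not terminate");
                }
                topic.add(out.get());
            }
        }
        return written;
    }
}
```

Starting from 0 finished tasks, three "next task" records and one final-state record are written back, after which the topic drains.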


What is a good way to design this stream to be concurrency-safe?


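It always depends on the concrete application... For your case, if I understand the design of your application correctly, you basically have two input topics (Execution and WorkerTaskResult) and two output topics (Execution and WorkerTask). When processing the input topics, messages from each input may modify shared state (i.e., the tasks' state).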

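Additionally, there is an "outside application" that reads from the WorkerTask topic and writes into the WorkerTaskResult topic? Hence, there is actually a second loop in your overall data flow? I assume there are also other upstream applications that push new data into the Execution topic?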

                             +-----------------+
                             |                 |
                             v                 |
upstream producers ---> "Execution" --+        |
                                      |        |
                                      v        |
                                      KS-App --+
                                      ^        |
                                      |        |
            +--> "WorkerTaskResult" --+        +--> "WorkerTask" --+
            |                                                      |
            +------------------------ outside app <----------------+


What is unclear to me atm:


Which state changes are propagated from the KS-App directly back to Execution?
Which state changes are propagated from the "outside app" via WorkerTaskResult?


Maybe you can update your question, and I can try to update my answer accordingly.

Update (based on Edit 1 and 2)


to Execution & to WorkerTask topic (if found a next task)


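This step seems to introduce a race condition? When writing back into the Execution topic, the state is only updated when that record is read back. In parallel, the execution of the task might finish first (i.e., before the Execution update was re-read and processed), and hence a second Execution update (when the task finished) could be written that updates the state first?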
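The race described above can be made concrete with a minimal simulation, assuming that every record written to Execution is a full snapshot computed from the table value seen at processing time (as the "Stream in / Stream out" logs in the question suggest). It uses plain Java maps; LostUpdateDemo and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation (not Kafka Streams code) of the suspected race: two updates are each
// computed as a FULL Execution snapshot from the same table value, before either is read back.
final class LostUpdateDemo {
    // Returns a copy of the snapshot with one TaskRun state changed.
    static Map<String, String> with(Map<String, String> base, String taskId, String state) {
        Map<String, String> copy = new TreeMap<>(base);
        copy.put(taskId, state);
        return copy;
    }

    static Map<String, String> run() {
        Map<String, String> table = new TreeMap<>(Map.of("t1", "RUNNING", "t2", "RUNNING"));
        List<Map<String, String>> executionTopic = new ArrayList<>();
        // Result of t1 joined with the current table value.
        executionTopic.add(with(table, "t1", "SUCCESS"));
        // Result of t2 joined with the SAME table value: the t1 update is not visible yet.
        executionTopic.add(with(table, "t2", "SUCCESS"));
        // Reading the topic back: each record replaces the value stored for the key.
        for (Map<String, String> record : executionTopic) {
            table = record;
        }
        return table; // {t1=RUNNING, t2=SUCCESS}: t1's SUCCESS has been rolled back
    }
}
```

The final value matches the symptom in the question's log, where t1 goes back from SUCCESS to RUNNING.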


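What is really unclear to me in this actual version is what Detected out-of-order KTable update means in the real world. Does this mean that a KTable must have a single producer per partition and per key in order to keep the order on the topic?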


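You could say it that way. For each input record, the table() operator compares the timestamp of the input record with the timestamp of the current entry in the table. If the input record has a smaller timestamp, a WARN is logged (the update is still applied). The reason for the WARN is that the table stores only one entry per key and expects to only move forward in time; out-of-order updates may lead to unexpected results, hence the WARN log. Using a single producer per partition, or a single producer per key, avoids out-of-order data per key (assuming the producer only sends in-order data).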
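The rule described above can be mirrored in a few lines of plain Java. This is a simplified model, not the actual Kafka Streams source: TimestampedTable, Entry and update are hypothetical names, and the timestamps stand in for Kafka record timestamps. It shows that the warning is purely informational: the older record still overwrites the newer entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (NOT the real Kafka Streams source) of the check behind the WARN:
// the table keeps one (value, timestamp) per key; an update whose record timestamp is
// older than the stored one is logged, but still applied.
final class TimestampedTable {
    static final class Entry {
        final String value;
        final long timestamp;

        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final String storeName;
    private final Map<String, Entry> store = new HashMap<>();
    final List<String> warnings = new ArrayList<>();

    TimestampedTable(String storeName) {
        this.storeName = storeName;
    }

    void update(String key, String value, long timestamp, long offset, int partition) {
        Entry old = store.get(key);
        if (old != null && timestamp < old.timestamp) {
            warnings.add("Detected out-of-order KTable update for " + storeName
                    + " at offset " + offset + ", partition " + partition + ".");
        }
        store.put(key, new Entry(value, timestamp)); // the update is applied anyway
    }

    String get(String key) {
        Entry e = store.get(key);
        return e == null ? null : e.value;
    }
}
```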

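I am not 100% sure atm whether I fully understand the new version of your application, but in general you want to make sure to avoid data races and to linearize the updates to the table().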
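One possible way to detect updates that were computed from a stale Execution is optimistic versioning (compare-and-set). This technique is not prescribed by the answer; it is a sketch assuming each Execution carries a version number and that a single processor owns the store for a given key. VersionedStore, Versioned and tryUpdate are hypothetical names, and the map is an in-memory stand-in for a state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of optimistic versioning (compare-and-set), one way to linearize
// updates; not part of Kafka Streams. Every value carries a version, and an update is
// accepted only if it was derived from the version currently stored.
final class VersionedStore {
    static final class Versioned {
        final String value;
        final long version;

        Versioned(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Versioned> store = new HashMap<>();

    // Returns true if applied; false means "recompute from the latest value and retry".
    boolean tryUpdate(String key, long basedOnVersion, String newValue) {
        Versioned current = store.get(key);
        long currentVersion = current == null ? 0L : current.version;
        if (basedOnVersion != currentVersion) {
            return false; // derived from a stale snapshot: reject instead of rolling back
        }
        store.put(key, new Versioned(newValue, currentVersion + 1));
        return true;
    }

    Versioned get(String key) {
        return store.get(key);
    }
}
```

A rejected update would then have to be recomputed from the latest Execution (for example by re-applying the WorkerTaskResult to it) rather than being written as-is.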

Regarding "java - Is a Kafka Stream with a join and the same sink and source topic supported?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61316312/

10-10 10:03