问题描述
我们正在使用CQRS,并使用Axon 4使用事件源.
我们有以下情况.
域书
- 操作-使用Axon CRQS和事件源流(命令-汇总-事件)将新的Book创建到数据库中
- 操作-使用Axon CRQS和事件源流(命令-汇总-事件)将已创建的Book更新到数据库中
- 在Axon事件存储中,上面说明的这两个命令(createCommand和updateCommand)具有相同的"aggregateIdentifier" ID,它们位于同一聚合树中,因为我们正在同一聚合根上工作.
- 在Axon事件存储中,此命令具有不同的"aggregateSequenceNumber",这也是正常且预期的
此时,我们正在通过processingGroup进行重播事件,并且工作正常,现在我们想对事件进行更复杂的重播
问题
我们如何创建只重播带有"aggregateIdentifier" id(一棵聚合树)的事件的resetTokens,这意味着我们只想重播与某本书(聚合树")有关的事件,而不是所有书本?>
Axon 4支持此功能吗?
您可以根据需要使用Axon创建此功能,但是它需要使用Axon Framework特定项的一些自定义代码.
首先,这不是重播,因为从事件处理器级别描述了重播.重播事件处理器将根据给定令牌从某个时间点开始读取整个事件流.
您想要的是重播并过滤给定的集合标识符.
从评论的描述中,感觉就像是用户/服务对您的应用程序进行的零星查询.
处理此查询后,您需要打开一个事件流,过滤事件,然后使用剩余的流来更新所需的查询模型.
从实现的角度来看,我将使用 AnnotationEventHandlerAdapter
.此类应具有带注释的事件侦听器
,该类应该是您要创建并返回的查询模型.
随后,应使用过滤后的事件流调用 AnnotationEventHandlerAdapter#handle(EventMessage<?>)
方法.到达流的末尾后,您就可以查询到最新的查询模型.
因此,请转回到您的问题:
答案是肯定的,但是这需要您自己的一些方便的工作.
We are using CQRS and Event Sourcing using Axon 4.
We have the following scenario.
Domain Book
- Action- Create new Book into DB using the Axon CRQS and event sourcing flow (Command - Aggregate - Event)
- Action- Update the already created Book into DB using the Axon CRQS and event sourcing flow (Command - Aggregate - Event)
- In the Axon event store this two commands explained above (createCommand and updateCommand) have the same "aggregateIdentifier" id, they are in the same Aggregate tree, because we are working on the same Aggregate root.
- In the Axon event store this commands have different "aggregateSequenceNumber", also this is normal and expected
At this point we are doing replay events by processingGroup, and this works fine, now we want to do more complex replay of the events
Question
How can we create resetTokens that will replay only the events with some "aggregateIdentifier" id (one Aggregate tree), this means we want to replay only the events related to some book (Aggregate tree) and not all books ?
Is this functionality supported by the Axon 4 ?
You could create this feature with Axon if you'd want, but it requires some custom code using Axon Framework specifics.
Firstly though, this is not a replay as replays are described from an Event Processor level. Replaying an Event Processor will start reading the entirety of the Event Stream from a point in time based on the given token.
What you'd want is to replay and filter for a given aggregate identifier.
From the description in your comment, this feels like a sporadic query a user/service does against your application.
Upon handling this query, you will need to open an Event Stream, filter the events and then use the remaining stream to update the required query model.
From an implementation perspective, I would use a AnnotationEventHandlerAdapter
. This class should be given an annotated event listener
, which should be the Query Model you want to create ad-hoc and return.
Subsequently, the AnnotationEventHandlerAdapter#handle(EventMessage<?>)
method should be called with the filtered event stream. Once you've reached the end of the stream, then you're query model is up to date.
So, to circle back to your question:
The answer to this is yes, but it requires some handy work from your part.
这篇关于与某些"aggregateIdentifier"有关的轴突重放TrackingEvent.使用Axon 4的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!