我有一个在AWS Lambda上运行的基于微服务的应用程序。其中最关键的两个微服务使用事件源/cqrs。
背景:(这也是我整理想法的地方)
我正在使用this library并将事件存储在DynamoDB中,并将投影存储在AWS S3中。
编写部分的工作原理很像:每个命令调用都从DynamoDB加载聚合的当前状态(通过通过处理程序运行事件和/或加载缓存的聚合),它根据一些业务逻辑决定接受还是拒绝该命令,然后使用KeyConditionExpression: 'aggregateId = :a AND version >= :v'
写入DynamoDB,其中版本是针对该聚合处理的事件的计数。如果存在冲突,则写入失败。对我来说似乎是一个很好的系统!
然后,每个事件都广播到SNS(主题名称是服务名称),以便其他服务可以根据需要对该事件使用react。
我真正挣扎的部分是阅读。投影存储在S3中,并标记有为每个事件源处理的最后一个commitId。当读取查询进入时,它将从S3加载整个投影状态(针对所有聚合),查询事件源以获取所有较新的事件,计算最新状态(再次针对所有聚合),如果更新了对象,则将其写入S3更新),并根据查询参数返回状态的相关部分。
我的问题:(或其中之一)
我认为我做错了预测。
我的大多数投影仅按重要属性对ID进行分组,因此文件保持相对较小。但是我还需要一种方法来检索单个聚合。为此使用投影似乎很疯狂,因为我每次都需要加载整个状态(即每个投影的聚合)对其应用新事件,然后检索我想要的记录(它可能甚至没有改变)。
这就是我现在正在做的,它运行良好(
另一个问题是查询。我需要为每个需要查询的属性建立一个映射映射值以匹配aggregationIds!一定有更好的方法!
无论我以何种方式考虑此问题,投影始终需要整个当前状态+任何新事件,才能返回甚至未更改的单个记录。
最佳答案
我也这么认为;听起来您的查询与您的预测相结合
是的,听起来像是一团糟。更具体地说,这听起来像是查询正在触发要由投影完成的工作。
如果您可以将查询与预测脱钩,那么事情会变得更容易。基本思想是您的查询没有描述当前状态,而是描述了上一次运行投影时的状态。
相同的想法,不同的拼写:您从缓存在S3中的文档回答查询。当检测到新事件时,将运行投影,根据需要加载新数据,计算新文档,并替换高速缓存中的条目。
我想到一个三角形
三角形的每个分支与其他分支异步运行。
我建议您从查询中倒退-支持每个查询需要哪些文档?您必须克服的延迟目标是什么?然后,您开始权衡取舍-对于此新查询,是从现有文档中创建结果,还是需要使用更细粒度的新文档?
是的,并且...事件只是触发的一种方式;您还可以通过时钟触发投影过程(每15分钟检查一次,以查看是否需要更新),或者在人工操作人员心血来潮的情况下(嗯,看来您的帐户余额已过时,让我尝试更新该过程为你)。这样做的方法不只一种,而且您可以混合搭配策略。
不必要。没有规则说您不能使用以前缓存的表示形式作为起点,然后仅从记录簿中提取所需的更改。
例如,假设您正在构建一个将聚合
A{id:7}
和B{id:9}
组合在一起的 View 。您获取缓存的副本,然后查看其元数据(将其放在先前的写操作中),并在其中找到类似metadata:{A:{id:7, version:21}, B:{id:9, version:19}}
的内容。现在,您只需要在上次使用的事件之后加载事件,更新内存中的本地副本,更新元数据的本地副本并将事件推送到缓存中即可。