我正在尝试使用参与者实现消息处理管道。流水线的步骤包括诸如读取,过滤,扩充以及最终存储到数据库中的功能。
类似于以下内容:http://sujitpal.blogspot.nl/2013/12/akka-content-ingestion-pipeline-part-i.html
问题在于,读取,过滤和扩充步骤比存储步骤快得多,这导致存储角色充斥并且系统不可靠。
我正在考虑以下选项:让存储参与者提取已处理的消息并准备存储消息。这是一个好选择吗?更好的建议?
谢谢
您可以考虑以下几种选择:
如果消息顺序无关紧要-只需在单独的actor(或将来的actor)中执行每个存储操作即可。这将导致所有数据存储并行进行-我建议为此使用单独的线程池。如果某些消息是对其他消息的修改或参与同一事务-您可以仅为每个messageId / transactionId创建单独的参与者,以避免悲观/乐观的锁定问题(不要忘记在事务结束时或超时时杀死此类参与者)。 使用有界邮箱(反压力)-如果仍未处理较旧的消息,则将阻止输入中的新消息(例如,您可能阻止接收线程,直到消息被链中的最后一个参与者确认)。它将责任转移到源系统。它与JMS持久性很好地兼容-消息以可靠的方式存储在JMS代理端,直到系统最终对其进行处理。 结合前两个