我正在构建一个具有基本Producer范式风格的系统,但是Producer部分需要处于Transaction模式。
这是我的确切情况:

Poller Thread -
 [Transaction START]
   Polls the DB for say 10 records
     * Sets the status in DB for those records as IN-Progress
     * Puts the above 10 records in a LinkedBlockingQueue - workqueue
 [Transaction END]

Worker Thread Pool of 5
   * Polls the workqueue for the tasks, do some lookups and update the same records in DB with some looked up values.


现在,我的问题出在流程的第1部分,因为如果让我们说一些原因,那么我从数据库中提取和更新的过程成功了,但是对于一条记录,插入队列失败了,我可以回滚整个事务,并将我的所有记录回滚数据库将处于未处理状态,但是可以在此工作队列中插入一些元素,而我的工作线程池可以拾取它们并开始处理,这是不应该发生的。

试图找到是否有一种以跨国方式写入阻塞队列的方法。

如果想在队列中写入某些内容,如果我可以阻止工作线程读取的话,可以考虑添加一些writelock()readlock()机制。

任何想法更好的方法。

谢谢,

最佳答案

考虑最坏的情况:拔出情况(数据库连接丢失)和崩溃情况(程序内存不足)。您将如何恢复呢?

一些提示:


如果您能想到插入队列失败(队列已满)的原因,请不要启动事务。只需跳过一项民意调查即可。
首先提交正在进行的事务,然后将所有记录添加到工作队列中。或者,每条记录使用一项事务,因此您可以将记录一个接一个地添加到工作队列中。
维护正在处理的所有记录的ID的内存中HashSet。如果ID在集合中,但记录不在进行中,反之亦然,则说明出现了非常错误的情况(例如,记录任务未完成/损坏)。
设置进行中时设置时间戳。对正在进行中的记录进行过长时间的另一个后台处理检查。如果ID不在进行中的HashSet中,请重置进行中的状态,并且您的正常过程将重试。
使您的任务成为幂等:查看是否可以找到一种任务可以识别记录工作的方式。这可能是一个相对昂贵的操作,但是它将为您保证在重试的情况下仅进行一次工作。

07-25 22:33
查看更多