我有一个从一组微服务构建的应用程序。一个服务接收数据,通过Spring JPA和Eclipse链接将其保留,然后将警报(AMQP)发送给第二个服务。
然后,根据特定条件,第二个服务针对持久化数据调用RESTfull Web服务以检索保存的信息。
我注意到,有时即使已保存数据,RESTfull服务也会返回空数据集。查看持久性服务的代码,使用了save而不是saveandflush,因此我认为数据刷新的速度不足以使下游服务查询。
我应该说原始的持久性功能包装在
@Transactional
中 最佳答案
问题的可能预后
我相信这里的问题与save
和saveAndFlush
无关。该问题似乎与Spring @Transactional
方法的性质有关,并且在涉及数据库和AMQP代理的分布式环境中错误地使用了这些事务,并且可能会加重这种对JPA上下文如何工作的基本误解。
在您的解释中,您似乎暗示要在@Transactional
方法内启动JPA事务,并且在事务期间(但在提交之前)将消息发送到AMQP代理。稍后,在队列的另一侧,使用者应用程序获取消息并进行REST服务调用。此时,您注意到来自发布者方的事务更改尚未提交到数据库,因此对消费者方不可见。
问题似乎是您在JPA事务提交到磁盘之前在AMPA消息中传播了这些消息。当消费者阅读消息并进行处理时,您从发布方进行的交易可能尚未完成。因此,这些更改对于客户应用程序是不可见的。
如果您的AMPQ实现是Rabbit,那么我以前已经看到过此问题。当启动使用数据库事务管理器的@Transactional
方法时,在该方法内,您将使用RabbitTemplate
发送相应的消息。
如果您的RabbitTemplate
没有使用事务处理的 channel (即channelTransacted=true
),那么您的消息将在数据库事务提交之前传递。我相信,通过在RabbitTemplate
中启用交易 channel (默认情况下处于禁用状态),可以解决部分问题。
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
channel-transacted="true"/>
交易 channel 后,RabbitTemplate
将“加入”当前数据库事务(显然是JPA事务)。一旦您的JPA事务提交,它就会运行一些结尾代码,这些代码也将在Rabbit channel 中提交更改,从而强制消息的实际“发送”。关于保存与saveAndFlush
您可能会认为清除JPA上下文中的更改应该已经解决了问题,但是您错了。刷新JPA上下文只会强制将实体中的更改(此时仅在内存中)写入磁盘。但是,它们仍会写入相应数据库事务中的磁盘中,直到您的JPA事务提交后才会提交。这发生在
@Transactional
方法的末尾(不幸的是,在您已经发送了AMQP消息之后的一段时间(如果您没有使用如上所述的事务处理的 channel ))。因此,即使刷新JPA上下文,在发布者应用程序中
@Transactional
方法完成之前,您的使用者应用程序也不会看到这些更改(根据经典数据库隔离级别规则)。当您调用
save(entity),
时,EntityManager
不需要立即同步任何更改。大多数JPA实现只是将实体标记为内存已脏,然后等到最后一分钟将所有更改与数据库同步并在数据库级别提交这些更改。注意:在某些情况下,您可能希望其中的某些更改立即存入磁盘,而不是在异想天开的
EntityManager
决定这样做之前。当数据库表中有一个触发器,您需要运行该触发器以生成一些其他记录,这些记录在以后的事务中将需要时,就会发生这种情况。因此,您可以强制对磁盘所做的更改进行刷新,以强制执行触发器。通过刷新上下文,您只是在强制将内存中的更改同步到磁盘,但这并不意味着立即对这些修改进行数据库提交。因此,您刷新的那些更改不一定对其他事务可见。基于传统的数据库隔离级别,很可能不会。
2PC问题
这里的另一个经典问题是您的数据库和AMQP代理是两个独立的系统。如果这是关于Rabbit的,那么您没有2PC(两阶段提交)。
因此,您可能需要考虑一些有趣的情况,例如,数据库事务成功提交。但是,Rabbit无法提交您的消息,在这种情况下,您将不得不重复整个事务,可能会跳过数据库的副作用,而只是重新尝试将消息发送给Rabbit。
您可能应该在Distributed transactions in Spring, with and without XA上阅读此文章,特别是有关链式交易的部分有助于解决此问题。
他们建议使用更复杂的事务管理器定义。例如:
<bean id="jdbcTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="rabbitTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="chainedTransactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
<constructor-arg name="transactionManagers">
<array>
<ref bean="rabbitTransactionManager"/>
<ref bean="jdbcTransactionManager"/>
</array>
</constructor-arg>
</bean>
然后,在您的代码中,只需使用链接的事务管理器来协调数据库事务部分和Rabbit事务部分。现在,仍然有可能提交数据库部分,但是Rabbit事务部分失败。
因此,想象一下这样的事情:
@Retry
@Transactional("chainedTransactionManager")
public void myServiceOperation() {
if(workNotDone()) {
doDatabaseTransactionWork();
}
sendMessagesToRabbit();
}
通过这种方式,如果您的Rabbit事务部分由于任何原因失败,并且您被迫重试整个链接的事务,则可以避免重复数据库的副作用,而只需确保将失败的消息发送给Rabbit。同时,如果您的数据库部分发生故障,则您永远不会将消息发送给Rabbit,也就不会有问题。
另外,如果您的数据库副作用是幂等的,则可以跳过检查,仅重新应用数据库更改,然后重新尝试将消息发送给Rabbit。
事实是,一开始,您尝试做的事情似乎非常容易,但是一旦您深入研究了不同的问题并理解了这些问题,您就会意识到以正确的方式进行操作是一件棘手的事情。