问题描述
我想在实现轮询时将SourcePollingChannelAdapter与必需的事务传播一起使用,以在发生错误时回滚所有操作.方法setTransactionSynchronizationFactory不被注释...非常感谢您的帮助!
I would like to use a SourcePollingChannelAdapter with a transaction propagation REQUIRED when the polling is realized, to rollback all operations if an error is occured. The method setTransactionSynchronizationFactory is not commented...Thanks a lot for your help !
在XML中,我可以做到:
In XML I can do :
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" propagation="REQUIRED" />
</int:poller>
我想像这样使用SourcePollingChannelAdapter和PeriodicTrigger进行事务处理,但是我不知道该怎么做.
I would like to use a transaction like this programmatically with a SourcePollingChannelAdapter and a PeriodicTrigger, but I don't know how.
我有这个:
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
adapter.setSource(source);
adapter.setTrigger(new PeriodicTrigger(5, TimeUnit.SECONDS));
adapter.setOutputChannel(channel);
adapter.setBeanFactory(ctx);
adapter.start();
调用bean源时,将删除数据库中的元素,并创建一条消息并在outputchannel中发送该消息;但是,如果在输出通道后流程中出现错误,我希望恢复数据库并使元素返回……实际上是一个具有传播的简单事务.我不知道该怎么办.
When the bean source is called, an element in database are deleted, a message is created and sent in outputchannel; but if i have an error in the flow after the ouputchannel i would like database restored and element came back ... a simple transaction in fact with propagation. I don't understand how do that.
输出通道是:
<int:channel id="channel" >
<int:queue />
</int:channel>
<int-http:outbound-gateway request-channel="channel"
url="http://localhost:8081/icopitole-ws/baseactive" http-method="GET"
reply-channel="reresponseVersionChannel" expected-response-type="java.lang.String" />
当URL没有响应时,尽管我添加了DefaultTransactionSynchronizationFactory和TransactionInterceptor,就像您所说的那样,将引发异常但不执行回滚:(
When the URL doesn't respond, an exception is thrown but no Rollback is executed, although I have add a DefaultTransactionSynchronizationFactory and TransactionInterceptor like you said :(
推荐答案
如果我对您的理解正确,则需要使用以下代码: DefaultTransactionSynchronizationFactory
If I understand you correctly you need to use this one: DefaultTransactionSynchronizationFactory
这是如何配置它的快照:
And here is a snapshot how to configure it:
SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(mock(BeanFactory.class));
PollableChannel queueChannel = new QueueChannel();
syncProcessor.setBeforeCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
syncProcessor.setBeforeCommitChannel(queueChannel);
syncProcessor.setAfterCommitChannel(queueChannel);
syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#baz"));
DefaultTransactionSynchronizationFactory syncFactory =
new DefaultTransactionSynchronizationFactory(syncProcessor);
adapter.setTransactionSynchronizationFactory(syncFactory);
SourcePollingChannelAdapter#adviceChain
涵盖了事务边界,因此应按以下方式进行配置:
Transaction boundaries are covered in the SourcePollingChannelAdapter#adviceChain
, so it should be configure like this:
TransactionInterceptor txAdvice =
new TransactionInterceptor(transactionManager,
new MatchAlwaysTransactionAttributeSource(new DefaultTransactionAttribute()));
adapter.setAdviceChain(Collections.singletonList(txAdvice));
因此,现在每个民意测验"都将包裹交易,而您的syncFactory
会做这些事.
So, now each 'poll' will be wrapped with transaction and your syncFactory
will do the stuff.
这篇关于带有事务的SourcePollingChannelAdapter的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!