本文介绍了带有事务的SourcePollingChannelAdapter的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在实现轮询时将SourcePollingChannelAdapter与必需的事务传播一起使用,以在发生错误时回滚所有操作.方法setTransactionSynchronizationFactory不被注释...非常感谢您的帮助!

I would like to use a SourcePollingChannelAdapter with a transaction propagation REQUIRED when the polling is realized, to rollback all operations if an error is occured. The method setTransactionSynchronizationFactory is not commented...Thanks a lot for your help !

在XML中,我可以做到:

In XML I can do :

<int:poller fixed-rate="5000">
  <int:transactional transaction-manager="transactionManager" propagation="REQUIRED" />
</int:poller>

我想像这样使用SourcePollingChannelAdapter和PeriodicTrigger进行事务处理,但是我不知道该怎么做.

I would like to use a transaction like this programmatically with a SourcePollingChannelAdapter and a PeriodicTrigger, but I don't know how.

我有这个:

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
adapter.setSource(source);
adapter.setTrigger(new PeriodicTrigger(5, TimeUnit.SECONDS));
adapter.setOutputChannel(channel);
adapter.setBeanFactory(ctx);
adapter.start();

调用bean源时,将删除数据库中的元素,并创建一条消息并在outputchannel中发送该消息;但是,如果在输出通道后流程中出现错误,我希望恢复数据库并使元素返回……实际上是一个具有传播的简单事务.我不知道该怎么办.

When the bean source is called, an element in database are deleted, a message is created and sent in outputchannel; but if i have an error in the flow after the ouputchannel i would like database restored and element came back ... a simple transaction in fact with propagation. I don't understand how do that.

输出通道是:

<int:channel id="channel" >
    <int:queue />
</int:channel>
<int-http:outbound-gateway request-channel="channel"
    url="http://localhost:8081/icopitole-ws/baseactive" http-method="GET"
    reply-channel="reresponseVersionChannel" expected-response-type="java.lang.String"  />

当URL没有响应时,尽管我添加了DefaultTransactionSynchronizationFactory和TransactionInterceptor,就像您所说的那样,将引发异常但不执行回滚:(

When the URL doesn't respond, an exception is thrown but no Rollback is executed, although I have add a DefaultTransactionSynchronizationFactory and TransactionInterceptor like you said :(

推荐答案

如果我对您的理解正确,则需要使用以下代码: DefaultTransactionSynchronizationFactory

If I understand you correctly you need to use this one: DefaultTransactionSynchronizationFactory

这是如何配置它的快照:

And here is a snapshot how to configure it:

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(mock(BeanFactory.class));
    PollableChannel queueChannel = new QueueChannel();
    syncProcessor.setBeforeCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
    syncProcessor.setBeforeCommitChannel(queueChannel);
    syncProcessor.setAfterCommitChannel(queueChannel);
    syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#baz"));

    DefaultTransactionSynchronizationFactory syncFactory =
            new DefaultTransactionSynchronizationFactory(syncProcessor);

    adapter.setTransactionSynchronizationFactory(syncFactory);

SourcePollingChannelAdapter#adviceChain涵盖了事务边界,因此应按以下方式进行配置:

Transaction boundaries are covered in the SourcePollingChannelAdapter#adviceChain, so it should be configure like this:

    TransactionInterceptor txAdvice =
    new TransactionInterceptor(transactionManager,
         new MatchAlwaysTransactionAttributeSource(new DefaultTransactionAttribute()));
    adapter.setAdviceChain(Collections.singletonList(txAdvice));

因此,现在每个民意测验"都将包裹交易,而您的syncFactory会做这些事.

So, now each 'poll' will be wrapped with transaction and your syncFactory will do the stuff.

这篇关于带有事务的SourcePollingChannelAdapter的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-04 06:52
查看更多