我们有Atomikos(TransactionManager)和ActiveMQ的问题。 Atomikos在Spring环境中用于启用XA事务。我们测试了ActiveMQ客户端的故障转移行为,并注意到了一些不会消失的异常。我们的测试方案如下:


启动两个使用相同数据库的代理,一个代理将成为主服务器,第二个将成为从服务器
启动客户端;客户端将连接到主经纪人
突然停止经纪人(杀死-9)
故障转移到第二个代理,并且可以正常工作


我们使用以下版本:


ActiveMQ 5.10.0
Atomikos 3.9.3、4.0.0M3、3.9.7
春季3.2.4。发布


现在,我们从Atomikos获得以下异常:

2014-06-12 11:28:42 tpe-1 com.atomikos.datasource.xa.XAResourceTransaction WARN  - XA resource 'connectionFactoryOut': commit for XID '6170695F3134303235363533313236323230303334383030303031:6170695F31343032353635333132363232333438' raised -7: the XA resource has become unavailable
javax.transaction.xa.XAException: The JMS connection has failed: java.io.EOFException
    at org.apache.activemq.TransactionContext.toXAException(TransactionContext.java:793)
    at org.apache.activemq.TransactionContext.commit(TransactionContext.java:590)
    at com.atomikos.datasource.xa.XAResourceTransaction.commit(XAResourceTransaction.java:733)
    at com.atomikos.icatch.imp.CommitMessage.send(CommitMessage.java:72)
    at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:83)
    at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:79)
    at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:58)
    at com.atomikos.icatch.imp.CoordinatorStateHandler.commitFromWithinCallback(CoordinatorStateHandler.java:582)
    at com.atomikos.icatch.imp.ActiveStateHandler$6.doCommit(ActiveStateHandler.java:301)
    at com.atomikos.icatch.imp.CoordinatorStateHandler.commitWithAfterCompletionNotification(CoordinatorStateHandler.java:852)
    at com.atomikos.icatch.imp.ActiveStateHandler.commit(ActiveStateHandler.java:296)
    at com.atomikos.icatch.imp.CoordinatorImp.commit(CoordinatorImp.java:707)
    at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:968)
    at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:82)
    at com.atomikos.icatch.imp.CompositeTransactionImp.commit(CompositeTransactionImp.java:336)
    at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:190)
    at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:436)
    at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:107)
    at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1021)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150)
    at net.sprd.messaging.test.data_generator.SenderThread$1.run(SenderThread.java:42)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.activemq.ConnectionFailedException: The JMS connection has failed: java.io.EOFException
    at org.apache.activemq.ActiveMQConnection.checkClosedOrFailed(ActiveMQConnection.java:1483)
    at org.apache.activemq.TransactionContext.commit(TransactionContext.java:551)
    ... 26 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    ... 1 more
2014-06-12 11:28:42 tpe-1 com.atomikos.icatch.imp.CommitMessage WARN  - Unexpected error in commit
com.atomikos.icatch.SysException: XA resource 'connectionFactoryOut': commit for XID '6170695F3134303235363533313236323230303334383030303031:6170695F31343032353635333132363232333438' raised -7: the XA resource has become unavailable
    at com.atomikos.datasource.xa.XAResourceTransaction.commit(XAResourceTransaction.java:773)
    at com.atomikos.icatch.imp.CommitMessage.send(CommitMessage.java:72)
    at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:83)
    at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:79)
    at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:58)
    at com.atomikos.icatch.imp.CoordinatorStateHandler.commitFromWithinCallback(CoordinatorStateHandler.java:582)
    at com.atomikos.icatch.imp.ActiveStateHandler$6.doCommit(ActiveStateHandler.java:301)
    at com.atomikos.icatch.imp.CoordinatorStateHandler.commitWithAfterCompletionNotification(CoordinatorStateHandler.java:852)
    at com.atomikos.icatch.imp.ActiveStateHandler.commit(ActiveStateHandler.java:296)
    at com.atomikos.icatch.imp.CoordinatorImp.commit(CoordinatorImp.java:707)
    at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:968)
    at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:82)
    at com.atomikos.icatch.imp.CompositeTransactionImp.commit(CompositeTransactionImp.java:336)
    at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:190)
    at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:436)
    at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:107)
    at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1021)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150)
    at net.sprd.messaging.test.data_generator.SenderThread$1.run(SenderThread.java:42)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


发送部件的Spring配置如下所示:

            
        

    <bean id="jmsTemplateBroker" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg name="connectionFactory" ref="cachingSendingSessionFactoryBroker" />
        <property name="sessionTransacted" value="true" />
    </bean>

    <bean id="cachingSendingSessionFactoryBroker" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg name="targetConnectionFactory" ref="atomikosConnectionFactoryBroker" />
        <property name="sessionCacheSize" value="1000" />
        <property name="cacheConsumers" value="false" />
        <property name="cacheProducers" value="false" />
    </bean>

    <bean id="atomikosConnectionFactoryBroker" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
        init-method="init" destroy-method="close" depends-on="activeMQConnectionFactoryBroker">
        <property name="uniqueResourceName" value="atomikosConnectionFactoryBroker" />
        <property name="xaConnectionFactory" ref="activeMQConnectionFactoryBroker" />
        <property name="minPoolSize" value="1" />
        <property name="maxPoolSize" value="1000" />
    </bean>

    <bean id="activeMQConnectionFactoryBroker" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="${broker.brokerURL}" />
        <property name="userName" value="testUser" />
        <property name="password" value="testPassword" />
        <property name="redeliveryPolicy" ref="redeliveryPolicyBroker" />
        <property name="prefetchPolicy" ref="activeMQPrefechPolicyBroker" />
    </bean>

    <bean id="activeMQPrefechPolicyBroker" class="org.apache.activemq.ActiveMQPrefetchPolicy">
        <property name="all" value="100" />
    </bean>

    <bean id="redeliveryPolicyBroker" class="org.apache.activemq.RedeliveryPolicy">
        <property name="initialRedeliveryDelay" value="1000" />
        <property name="redeliveryDelay" value="1000" />
        <property name="useExponentialBackOff" value="false" />
        <property name="maximumRedeliveries" value="1000" />
    </bean>


Atomikos Bean的配置如下:

            
            
            
        

    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
        init-method="init" destroy-method="close" depends-on="userTransactionService">
        <property name="forceShutdown" value="true" />
    </bean>

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="70" />
    </bean>


    <bean id="timestamp" class="java.lang.String">
        <constructor-arg value="#{'' + T(java.lang.System).currentTimeMillis()}" />
    </bean>

    <bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
        init-method="init" destroy-method="shutdownForce">
        <constructor-arg>
            <props>
                <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
                <prop key="com.atomikos.icatch.output_dir">/tmp/atomikosOutput</prop>
                <prop key="com.atomikos.icatch.console_file_name">/tmp/api_#{timestamp}.out</prop>
                <prop key="com.atomikos.icatch.console_log_level">DEBUG</prop>
                <prop key="com.atomikos.icatch.tm_unique_name">api_#{timestamp}</prop>
                <prop key="com.atomikos.icatch.threaded_2pc">false</prop>
                <prop key="com.atomikos.icatch.max_actives">-1</prop>
                <prop key="com.atomikos.icatch.max_timeout">10000</prop>
                <prop key="com.atomikos.icatch.default_jta_timeout">999</prop>
                <prop key="com.atomikos.icatch.serial_jta_transactions">false</prop>
                <prop key="com.atomikos.icatch.enable_logging">false</prop>
                <prop key="com.atomikos.icatch.log_base_name">api#{timestamp}</prop>
                <prop key="com.atomikos.icatch.log_base_dir">/tmp/atomikos</prop>
                <prop key="com.atomikos.icatch.checkpoint_interval">5000</prop>
            </props>
        </constructor-arg>
    </bean>


这些异常会定期抛出。我们假设以下情况:Atomikos向ActiveMQ代理发送一条prepare语句。之后,代理将关闭。现在,准备好的语句在ActiveMQ端丢失了,因此Atomikos将尝试恢复此已消失的事务。因为两个ActiveMQ代理都使用相同的数据库,所以我们认为应将准备好的语句存储在数据库中。因此,从代理可以恢复交易。但这不会发生。

有谁知道这是否是ActiveMQ的错误,或者是否有ActiveMQ的“隐藏”配置属性来启用XA事务的准备模式?

补充说明:我们确保对事务管理器名称使用唯一的ID(与Stackoverflow article about Atomikos unique ids相关)

谢谢。

最佳答案

这显然是/是一个错误。它已用版本3.9.9修复(即使此版本/修复程序当前仅适用于Atomikos客户)。

关于java - 事务未准备好(ActiveMQ),“XA资源变得不可用”(Atomikos),日志中无休止的异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/24691103/

10-09 06:41