ActiveMQ AMQP with the JMS transformer, using Spring Integration

Question

I am trying to get a barebones sample application up and running that uses ActiveMQ's AMQP transport with the JMS transformer. My client library is Spring Integration, but I cannot get a basic sample working in this configuration.

Details on ActiveMQ's JMS transformer over AMQP: http://activemq.apache.org/amqp.html

Main test application

@IntegrationComponentScan
@SpringBootApplication
public class SpringCloudStreamJmsActivemqSenderExampleApplication implements CommandLineRunner {

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        return connectionFactory;
    }

    @Bean
    public ConnectionFactory connectionFactoryAMQP() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:5672");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        return connectionFactory;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamJmsActivemqSenderExampleApplication.class, args);
    }

    @Autowired
    JmsGateway gateway;

    @Override
    public void run(String... strings) throws Exception {
        gateway.sendMessage("Hi");
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1, TimeUnit.SECONDS).get();
    }

    @Bean(name = "outboundChannel")
    MessageChannel myOutBoundChannel() {
        return new QueueChannel();
    }

    @Bean(name = "inboundChannel")
    MessageChannel myInboundChannel() {
        return new QueueChannel();
    }

    @Bean(name = "errorChannel")
    MessageChannel myErrorChannel() {
        return new DirectChannel();
    }

    @Bean
    IntegrationFlow jmsInboundFlow() {
        return IntegrationFlows.from(Jms
                .inboundGateway(connectionFactoryAMQP())
                .destination("myCoolQueue")
                .errorChannel(myErrorChannel()))
                    .handle(this::print)
                .get();
    }

    @Bean
    IntegrationFlow jmsOutboundFlow() {
        return IntegrationFlows.from(myOutBoundChannel())
                .handle(Jms.outboundAdapter(connectionFactory())
                .destination("myCoolQueue"))
                .get();
    }

    @Bean
    IntegrationFlow customErrorFlow() {
        return IntegrationFlows.from(myErrorChannel())
                    .handle(this::printStackTrace)
                .get();
    }

    private void print(Message message) {
        System.out.println("Message payload: " + message.getPayload());
        //throw new RuntimeException("broke it");
    }

    private void printStackTrace(Message errorMessage) {
        ((ErrorMessage)errorMessage).getPayload().printStackTrace();
    }
}

Messaging gateway

@MessagingGateway
interface JmsGateway {
    @Gateway(requestChannel = "outboundChannel")
    void sendMessage(String message);
}

ActiveMQ.xml

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:0?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

Log output

2017-01-09 08:42:26.158  INFO 24332 --- [  restartedMain] treamJmsActivemqSenderExampleApplication : Started SpringCloudStreamJmsActivemqSenderExampleApplication in 2.676 seconds (JVM running for 3.041)
2017-01-09 08:42:31.143  WARN 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'myCoolQueue' - trying to recover. Cause: Disposed due to prior exception
2017-01-09 08:42:31.150 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:36.155 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:41.163 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672

Recommended answer

You have to change your bean definition in one of two ways:

JNDI:

@Bean
public ConnectionFactory connectionFactoryAMQP() throws NamingException {
    // Name under which the factory is registered in, and looked up from, the JNDI context
    String factoryName = "myFactoryLookup";
    Properties props = new Properties();
    // Use the Qpid JMS initial context factory instead of the OpenWire one
    props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    props.setProperty("connectionfactory." + factoryName, "amqp://localhost:5672");
    props.put("property.connectionfactory." + factoryName + ".username", "admin");
    props.put("property.connectionfactory." + factoryName + ".password", "admin");
    // Creating the context and the lookup can both throw NamingException
    InitialContext ic = new InitialContext(props);
    return (ConnectionFactory) ic.lookup(factoryName);
}

OR

Factory:

@Bean
public ConnectionFactory connectionFactoryAMQP() {
    org.apache.qpid.jms.JmsConnectionFactory connectionFactory = new JmsConnectionFactory();
    connectionFactory.setRemoteURI("amqp://localhost:5672");
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("admin");
    return connectionFactory;
}

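Add the Apache Qpid JMS client dependency (org.apache.qpid:qpid-jms-client), which provides the org.apache.qpid.jms classes used above.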

Set the AMQP port in activemq.xml:

 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>
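Before wiring the connection factory, it can help to confirm that the broker is really listening on the AMQP port, since the connectors in the question were bound to port 0 rather than a fixed port. The following is a minimal sketch using only the JDK; the class name AmqpPortCheck, the method isListening, and the one-second timeout are illustrative choices, not part of ActiveMQ or Spring. It only checks that something accepts TCP connections on the port, not that it speaks AMQP.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of ActiveMQ, Qpid or Spring): checks whether
// anything accepts TCP connections on the given host and port.
class AmqpPortCheck {

    // Returns true if a TCP connection can be opened within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host: treat all as "not listening"
            return false;
        }
    }

    public static void main(String[] args) {
        // 5672 is the port configured on the amqp transportConnector
        System.out.println("AMQP connector reachable: "
                + isListening("localhost", 5672, 1000));
    }
}
```

If this reports false while the broker is running, the transportConnector URI is the first thing to check.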

transport.transformer=jms only controls how messages are converted on the broker side, between the AMQP transport and ActiveMQ. When the broker receives an AMQP message through the AMQP transport, it converts it to a JMS message; when a message is dispatched to a consumer through the AMQP transport, it is converted from JMS back to an AMQP message. It does not change the wire protocol of the connector, so the client still has to speak AMQP (for example through Qpid JMS); the OpenWire ActiveMQConnectionFactory cannot connect to the AMQP port.
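The transformer affects message conversion inside the broker, not the protocol spoken on the socket. An AMQP 1.0 connection starts with an 8-byte protocol header: the ASCII letters "AMQP", a protocol id (0 for plain AMQP, 3 for SASL), then the version bytes 1, 0, 0. A client that opens the connection with anything else, such as an OpenWire client pointed at port 5672, is not speaking AMQP, which is roughly why the listener container in the question keeps reporting a failed channel. The sketch below builds and recognises that header; the class and method names are made up for illustration and are not part of any library.

```java
// Hypothetical illustration of the AMQP 1.0 protocol header; not a library API.
class AmqpHeader {

    static final int PROTOCOL_ID_AMQP = 0; // plain AMQP
    static final int PROTOCOL_ID_SASL = 3; // SASL security layer

    // Builds the 8-byte header: 'A','M','Q','P', protocol id, major=1, minor=0, revision=0.
    static byte[] header(int protocolId) {
        return new byte[] {'A', 'M', 'Q', 'P', (byte) protocolId, 1, 0, 0};
    }

    // Returns true if the first bytes of a connection look like an AMQP 1.0 header.
    static boolean looksLikeAmqp10(byte[] firstBytes) {
        if (firstBytes == null || firstBytes.length < 8) {
            return false;
        }
        boolean magic = firstBytes[0] == 'A' && firstBytes[1] == 'M'
                && firstBytes[2] == 'Q' && firstBytes[3] == 'P';
        boolean version = firstBytes[5] == 1 && firstBytes[6] == 0 && firstBytes[7] == 0;
        return magic && version;
    }
}
```

An AMQP 1.0 client such as Qpid JMS sends this header when it connects, so switching the connection factory is what makes the connection succeed, independently of the transformer setting.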
