我正在为我们的一个应用程序开发消息接口(interface)。应用程序是一种服务,旨在接受“作业”,进行一些处理,并返回结果(实际上是以文件的形式)。
这个想法是使用 RabbitMQ 作为消息传递基础设施,使用 Spring AMQP 来处理协议(protocol)特定的细节。
我不希望我的代码与 Spring AMQP 紧密耦合,所以我想使用 Spring Integration 来隐藏消息传递 api。所以基本上我想要这个:
消息发送到RabbitMQ ====> Spring AMQP ====> Spring Integration ====> MyService ====> 一路回复RabbitMQ
我正在尝试找出将它们连接在一起所需的 XML 配置,但是我在多层次抽象和不同术语方面遇到了问题。事实证明,在 Spring AMQP/RabbitMQ 之上找到一个演示 Spring Integration 的工作示例非常困难,尽管这种设置对我来说感觉非常“最佳实践”。
1) 那么.. 有没有聪明的人可以快速浏览一下这个,也许能把我推向正确的方向?我需要什么,不需要什么? :-)
2) 理想情况下,队列应该是多线程的,这意味着 taskExecutor 应该将多条消息传递给我的 jobService 以进行并行处理。需要什么配置?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
">
<context:component-scan base-package="com.myprogram.etc" />
<!-- Messaging infrastructure: RabbitMQ -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="${ei.messaging.amqp.servername}" />
<property name="username" value="${ei.messaging.amqp.username}" />
<property name="password" value="${ei.messaging.amqp.password}" />
</bean>
<rabbit:connection-factory id="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- From RabbitMQ -->
<int-amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="our-product-name-queue" connection-factory="connectionFactory"/>
<!-- Spring Integration configuration -->
<int:channel id="fromAMQP">
<!-- Is this necessary?? -->
<int:queue/>
</int:channel>
<!-- JobService is a @Service with a @ServiceActivator annotation -->
<int:service-activator input-channel="fromAMQP" ref="jobService"/>
</beans>
最佳答案
我怀疑我和你一样是 spring-integration 和 spring-integration-amqp 的菜鸟,但我确实根据一个示例项目获得了一些工作。
对于rabbitmq基础设施,我有以下几点:
<rabbit:connection-factory id="rabbitConnectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
<!-- some attributes seemed to be ok with queue name, others required id
-- so I used both with the same value -->
<rabbit:queue id='test.queue' name='test.queue'/>
<rabbit:direct-exchange name:"my.exchange">
<rabbit:bindings>
<rabbit:binding queue="test.queue" key="test.binding"/>
</rabbit:bindings>
</rabbit:direct-exchange>
要向rabbitmq发送消息,我有以下内容:
<!-- This is just an interface definition, no implementation required
-- spring will generate an implementation which puts a message on the channel -->
<int:gateway id="backgroundService",
service-interface="com.company.BackgroundService"
default-request-channel="toRabbit"
<int:channel id:"toRabbit"/>
<!-- used amqpTemplate to send messages on toRabbit channel to rabbitmq -->
<int-amqp:outbound-channel-adapter channel:"toRabbit"
amqp-template="amqpTemplate"
exchange-name="my.exchange"
routing-key="test.binding"/>
为了接收消息,我有以下内容:
<int:service-activator input-channel="fromRabbit"
ref="testService"
method="serviceMethod"/>
// from rabbitmq to local channel
<int-amqp:inbound-channel-adapter channel="fromRabbit"
queue-names="test.queue"
connection-factory="rabbitConnectionFactory"/>
<int:channel id="fromRabbit"/>
一些注意事项 - spring-integration 中 amqp 集成的文档说可以同步发送和接收返回值,但我还没有弄清楚。当我的 service-activator 方法返回一个值时,它导致抛出异常,将消息放回 rabbitmq(并生成无限循环,因为它会再次接收消息并再次抛出异常)。
我的 BackgroundService 界面如下所示:
package com.company
import org.springframework.integration.annotation.Gateway
public interface BackgroundService {
//@Gateway(requestChannel="someOtherMessageChannel")
public String sayHello(String toWho)
}
如果您不想使用 spring bean 中配置的默认 channel ,您可以通过注释在每个方法上指定一个 channel 。
附加到服务激活器的服务如下所示:
package com.company;
class TestService {
public void serviceMethod(String param) {
log.info("serviceMethod received: " + param");
//return "hello, " + param;
}
}
当我在不涉及rabbitmq 的情况下在本地连接所有内容时,调用者正确接收了返回值。当我转到rabbitmq channel 时,当返回值后抛出异常时,我得到了上述无限循环。这肯定是可能的,否则不可能在不修改代码的情况下连接到不同的 channel ,但我不确定这个技巧是什么。如果你弄清楚了,请回复一个解决方案。显然,您可以根据需要在端点之间放置任何您喜欢的路由、转换和过滤。
如果我上面的 XML 摘录中有错别字,请不要感到惊讶。我不得不从 groovy DSL 转换回 xml,所以我可能会犯错误。但是意图应该足够清楚。
关于rabbitmq - 使用 Spring 与 RabbitMQ 集成,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/10351948/