我有一个依赖于Spring Integration(4.0.4.RELEASE)和RabbitMQ的应用程序。我的流程如下:
消息通过进程放入队列中(它们不希望有任何应答):
网关-> channel -> RabbitMQ
然后被另一个过程耗尽:
RabbitMQ --1--> inbound-channel-adapter A --2--> chain B --3--> aggregator C --4--> service-activator D --5--> final service-activator E
Explanations & context
The specific thing is that nowhere in my application I am using a splitter: aggregator C just waits for enough messages to come, or for a timeout to expire, and then forwards the batch to service D. Messages can get stuck in aggregator C for quite a long time, and should NOT be considered as consumed there. They should only be consumed once service D successfully completes. Therefore, I am using MANUAL acknowledgement on inbound-channel-adapter A and service E is in charge of acknowledging the batch.
Custom aggregator
I solved the acknowledgement issue I had when set to AUTO by redefining the aggregator. Indeed, messages are acknowledged immediately if any asynchronous process occurs in the flow (see question here). Therefore, I switched to MANUAL acknowledgement and implemented the aggregator like this:
<bean class="org.springframework.integration.config.ConsumerEndpointFactoryBean">
<property name="inputChannel" ref="channel3"/>
<property name="handler">
<bean class="org.springframework.integration.aggregator.AggregatingMessageHandler">
<constructor-arg name="processor">
<bean class="com.test.AMQPAggregator"/>
</constructor-arg>
<property name="correlationStrategy">
<bean class="com.test.AggregatorDefaultCorrelationStrategy" />
</property>
<property name="releaseStrategy">
<bean class="com.test.AggregatorMongoReleaseStrategy" />
</property>
<property name="messageStore" ref="messageStoreBean"/>
<property name="expireGroupsUponCompletion" value="true"/>
<property name="sendPartialResultOnExpiry" value="true"/>
<property name="outputChannel" ref="channel4"/>
</bean>
</property>
</bean>
<bean id="messageStoreBean" class="org.springframework.integration.store.SimpleMessageStore"/>
<bean id="messageStoreReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore" />
<property name="timeout" value="${myapp.timeout}" />
</bean>
<task:scheduled-tasks>
<task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
</task:scheduled-tasks>
我确实确实想以不同的方式聚合 header ,并保留所有amqp_deliveryTag中的最大值,以便以后在服务E 中进行多重确认(请参阅this线程)。到目前为止,它的效果很好,除了它比典型的聚合器 namespace 更冗长之外(请参阅this旧的Jira票证)。
服务
我只是在使用基本配置:
链-B
<int:chain input-channel="channel2" output-channel="channel3">
<int:header-enricher>
<int:error-channel ref="errorChannel" /> // Probably useless
</int:header-enricher>
<int:json-to-object-transformer/>
<int:transformer ref="serviceABean"
method="doThis" />
<int:transformer ref="serviceBBean"
method="doThat" />
</int:chain>
服务-D
<int:service-activator ref="serviceDBean"
method="doSomething"
input-channel="channel4"
output-channel="channel5" />
错误管理
由于我依靠MANUAL确认,因此还需要手动拒绝消息,以防发生异常。我对 inbound-channel-adapter A 具有以下定义:
<int-amqp:inbound-channel-adapter channel="channel2"
queue-names="si.queue1"
error-channel="errorChannel"
mapped-request-headers="*"
acknowledge-mode="MANUAL"
prefetch-count="${properties.prefetch_count}"
connection-factory="rabbitConnectionFactory"/>
我对 errorChannel 使用以下定义:
<int:chain input-channel="errorChannel">
<int:transformer ref="errorUnwrapperBean" method="unwrap" />
<int:service-activator ref="amqpAcknowledgerBean" method="rejectMessage" />
</int:chain>
ErrorUnwrapper基于this代码,整个异常检测和消息拒绝工作良好,直到消息到达聚合器C 为止。
问题
如果在服务激活程序D 中处理消息时引发异常,那么我会看到此异常,但 errorChannel 似乎没有收到任何消息,并且不会调用我的ErrorUnwrapper unwrap()方法。我在抛出Exception(“ahahah”)时看到的定制堆栈跟踪如下:
2014-09-23 16:41:18,725 ERROR o.s.i.s.SimpleMessageStore:174: Exception in expiry callback
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:170)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
(...)
Caused by: java.lang.Exception: ahahaha
at com.myapp.ServiceD.doSomething(ServiceD.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
(...)
2014-09-23 16:41:18,733 ERROR o.s.s.s.TaskUtils$LoggingErrorHandler:95: Unexpected error occurred in scheduled task.
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
(...)
问题
如何告诉服务处理来自此类聚合器的消息的服务将错误发布到 errorChannel ?我试图通过标题增强器在标题中指定没有错误的错误 channel 。我正在使用默认的 errorChannel 定义,但我也尝试过更改其名称并重新定义它。我在这里一无所知,即使我找到了this和that,也未能使其正常工作。在此先感谢您的帮助!
最佳答案
从StackTrace可以看到,您的过程是从MessageGroupStoreReaper
线程开始的,该线程是从默认ThreadPoolTaskScheduler
发起的。
因此,您必须为此提供一个定制bean:
<bean id="scheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="errorHandler">
<bean class="org.springframework.integration.channel.MessagePublishingErrorHandler">
<property name="defaultErrorChannel" ref="errorChannel"/>
</bean>
</property>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
</task:scheduled-tasks>
但是,我看到将
error-channel
放在<aggregator>
上的好处是,实际上我们有来自不同分离线程的多个点,至此我们无法正常处理。关于spring-integration - Spring 整合: how to handle exceptions in services after an aggregator?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/25999096/