Problem description
I am using Spring XD. My stream looks like the one below, and I am running tests on a 3-node container cluster with 1 admin node, using RabbitMQ as the transport.
aws-s3|processor1|http-client|processor2>queue:readyQueue
I have created the taps below.
tap1 aws-s3>s3Queue
tap2 processor1>processorQueue1
tap3 http-client>httpQueue
I ran the following scenarios in my tests:
Scenario 1:
5 files of 200k records = 1 million records, with http-client concurrency=70 and processor2 concurrency=30
I see 900k messages in s3Queue
I see 889k messages in processorQueue1
I see 886k messages in httpQueue
I see 883k messages in processorQueue2
Messages are lost everywhere, and the loss is random.
Scenario 2:
5 files of 200k records = 1 million records, with all module concurrency=1
I see 998800 messages in s3Queue
I see 998760 messages in processorQueue1
I see 997540 messages in httpQueue
I see 997530 messages in processorQueue2
Even these numbers are random and not consistent between runs.
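To make the per-stage loss easier to read, the counts above can be turned into differences between consecutive taps. The following is only a small arithmetic sketch over the Scenario 2 numbers; the class and method names (`TapLossReport`, `lossPerStage`) are made up for illustration and are not part of Spring XD.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TapLossReport {

    /** For each tap, returns how many messages went missing since the previous stage. */
    public static Map<String, Long> lossPerStage(long sent, Map<String, Long> tapCounts) {
        Map<String, Long> losses = new LinkedHashMap<>();
        long previous = sent;
        for (Map.Entry<String, Long> entry : tapCounts.entrySet()) {
            losses.put(entry.getKey(), previous - entry.getValue());
            previous = entry.getValue();
        }
        return losses;
    }

    public static void main(String[] args) {
        // Scenario 2 counts as reported in the question (all module concurrency = 1).
        Map<String, Long> scenario2 = new LinkedHashMap<>();
        scenario2.put("s3Queue", 998_800L);
        scenario2.put("processorQueue1", 998_760L);
        scenario2.put("httpQueue", 997_540L);
        scenario2.put("processorQueue2", 997_530L);

        // 1 million records were sent in total (5 files of 200k).
        lossPerStage(1_000_000L, scenario2)
            .forEach((tap, lost) -> System.out.println(tap + " lost " + lost));
    }
}
```

Read this way, the loss in Scenario 2 is concentrated before s3Queue and before httpQueue, while the two processor stages lose comparatively few messages.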
Scenario 3:
I changed the stream as below, with concurrency=1 and 5 files of 200k records = 1 million records
aws-s3 >testQueue
I ran this 3 times with no issues. I received all 1 million messages each time.
Scenario 4:
I changed the stream as below, with concurrency=1 and 5 files of 200k records = 1 million records
aws-s3 |processor1 >testQueue2
I ran this 3 times with no issues. I received all 1 million messages each time.
In scenarios 3 and 4, data ingestion was faster: it took 5 minutes to process 5 million records, and ingestion into the Rabbit transport queue ran at about 5k messages per second.
In scenario 1, data ingestion was slower; even the s3 module was pulling data very slowly, at about 300 to 1000 messages per second.
In scenario 2, aws-s3 pulled data fast, at about 3-4k messages per second, but the http-client was slow, at about 100 messages per second.
I suspect that XD threading is causing the issue and that this is why I am losing messages. Can you please help me solve this?
Update
Scenario 5
I changed reply-timeout to -1 in the http-client, and then I lost only 37 messages.
When I ran a second iteration, I lost 25000 messages. I saw the following container log when that happened:
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy1537.send(Unknown Source)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 93 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
... 1 more
2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~
2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
Updated
I found a pattern for the message loss: whenever this exception happens, I see a lot of messages lost. I tested this pattern multiple times, and every time this exception occurs, messages are lost. Bumping up the concurrency also makes the issue occur more often.
2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
rabbit configuration
spring:
  rabbitmq:
    addresses: host1:5672,host2:5672,host3:5672
    adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
    nodes: [email protected],[email protected],[email protected]
    username: test
    password: test
    virtual_host: /
    useSSL: false
    sslProperties:
Update after increasing the cache size to 200
I added the XML you provided and increased the cache size to 200. This is what happens when processing 1 million and 80k messages. Only my http-client concurrency is 100; all the others are 1. Processing slowly stopped: the messages are still sitting in the queue before http-client, and the count stays the same. The message count in my named channel is still increasing, but very slowly, at about 10 messages per minute.
s3-poller|processor|http-client>queue:batchCacheQueue
The message count in the queue before http-client (186174) is not decreasing, but messages are slowly arriving in batchCacheQueue.
Test case to reproduce:
1) I was using the Spring Integration aws-s3 source with a splitter in a composite module | a processor (for example, XML parsing) | http-client with concurrency 100 > named channel.
2) I think the file source might also work. Create a single file of a million records and try to pull it from the file source.
3) After some 4 to 5 runs, we see this exception happening.
We found an issue when channels are churned a lot; you need to increase the channel cache size in the rabbit caching connection factory.
See this answer for a work-around.
I opened a JIRA issue so that the next version of Spring XD will expose this setting in servers.yml, so you don't have to override the bus configuration file.
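
To illustrate why a small channel cache leads to churn, here is a toy, single-threaded model of a channel cache. It is a sketch under simplifying assumptions, not Spring AMQP's actual implementation: every sender checks out a channel at the same time, released channels are kept only while the cache has room, and the names `ChannelCacheSketch` and `channelsOpened` are invented for this example. The cache size of 25 is a hypothetical "too small" value; 100 senders matches the http-client concurrency mentioned in the question. In Spring AMQP itself, the corresponding setting is the `channelCacheSize` property of `CachingConnectionFactory`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ChannelCacheSketch {

    /** Counts how many channels have to be physically opened over all rounds. */
    public static long channelsOpened(int cacheSize, int concurrentSenders, int rounds) {
        Deque<Integer> idle = new ArrayDeque<>(); // cached channels waiting for reuse
        long opened = 0;
        int nextId = 0;
        for (int round = 0; round < rounds; round++) {
            Deque<Integer> inUse = new ArrayDeque<>();
            // All senders check out a channel at the same time.
            for (int sender = 0; sender < concurrentSenders; sender++) {
                Integer channel = idle.poll();
                if (channel == null) {
                    // Cache is empty: a new channel must be opened on the connection.
                    channel = nextId++;
                    opened++;
                }
                inUse.push(channel);
            }
            // All senders release their channel; only cacheSize of them can be kept.
            while (!inUse.isEmpty()) {
                Integer channel = inUse.pop();
                if (idle.size() < cacheSize) {
                    idle.push(channel);
                }
                // Otherwise the channel is closed and discarded, which is the churn.
            }
        }
        return opened;
    }

    public static void main(String[] args) {
        int senders = 100;  // http-client concurrency from the question
        int rounds = 1_000; // arbitrary number of send bursts for the illustration
        for (int cacheSize : new int[] {25, 100, 200}) {
            System.out.println("cacheSize=" + cacheSize
                + " channelsOpened=" + channelsOpened(cacheSize, senders, rounds));
        }
    }
}
```

In this model, a cache smaller than the number of concurrent senders opens and closes channels in every round, while a cache at least as large as the concurrency opens each channel once and then reuses it.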