This article covers how to deal with Spring XD losing messages when processing large amounts of data.

Problem description

I am using Spring XD. My stream looks like the one below, and I am running tests on 3 container nodes and 1 admin node, with Rabbit as the transport.

aws-s3|processor1|http-client|processor2>queue:readyQueue

I have created the taps below.

tap1  aws-s3>s3Queue


tap2  processor1>processorQueue1

tap3  http-client>httpQueue

I ran the following scenarios in my tests:

Scenario 1: 5 files of 200k records each = 1 million records, with concurrency of http-client=70 and processor2=30

I see 900k messages in s3Queue

I see 889k messages in processorQueue1

I see 886k messages in httpQueue

I see 883k messages in processorQueue2

Messages are lost everywhere, and the loss is random.

Scenario 2:

5 files of 200k records each = 1 million records, with concurrency=1 for all modules

I see 998800 messages in s3Queue

I see 998760 messages in processorQueue1

I see 997540 messages in httpQueue

I see 997530 messages in processorQueue2

Even these numbers are random and not consistent between runs.
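To make the per-stage losses explicit, the counts reported in scenarios 1 and 2 can be subtracted stage by stage. The following is a minimal sketch in Python; the helper name `stage_losses` is illustrative, and the scenario 1 figures are the rounded "k" values from the post, so the differences are approximate.

```python
# Records fed into the stream in both scenarios (5 files x 200k records).
TOTAL_SENT = 1_000_000

# Counts observed at each queue, in pipeline order, as reported in the post.
SCENARIOS = {
    "scenario 1": [
        ("s3Queue", 900_000),
        ("processorQueue1", 889_000),
        ("httpQueue", 886_000),
        ("processorQueue2", 883_000),
    ],
    "scenario 2": [
        ("s3Queue", 998_800),
        ("processorQueue1", 998_760),
        ("httpQueue", 997_540),
        ("processorQueue2", 997_530),
    ],
}


def stage_losses(total_sent, observed):
    """Return (queue, messages lost since the previous stage) pairs."""
    losses = []
    previous = total_sent
    for name, count in observed:
        losses.append((name, previous - count))
        previous = count
    return losses


if __name__ == "__main__":
    for label, observed in SCENARIOS.items():
        print(label)
        for name, lost in stage_losses(TOTAL_SENT, observed):
            print(f"  {name}: {lost} lost since previous stage")
        print(f"  total lost: {TOTAL_SENT - observed[-1][1]}")
```

Read this way, both runs lose messages at every hop rather than at a single module, which matches the observation that the loss happens everywhere.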

Scenario 3:

I changed the stream as below, with concurrency=1 and 5 files of 200k records each = 1 million records

aws-s3 >testQueue

I ran this 3 times with no issues; I got all 1 million of my messages.

Scenario 4:

I changed the stream as below, with concurrency=1 and 5 files of 200k records each = 1 million records

aws-s3 |processor1 >testQueue2

I ran this 3 times with no issues; I got all 1 million of my messages.

In scenarios 3 and 4, data ingestion was faster: it took 5 minutes to process 5 million messages, and ingestion into the Rabbit transport queue ran at around 5k messages per second.

In scenario 1, data ingestion was slower; even the s3 module was pulling data very slowly, at around 300 to 1000 messages per second.

In scenario 2, aws-s3 pulled data fast, at around 3-4k messages per second, but the http-client was slow, at around 100 messages per second.

I suspect that XD threading is causing the problem and that this is why I am losing messages. Can you please help me solve this issue?

Update

Scenario 5:

I changed reply-timeout to -1 in the http-client, and then I lost only 37 messages.

I then ran a second iteration and lost 25000 messages. I saw the following container log when that happened:

2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
        at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
        at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
        at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
        at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
        at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
        at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy1537.send(Unknown Source)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
        at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        ... 93 more
Caused by: java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
        at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
        at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
        ... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
        ... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
        at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
        ... 1 more

2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)


2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~


2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

Update

I found a pattern in the message loss: whenever this exception happens, I see a lot of messages lost. I tested this multiple times, and every time the exception occurs, messages are lost. Bumping up the concurrency also makes the issue occur more often.

2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

rabbit configuration

spring:
  rabbitmq:
   addresses: host1:5672,host2:5672,host3:5672
   adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
   nodes: [email protected],[email protected],[email protected]
   username: test
   password: test
   virtual_host: /
   useSSL: false
   sslProperties:

Update: after increasing the cache size to 200

I added the XML you provided and increased the cache size to 200. This is what happens when processing 1 million and 80k messages. Only my http-client concurrency is 100; all the others are 1. Processing slowly stopped: the messages are still sitting in the queue before http-client, and the count stays the same. The message count in my named channel is still increasing, but very slowly, at around 10 messages per minute.

s3-poller|processor|http-client>queue:batchCacheQueue

The message count in the queue before http-client is not decreasing; it stays at 186174. But messages are still slowly arriving in batchCacheQueue.

Test case to reproduce:

1) I was using the Spring Integration aws-s3 source with a splitter in a composite module | a processor (such as XML parsing) | http-client with concurrency 100 > named channel.

2) I think a file source might also work. Create a single file with a million records and try to pull from it.

3) After 4 to 5 runs, we see this exception happen.
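For step 2, the input file can be generated rather than hand-built. The sketch below is one way to do it in Python, under assumptions that are not in the original setup: one record per line, each tagged with a sequence number in the hypothetical format `record-<n>`, so that any records missing downstream can be identified rather than just counted.

```python
import os
import tempfile


def write_records(path, count):
    """Write `count` newline-delimited records, each tagged with a sequence number."""
    with open(path, "w", encoding="utf-8") as handle:
        for seq in range(count):
            handle.write(f"record-{seq}\n")


def missing_sequence_numbers(received, count):
    """Return the sorted sequence numbers in [0, count) that are absent from `received`."""
    seen = {int(line.rsplit("-", 1)[1]) for line in received if line.strip()}
    return sorted(set(range(count)) - seen)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        path = os.path.join(workdir, "records.txt")
        write_records(path, 1_000_000)
        with open(path, encoding="utf-8") as handle:
            lines = handle.readlines()
        # Pretend two records never arrived downstream.
        received = lines[:10] + lines[12:]
        print(missing_sequence_numbers(received, 1_000_000))  # prints [10, 11]
```

Comparing the sequence numbers that reach the final queue against the full range shows whether the loss is clustered around the moments the exception is logged or spread evenly across the run.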

Solution

We found an issue when channels are churned a lot; you need to increase the channel cache size in the rabbit caching connection factory.

See this answer for a work-around.

I opened a JIRA issue so that the next version of Spring XD will expose this setting in servers.yml, so you don't have to override the bus configuration file.
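Why the channel cache size matters under high concurrency can be illustrated with a toy model. This is a sketch in Python, not Spring AMQP's actual implementation. It assumes the cache size bounds only how many idle channels are kept, so any channel returned when the cache is already full is physically closed and must be reopened on the next use. The class and function names are hypothetical; the concurrency of 100 and the cache size of 200 come from the question, while the smaller cache size of 10 is an arbitrary value chosen for contrast.

```python
class ChannelCache:
    """Toy model of a channel cache that keeps at most `cache_size` idle channels."""

    def __init__(self, cache_size):
        self.cache_size = cache_size
        self.idle = []
        self.opened = 0  # physical channel opens
        self.closed = 0  # physical channel closes
        self._next_id = 0

    def borrow(self):
        # Reuse an idle channel if one is cached, otherwise open a new one.
        if self.idle:
            return self.idle.pop()
        self._next_id += 1
        self.opened += 1
        return self._next_id

    def give_back(self, channel):
        # Keep the channel only if the cache has room; otherwise close it.
        if len(self.idle) < self.cache_size:
            self.idle.append(channel)
        else:
            self.closed += 1


def simulate(cache_size, concurrency, rounds):
    """Have `concurrency` workers borrow and return channels together, `rounds` times."""
    cache = ChannelCache(cache_size)
    for _ in range(rounds):
        in_use = [cache.borrow() for _ in range(concurrency)]
        for channel in in_use:
            cache.give_back(channel)
    return cache.opened, cache.closed


if __name__ == "__main__":
    for size in (10, 200):
        opened, closed = simulate(cache_size=size, concurrency=100, rounds=1000)
        print(f"cache size {size}: {opened} opens, {closed} closes")
```

In this model, a cache smaller than the number of concurrent senders opens and closes tens of thousands of channels over 1000 rounds, while a cache of 200 opens 100 channels once and never closes any. That is the kind of churn the answer refers to.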
