This article looks at how an FTP inbound channel adapter polls when max-messages-per-poll and a cron trigger are configured.

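Problem description

I have a use case where I need to pick up files from an FTP location and place them in a server location. I am using ftp-inbound-channel-adapter (Spring Integration 2.0.4) to achieve this. Below is the configuration in my XML: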

 <bean id="ftpAASessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
     <property name="host" value="${ftp.session.host}" />
     <property name="port" value="${ftp.session.port}" />
     <property name="username" value="${ftp.session.username}" />
     <property name="password" value="${ftp.session.password}" />
     <property name="clientMode" value="0" />
     <property name="fileType" value="0" />
 </bean>

 <ftp:inbound-channel-adapter id="ftpAAInbound"
     channel="ftpChannel" session-factory="ftpAASessionFactory" charset="UTF-8"
     auto-create-local-directory="false" delete-remote-files="true"
     remote-directory="${ftp.source.location}" local-directory="file://${ftp.target.location}"
     >
     <int:poller max-messages-per-poll="5" cron="0 */2 * ? * *">
     </int:poller>

 </ftp:inbound-channel-adapter>

 <int:channel id="ftpChannel">
     <int:queue />
     <int:interceptors>
         <int:wire-tap channel="debugLogger" />
     </int:interceptors>
 </int:channel>

 <int:logging-channel-adapter id="debugLogger"
     level="DEBUG" log-full-message="true" />

 <int:logging-channel-adapter id="errorLogger"
     level="ERROR" log-full-message="true" />

I have configured max-messages-per-poll as 5, with polling done at every even minute (using a cron expression).
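To make the schedule concrete: the poller's cron expression `0 */2 * ? * *` selects second 0 of every even minute. The sketch below reproduces that rule with plain `java.time` rather than Spring's cron parser; the class and method names are hypothetical and exist only for illustration.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class EvenMinuteSchedule {

    // Returns the first instant strictly after 'from' that falls on
    // second 0 of an even minute (0, 2, 4, ..., 58).
    public static LocalDateTime nextFire(LocalDateTime from) {
        // Move to the start of the following minute.
        LocalDateTime candidate = from.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
        // Skip odd minutes.
        if (candidate.getMinute() % 2 != 0) {
            candidate = candidate.plusMinutes(1);
        }
        return candidate;
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2017, 3, 1, 12, 37, 30);
        for (int i = 0; i < 3; i++) {
            t = nextFire(t);
            System.out.println(t);
        }
    }
}
```

Starting from 12:37:30, the three computed fire times are 12:38, 12:40 and 12:42, which matches the 12:38:00 and 12:40:00 timestamps in the log.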

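My problem is that if we have 6 files in the FTP location, all 6 files are transferred to the server location on the first poll (with max-messages-per-poll = 5, it should pick only 5 files from the FTP location), while the payload is formed for only 5 files.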
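The behaviour described here can be modelled without Spring. The sketch below is a simplified model inferred from the log in this post, not the adapter's actual code: a `receive()` call first looks for a local file and, only when none is queued, copies every remote file locally before trying again, while `maxMessagesPerPoll` merely caps how many `receive()` calls one poll makes. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class SyncThenEmitDemo {

    private final List<String> remote;                       // stand-in for the FTP directory
    private final Deque<String> local = new ArrayDeque<>();  // stand-in for the local directory
    private final int maxMessagesPerPoll;

    public SyncThenEmitDemo(List<String> remoteFiles, int maxMessagesPerPoll) {
        this.remote = new ArrayList<>(remoteFiles);
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    // One receive attempt: synchronize only when nothing is available locally,
    // and then copy ALL remote files (deleting them remotely) in one go.
    public String receive() {
        if (local.isEmpty()) {
            local.addAll(remote);
            remote.clear();
        }
        return local.poll(); // null when there is nothing to emit
    }

    // One poll: at most maxMessagesPerPoll messages are emitted.
    public List<String> poll() {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            String file = receive();
            if (file == null) {
                break;
            }
            emitted.add(file);
        }
        return emitted;
    }

    public int remoteCount() {
        return remote.size();
    }

    public static void main(String[] args) {
        SyncThenEmitDemo adapter = new SyncThenEmitDemo(
                Arrays.asList("f1", "f2", "f3", "f4", "f5", "f6"), 5);
        System.out.println("poll 1 emitted " + adapter.poll().size()
                + ", left on FTP: " + adapter.remoteCount());
        System.out.println("poll 2 emitted " + adapter.poll().size());
    }
}
```

In this model the first poll emits 5 messages but leaves 0 files on the FTP side, and the second poll emits the sixth file from the local directory, which is the pattern the log shows at 12:38 and 12:40.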



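I want only 5 files to be transferred to my server on the first poll, and the second poll should pick up the last one.

Please suggest a solution.
TIA

Please find below the logs from when there were 6 files in the FTP location: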

    [CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
***[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt***
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt
[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
***[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FileReadingMessageSource.scanInputDirectory(272) | Added to queue: [D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt, D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt, D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8]*** QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:40:00] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: null
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(101) | Received no Message during the poll, returning 'fal

.......

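Solution

That log only shows 4 files; it looks like it is configured with 3 messages per poll, and it clearly shows 3 files being emitted at 12:38 and 1 at 12:40 (where no fifth file is found).

 Poll resulted in Message: null

EDIT

Here is another way to achieve the desired result (using an outbound gateway).

If you prefer to use XML configuration, you will have to insert the file limiter between the ...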

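import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.net.ftp.FTPClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
// Ftp factory: this package applies to Spring Integration 5+;
// with the 1.x Java DSL it is org.springframework.integration.dsl.ftp.Ftp.
import org.springframework.integration.ftp.dsl.Ftp;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpFileInfo;

// Annotations assumed: the listing in this post starts at the class declaration,
// but Spring Boot needs them to pick up the @Bean methods and the @MessagingGateway interface.
@SpringBootApplication
@IntegrationComponentScan
public class So42528316Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42528316Application.class, args);
        try {
            context.getBean(So42528316Application.class).runDemo();
        }
        finally {
            context.close();
        }
    }

    @Autowired
    private FileGateway fetchAndProcess;

    // Keep asking for batches until the gateway returns null (no more remote files).
    private void runDemo() throws Exception {
        Collection<Boolean> rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        while (rmResults != null) {
            System.out.println("Processed " + rmResults.size() + " files");
            Thread.sleep(10_000);
            rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        }
        System.out.println("No more files available");
    }

    @MessagingGateway(defaultRequestChannel = "flow.input", defaultReplyTimeout = "0")
    public interface FileGateway {

        Collection<Boolean> processFilesAt(String pattern);

    }

    @Bean
    public DefaultFtpSessionFactory sessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("10.0.0.3");
        factory.setUsername("ftptest");
        factory.setPassword("ftptest");
        factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return factory;
    }

    // ls -> limit -> split -> get -> process -> rm -> aggregate
    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .handle(Ftp.outboundGateway(sessionFactory(), "ls", "payload"))
                .handle("so42528316Application", "limitFiles")
                .split()
                .handle(Ftp.outboundGateway(sessionFactory(), "get",
                        "payload.remoteDirectory + '/' + payload.filename")
                    .localDirectory(new File("/tmp", "so42528316")))
                .handle("so42528316Application", "process")
                .handle(Ftp.outboundGateway(sessionFactory(), "rm",
                        "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']"))
                .aggregate();
    }

    public List<FtpFileInfo> limitFiles(List<FtpFileInfo> files) {
        // Add whatever logic you want here, e.g. checking whether the file is already on the local disk.
        if (files.size() == 0) {
            return null;
        }
        else if (files.size() > 2) {
            System.out.println("Reducing fetch list from " + files.size() + " to 2");
            return files.stream().limit(2).collect(Collectors.toList());
        }
        else {
            return files;
        }
    }

    public String process(File file) {
        System.out.println("Processing " + file);
        file.delete();
        return file.getName();
    }

}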

Result:

 Reducing fetch list from 3 to 2
Processing /tmp/so42528316/bar.txt
Processing /tmp/so42528316/baz.txt
Processed 2 files
Processing /tmp/so42528316/foo.txt
Processed 1 files
No more files available
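The batching idea behind `limitFiles` can be tried without an FTP server. The following is a minimal sketch, assuming plain strings in place of `FtpFileInfo` and an in-memory list in place of the remote directory; `FetchLimiterDemo`, `limit` and `drain` are hypothetical names, not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FetchLimiterDemo {

    // Returns at most 'max' entries, or null when the listing is empty
    // (the same "null means stop" convention used by limitFiles).
    public static <T> List<T> limit(List<T> listing, int max) {
        if (listing.isEmpty()) {
            return null;
        }
        return listing.stream().limit(max).collect(Collectors.toList());
    }

    // Repeatedly takes a batch and removes it from the listing;
    // returns the size of each batch in order.
    public static List<Integer> drain(List<String> remote, int max) {
        List<Integer> batchSizes = new ArrayList<>();
        List<String> batch = limit(remote, max);
        while (batch != null) {
            batchSizes.add(batch.size());
            remote.removeAll(batch); // stand-in for the get and rm steps
            batch = limit(remote, max);
        }
        return batchSizes;
    }

    public static void main(String[] args) {
        List<String> remote = new ArrayList<>(Arrays.asList("bar.txt", "baz.txt", "foo.txt"));
        System.out.println(drain(remote, 2)); // prints [2, 1]
    }
}
```

With three files and a limit of 2, the batches have sizes 2 and 1, mirroring the "Processed 2 files" and "Processed 1 files" lines in the result.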


I have a use case where I need to pick up files from an FTP location and place them in a server location. I am using ftp-inbound-channel-adapter (Spring Integration 2.0.4) to achieve this. Below is the configuration in my XML:

 <bean id="ftpAASessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
          <property name="host" value="${ftp.session.host}" />
          <property name="port" value="${ftp.session.port}" />
          <property name="username" value="${ftp.session.username}" />
          <property name="password" value="${ftp.session.password}" />
          <property name="clientMode" value="0" />
          <property name="fileType" value="0" />
   </bean>


   <ftp:inbound-channel-adapter id="ftpAAInbound"
          channel="ftpChannel" session-factory="ftpAASessionFactory" charset="UTF-8"
          auto-create-local-directory="false" delete-remote-files="true"
          remote-directory="${ftp.source.location}" local-directory="file://${ftp.target.location}"
          >
          <int:poller max-messages-per-poll="5" cron="0 */2 * ? * *">
          </int:poller>

   </ftp:inbound-channel-adapter>

   <int:channel id="ftpChannel">
          <int:queue />
          <int:interceptors>
                 <int:wire-tap channel="debugLogger" />
          </int:interceptors>
   </int:channel>

   <int:logging-channel-adapter id="debugLogger"
          level="DEBUG" log-full-message="true" />

   <int:logging-channel-adapter id="errorLogger"
          level="ERROR" log-full-message="true" />

I have configured max-messages-per-poll as 5 and polling to be done at every even minute (using cron expression).

My problem is that if we have 6 files in the FTP location, all 6 files are transferred to the server location on the first poll (with max-messages-per-poll = 5, it should pick only 5 files from the FTP location), while the payload is formed for only 5 files.

I want only 5 files to be transferred to my server on the first poll, and the second poll should pick up the last one.

Please suggest a solution. TIA

Please find below the logs from when there were 6 files in the FTP location:

    [CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
***[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt***
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt
[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
***[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FileReadingMessageSource.scanInputDirectory(272) | Added to queue: [D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt, D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt, D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8]*** QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:40:00] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: null
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(101) | Received no Message during the poll, returning 'fal

.......

Solution

That log shows only 4 files. It looks like the adapter was configured for 3 messages per poll: the log clearly shows 3 files sent at 12:38 and 1 at 12:40, with no fifth file found:

Poll resulted in Message: null
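
To make the difference between "files transferred" and "messages per poll" concrete, here is a small, self-contained model of the two stages. It is only a sketch and does not use Spring Integration: `PollModel` and its methods are hypothetical names. It assumes, as the logs in the question suggest, that the synchronizer copies every remote file to the local directory in one go, and that max-messages-per-poll only caps how many local files are emitted as messages in each poll.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of an inbound adapter: NOT the Spring Integration implementation.
public class PollModel {

    private final Deque<String> remote = new ArrayDeque<>();
    private final Deque<String> local = new ArrayDeque<>();
    private final int maxMessagesPerPoll;

    public PollModel(List<String> remoteFiles, int maxMessagesPerPoll) {
        this.remote.addAll(remoteFiles);
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    /** Runs one poll cycle and returns the files emitted as messages. */
    public List<String> poll() {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            if (local.isEmpty()) {
                // Assumption: synchronization is all-or-nothing, so every
                // remote file is copied (and removed remotely) in one step.
                local.addAll(remote);
                remote.clear();
            }
            String next = local.poll();
            if (next == null) {
                break; // corresponds to "Poll resulted in Message: null"
            }
            messages.add(next);
        }
        return messages;
    }

    public int remoteCount() {
        return remote.size();
    }

    public int localBacklog() {
        return local.size();
    }

    public static void main(String[] args) {
        PollModel model = new PollModel(
                Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt"), 3);
        System.out.println("poll 1: " + model.poll()
                + ", remote left: " + model.remoteCount()
                + ", local backlog: " + model.localBacklog());
        System.out.println("poll 2: " + model.poll());
        System.out.println("poll 3: " + model.poll());
    }
}
```

In this model the first poll empties the remote directory but emits only three messages, the second poll emits the remaining file, and the third finds nothing, which is the same shape as the log above.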

EDIT

Here is another way to achieve your desired result (using outbound gateways). This version uses the Java DSL.

If you prefer XML configuration, the ftp-sample provides a similar flow in XML; you would have to insert the file limiter between the ls gateway and the splitter.

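// Imports are an assumption based on Spring Integration 5.x; with the older
// spring-integration-java-dsl 1.x the Ftp factory is in org.springframework.integration.dsl.ftp.
import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.net.ftp.FTPClient;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.ftp.dsl.Ftp;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpFileInfo;

@SpringBootApplication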
public class So42528316Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42528316Application.class, args);
        try {
            context.getBean(So42528316Application.class).runDemo();
        }
        finally {
            context.close();
        }
    }

    @Autowired
    private FileGateway fetchAndProcess;

    private void runDemo() throws Exception {
        Collection<Boolean> rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        while (rmResults != null) {
            System.out.println("Processed " + rmResults.size() + " files");
            Thread.sleep(10_000);
            rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        }
        System.out.println("No more files available");
    }

    @MessagingGateway(defaultRequestChannel = "flow.input", defaultReplyTimeout = "0")
    public interface FileGateway {

        Collection<Boolean> processFilesAt(String pattern);

    }

    @Bean
    public DefaultFtpSessionFactory sessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("10.0.0.3");
        factory.setUsername("ftptest");
        factory.setPassword("ftptest");
        factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return factory;
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .handle(Ftp.outboundGateway(sessionFactory(), "ls", "payload"))
                .handle("so42528316Application", "limitFiles")
                .split()
                .handle(Ftp.outboundGateway(sessionFactory(), "get",
                                "payload.remoteDirectory + '/' + payload.filename")
                        .localDirectory(new File("/tmp", "so42528316")))
                .handle("so42528316Application", "process")
                .handle(Ftp.outboundGateway(sessionFactory(), "rm",
                        "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']"))
                .aggregate();
    }

    // Caps the "ls" result so that at most 2 files are fetched per request.
    public List<FtpFileInfo> limitFiles(List<FtpFileInfo> files) {
        // Add any logic you want here, e.g. check if file already on local disk.
        if (files.isEmpty()) {
            // Returning null ends the flow; the gateway caller then receives null.
            return null;
        }
        else if (files.size() > 2) {
            System.out.println("Reducing fetch list from " + files.size() + " to 2");
            return files.stream().limit(2).collect(Collectors.toList());
        }
        else {
            return files;
        }
    }

    public String process(File file) {
        System.out.println("Processing " + file);
        file.delete();
        return file.getName();
    }

}

Result:

Reducing fetch list from 3 to 2
Processing /tmp/so42528316/bar.txt
Processing /tmp/so42528316/baz.txt
Processed 2 files
Processing /tmp/so42528316/foo.txt
Processed 1 files
No more files available
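
The comment in `limitFiles` suggests adding your own logic, such as checking whether a file is already on the local disk. Below is a minimal sketch of that idea. To keep it runnable without Spring it works on plain file names instead of `FtpFileInfo`; `FileLimiter`, `limit` and its parameters are hypothetical names, not part of the answer's code or of any library.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper illustrating a file limiter with a local-disk check.
public class FileLimiter {

    /**
     * Returns at most {@code max} names that do not yet exist in
     * {@code localDir}, or null when nothing is left to fetch
     * (mirroring limitFiles, where null ends the flow).
     */
    public static List<String> limit(List<String> remoteNames, File localDir, int max) {
        List<String> candidates = remoteNames.stream()
                // Skip anything already present locally.
                .filter(name -> !new File(localDir, name).exists())
                // Keep only the first 'max' remaining names, in listing order.
                .limit(max)
                .collect(Collectors.toList());
        return candidates.isEmpty() ? null : candidates;
    }

    public static void main(String[] args) {
        // Assumption for the demo: a directory name that is unlikely to exist.
        File localDir = new File(System.getProperty("java.io.tmpdir"),
                "limiter-demo-" + System.nanoTime());
        System.out.println(limit(Arrays.asList("foo.txt", "bar.txt", "baz.txt"), localDir, 2));
        System.out.println(limit(Collections.<String>emptyList(), localDir, 2));
    }
}
```

Filtering before limiting matters here: if the cap were applied first, files already downloaded could use up the quota and the remaining ones would never be fetched.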

