This article covers a Spring Integration question about a file inbound-channel-adapter that should aggregate multiple files into one master file for job processing, together with the recommended answer.

Problem Description

I have written code to combine multiple files into one single master file. The issue is with the int-transformer: I am getting one file at a time, although I have aggregated a List of File in the composite filter of the file inbound-channel-adapter. The size of the List of File in the composite filter is correct, but in the transformer bean the size of the List of File is always one, so I never receive the full list that the filter aggregated.

Here is my configuration:

<!-- Auto Wiring -->
<context:component-scan base-package="com.nt.na21.nam.integration.*" />
<!-- intercept and log every message -->
<int:logging-channel-adapter id="logger"
    level="DEBUG" />
<int:wire-tap channel="logger" />

<!-- Aggregating the processed Output for OSS processing -->

<int:channel id="networkData" />
<int:channel id="requests" />

<int-file:inbound-channel-adapter id="pollProcessedNetworkData"
    directory="file:${processing.files.directory}" filter="compositeProcessedFileFilter"
    channel="networkData">
    <int:poller default="true" cron="*/20 * * * * *" />

</int-file:inbound-channel-adapter>

<bean id="compositeProcessedFileFilter"
    class="com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine" />

<int:transformer id="aggregateNetworkData"
    input-channel="networkData" output-channel="requests">
    <bean id="networkData" class="com.nt.na21.nam.integration.helper.CSVFileAggregator">
    </bean>
</int:transformer>

CompositeFileListFilterForBaseLine:

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.integration.file.filters.FileListFilter;

// Imports added for completeness; log4j and commons-lang are assumed from the code style.
// DateHelper is the poster's own utility class (its package is not shown in the post).

public class CompositeFileListFilterForBaseLine implements FileListFilter<File> {

    private final static Logger LOG = Logger
            .getLogger(CompositeFileListFilterForBaseLine.class);

    @Override
    public List<File> filterFiles(File[] files) {
        List<File> filteredFile = new ArrayList<File>();
        int index;
        String fetchedFileName = null;
        String fileCreatedDate = null;
        String todayDate = DateHelper.toddMM(new Date());
        LOG.debug("Date - dd-MM: " + todayDate);

        for (File f : files) {
            fetchedFileName = StringUtils.removeEnd(f.getName(), ".csv");
            index = fetchedFileName.indexOf("_");

            // Add plus one to index to skip underscore
            fileCreatedDate = fetchedFileName.substring(index + 1);
            // Format the created file date
            fileCreatedDate = DateHelper.formatFileNameDateForAggregation(fileCreatedDate);
            LOG.debug("file created date: " + fileCreatedDate + " today Date: "
                    + todayDate);
            if (fileCreatedDate.equalsIgnoreCase(todayDate)) {
                filteredFile.add(f);
                LOG.debug("File added to List of File: " + f.getAbsolutePath());
            }
        }
        LOG.debug("SIZE: " + filteredFile.size());
        LOG.debug("filterFiles method end.");
        return filteredFile;
    }

}

CSVFileAggregator:

import java.io.File;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

// Imports added for completeness (log4j assumed, as above).

public class CSVFileAggregator {

    private final static Logger LOG = Logger.getLogger(CSVFileAggregator.class);

    private int snePostion;

    protected String masterFileSourcePath=null;

    public File handleAggregateFiles(List<File> files) throws IOException {
        LOG.debug("materFileSourcePath: " + masterFileSourcePath);
        LinkedHashSet<String> allAttributes = null;
        Map<String, LinkedHashSet<String>> allAttrBase = null;
        Map<String, LinkedHashSet<String>> allAttrDelta = null;
        LOG.info("Aggregator releasing [" + files.size() + "] files");
    }
}

Log output:

INFO : com.nt.na21.nam.integration.aggregator.NetFileAggregatorClient - NetFileAggregator context initialized. Polling input folder...
INFO : com.nt.na21.nam.integration.aggregator.NetFileAggregatorClient - Input directory is: D:\Projects\csv\processing
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - Date - dd-MM: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - file created date: 0103 today Date: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - File added to List of File: D:\Projects\NA21\NAMworkspace\na21_nam_integration\csv\processing\file1_base_0103.csv
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - file created date: 0103 today Date: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - File added to List of File: D:\Projects\NA21\NAMworkspace\na21_nam_integration\csv\processing\file2_base_0103.csv
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - **SIZE: 2**
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - filterFiles method end.
DEBUG: org.springframework.integration.file.FileReadingMessageSource - Added to queue: [csv\processing\file1_base_0103.csv, csv\processing\file2_base_0103.csv]
INFO : org.springframework.integration.file.FileReadingMessageSource - Created message: [GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]]
DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'networkData', message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.handler.LoggingHandler - csv\processing\file2_base_0103.csv
DEBUG: org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'logger', message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler@606f8b2b received message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: com.nt.na21.nam.integration.helper.CSVFileAggregator - materFileSourcePath: null
INFO : com.nt.na21.nam.integration.helper.CSVFileAggregator - **Aggregator releasing [1] files**

Can someone help me identify the issue here: why does the filter aggregate the files correctly, yet only a single file is collected for the transformation?

Thanks in advance.

The issue is with int:aggregator, as I am not sure how to invoke it. I used it earlier in my design, but it did not get executed at all. Thanks for the quick response.

For this problem I have written a FileScaner utility which scans all the files in the folder, and the aggregation works perfectly.

Please find below the configuration with the aggregator which did not work; hence I split the design into two pollers: the first produces all the CSV files and the second collects and aggregates them.

<!-- Auto Wiring -->
<context:component-scan base-package="com.nt.na21.nam.integration.*" />
<!-- intercept and log every message -->
<int:logging-channel-adapter id="logger" level="DEBUG" />
<int:wire-tap channel = "logger" />

<int:channel id="fileInputChannel" datatype="java.io.File" />
<int:channel id="error" />
<int:channel id="requestsCSVInput" />

<int-file:inbound-channel-adapter id="pollNetworkFile"
    directory="file:${input.files.directory}" channel="fileInputChannel"
    filter="compositeFileFilter" prevent-duplicates="true">
    <int:poller default="true" cron="*/20 * * * * *"
        error-channel="error" />
</int-file:inbound-channel-adapter>

<bean id="compositeFileFilter"
    class="com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForTodayFiles" />

<int:transformer id="transformInputZipCSVFileIntoCSV"
    input-channel="fileInputChannel" output-channel="requestsCSVInput">
    <bean id="transformZipFile"
        class="com.nt.na21.nam.integration.file.net.NetRecordFileTransformation" />
</int:transformer>

<int:router ref="docTypeRouter" input-channel="requestsCSVInput"
    method="resolveObjectTypeChannel">
</int:router>

<int:channel id="Vlan" />
<int:channel id="VlanShaper" />
<int:channel id="TdmPwe" />

<bean id="docTypeRouter"
    class="com.nt.na21.nam.integration.file.net.DocumentTypeMessageRouter" />

<int:service-activator ref="vLanMessageHandler" output-channel="newContentItemNotification" input-channel="Vlan" method="handleFile" />

<bean id="vLanMessageHandler" class="com.nt.na21.nam.integration.file.handler.VLanRecordsHandler" />

<int:service-activator ref="VlanShaperMessageHandler" output-channel="newContentItemNotification" input-channel="VlanShaper" method="handleFile" />

<bean id="VlanShaperMessageHandler" class="com.nt.na21.nam.integration.file.handler.VlanShaperRecordsHandler" />

<int:service-activator ref="PweMessageHandler" output-channel="newContentItemNotification" input-channel="TdmPwe" method="handleFile" />

<bean id="PweMessageHandler" class="com.nt.na21.nam.integration.file.handler.PseudoWireRecordsHandler" />


<int:channel id="newContentItemNotification" />

<!-- Adding for aggregating the records in one place for OSS output -->

 <int:aggregator input-channel="newContentItemNotification" method="aggregate"
    ref="netRecordsResultAggregator" output-channel="net-records-aggregated-reply"
    message-store="netRecordsResultMessageStore"
    send-partial-result-on-expiry="true">
 </int:aggregator>

 <int:channel id="net-records-aggregated-reply" />
 <bean id="netRecordsResultAggregator" class="com.nt.na21.nam.integration.aggregator.NetRecordsResultAggregator" />

 <!-- Define a store for our network records results and set up a reaper that will
    periodically expire those results. -->
<bean id="netRecordsResultMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />


 <int-file:outbound-channel-adapter id="filesOut"
                               directory="file:${output.files.directory}"
                               delete-source-files="true">

 </int-file:outbound-channel-adapter>

The code works fine up to the point where messages are routed to the channels below:

<int:channel id="Vlan" />
<int:channel id="VlanShaper" />
<int:channel id="TdmPwe" />

From the handlers on the above channels I am trying to return a LinkedHashSet<String> containing the CSV data, and I need to merge all of the vAllAttributes sets to get the master output CSV file:

List<String> masterList = new ArrayList<String>(vAllAttributes);
Collections.sort(masterList);

Answer

Well, it looks like you misunderstood the <int-file:inbound-channel-adapter> behaviour a bit. Its nature is to produce one file per message to the channel, and that does not depend on the logic of the FileListFilter. The algorithm is like this:

  1. The FileReadingMessageSource uses a DirectoryScanner to retrieve files from the provided directory into an internal toBeReceived Queue. Since we scan the directory for files, the design of the DirectoryScanner looks like List<File> listFiles(File directory). I guess this has led you astray.

  2. After that the filter is applied to the original file list and returns only the appropriate files.

  3. They are stored in the toBeReceived Queue.

  4. And only after that does the FileReadingMessageSource poll an item from the queue to build a message for the output channel.

To achieve your aggregation requirement you really should use an <aggregator> between the <int-file:inbound-channel-adapter> and your <int:transformer>.

You can mark the <poller> of the <int-file:inbound-channel-adapter> with max-messages-per-poll="-1" to really poll all your files during a single scheduled task. But anyway there will be as many messages as the filter returns files.
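
For illustration only (this snippet is not from the original post), the question's first adapter with such a poller might look like the following; the directory, filter, channel and cron values are copied from the configuration above:

<int-file:inbound-channel-adapter id="pollProcessedNetworkData"
    directory="file:${processing.files.directory}" filter="compositeProcessedFileFilter"
    channel="networkData">
    <!-- max-messages-per-poll="-1" emits every filtered file in one polling task -->
    <int:poller cron="*/20 * * * * *" max-messages-per-poll="-1" />
</int-file:inbound-channel-adapter>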

After that you must apply some tricks for the <aggregator>:

  1. correlationKey - to allow your file messages to be combined into a single MessageGroup, so that a single message is released for the further <transformer>. Since we don't have any context from the <int-file:inbound-channel-adapter>, but we know that all messages are provided by the single polling task within its scheduled Thread (you don't use a task-executor on the <poller>), we can simply use the correlationKey as:

correlation-strategy-expression="T(Thread).currentThread().id"

  • But that is not enough, because we should somehow end up with a single message anyway. Unfortunately we don't know the number of files (although you could track it via a ThreadLocal in your custom FileListFilter), so no ReleaseStrategy can return true for the aggregate phase. Hence we never get a normal group completion. But we can force the release of uncompleted groups from the aggregator by using a MessageGroupStoreReaper or the group-timeout on the <aggregator>.

    In addition to the previous clause you should supply these options on the <aggregator>:

    send-partial-result-on-expiry="true"
    expire-groups-upon-completion="true"
    

  • And that's all. There is no reason to provide any custom aggregation function (ref/method or expression), because by default the aggregator just builds a single message with the List of payloads of all messages in the group, and that is exactly what your CSVFileAggregator expects. Although you could also drop the <transformer> and use the CSVFileAggregator as the aggregation function itself. A combined sketch of these options follows this list.
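
Putting the pieces together, here is a minimal sketch of the suggested flow; it is not from the original answer, the channel names are taken from the question's first configuration, and the group-timeout value is an assumption (the group-timeout attribute requires Spring Integration 4.0 or later):

<int:channel id="aggregatedNetworkData" />

<int:aggregator input-channel="networkData" output-channel="aggregatedNetworkData"
    correlation-strategy-expression="T(Thread).currentThread().id"
    group-timeout="5000"
    send-partial-result-on-expiry="true"
    expire-groups-upon-completion="true" />

<!-- The default aggregation builds one message whose payload is the List of
     File payloads of the group - exactly what CSVFileAggregator expects. -->
<int:transformer id="aggregateNetworkData"
    input-channel="aggregatedNetworkData" output-channel="requests">
    <bean class="com.nt.na21.nam.integration.helper.CSVFileAggregator" />
</int:transformer>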

Hope I am clear.
