本文介绍了将 JobLaunchingGateway 与 spring-batch 和 spring-integration 一起使用抛出 DestinationResolutionException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想定期扫描目录以查找与特定模式匹配的文件,并为找到的文件启动作业.

I want to scan a directory periodically for files matching a specific pattern and launch a job for the found file.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Autowired
    public BatchConfiguration(final JobBuilderFactory jobBuilderFactory, final StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequestTransformer(final Job myJob) {
        return new FileMessageToJobRequest(myJob, "input.file.name");
    }

    @Bean
    public MessageSource<File> fileMessageSource() {
        final FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
        fileReadingMessageSource.setDirectory(directory));
        fileReadingMessageSource.setFilter(new CompositeFileListFilter<>(Arrays.asList(
                new AcceptOnceFileListFilter<>(),
                new SimplePatternFileListFilter(pattern)
        )));
        return fileReadingMessageSource;
    }

    @Bean
    public IntegrationFlow pollingFlow(final FileMessageToJobRequest fileMessageToJobRequest, final JobLaunchingGateway jobLaunchingGateway) {
        return IntegrationFlows.from(fileMessageSource(),
                c -> c.poller(Pollers.fixedDelay(10, TimeUnit.SECONDS)))
                .transform(fileMessageToJobRequest)
                .handle(jobLaunchingGateway)
                .get();
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(final JobLauncher jobLauncher) {
        return new JobLaunchingGateway(jobLauncher);
    }

    // Step definition omitted for readability

    @Bean(name = "myJob")
    public Job myJob(final JobCompletionNotificationListener listener, final @Qualifier("step1") Step step) {
        return jobBuilderFactory.get("myJob")
                .preventRestart()
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(myStep)
                .end()
                .build();
    }
}

一切正常,但我在作业执行后不断收到以下异常:

Everything works but I keep getting the following exception after job execution:

2017-06-28 16:55:20.510 ERROR 5480 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=JobLaunchRequest: myJob, parameters={input.file.name=/tmp/example.csv}, headers={id=99f4784a-1531-7d64-5abd-4ffe23455c45, timestamp=1498661720296}]
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:287)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 41 more

如何在执行作业后阻止 JobLaunchingGatewayMessage 发送到输出通道?

How can I stop JobLaunchingGateway from posting Messages to output-channel after executing a Job?

推荐答案

那是因为 JobLaunchingGateway 是 reqeust/reply 组件,它需要一个 outputChannelreplyChannel 标题.同时,您的配置中只有这个:

That's because JobLaunchingGateway is reqeust/reply component, which requires an outputChannel or replyChannel header. At the same time you have only this in your config:

 .handle(jobLaunchingGateway)
 .get();

如果你对那里的回复不感兴趣,你可以在最后加上这个:

If you are not interested in the reply from there, you can just add this in the end:

 .handle(jobLaunchingGateway)
 .channel("nullChannel")
 .get();

这篇关于将 JobLaunchingGateway 与 spring-batch 和 spring-integration 一起使用抛出 DestinationResolutionException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-06 05:17