我的Spring集成流程如下所示,

  • 从目录
  • 读取文件
  • 向消息
  • 添加名为errorChannel:'exceptionChannel'的 header
  • 执行业务逻辑
  • 打印输出到“成功目录”

  • 如果第3步出现错误,则将异常发送到exceptionChannel,并将其写入“失败的目录”。
    我希望将错误流委托(delegate)给单独的线程。

    我所拥有的:
    如果有5个文件,而第三个文件有错误,

    ->将2个文件写入成功目录

    -> 1个文件被写入失败的目录。

    ->错误文件之后,流程停止。

    我需要什么:
    如果有5个文件,而第三个文件有错误,

    ->必须将前2个文件写入成功目录

    ->第三个文件必须写入失败的目录

    ->最后2个文件必须写入成功目录

    成功流程代码:
    @Bean(name="exceptionChannel")
    MessageChannel exceptionChannel() {
        return MessageChannels.direct()
                .get();
    }
    
    @Bean
    public IntegrationFlow migrateInputToOutput() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR))),
                .enrichHeaders(h -> h.header("errorChannel", "exceptionChannel", true))
                .handle(someBusinessLogic) // ANY EXCEPTION HERE IS SENT TO exceptionChannel
                .handle(Files.outboundAdapter(new File(OUTPUT_SUCCESS_DIR))
                .get();
    }
    

    处理错误的代码:
    @Bean
    public IntegrationFlow handleErrorInMigration() {
        return IntegrationFlows.from("exceptionChannel"),
                .handle(errorLogicToPrintException)
                .handle(Files.outboundAdapter(new File(OUTPUT_ERROR_DIR))
                .get();
    }
    

    最佳答案

    Files.inboundAdapter()是可轮询的源,因此在代码中配置了类似Poller的地方。那个有一个errorChannel()选项。这样做真的更好。如果您的轮询器是全局轮询器,并且您不想修改它,则最好为此端点配置一个轮询器:

    IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)),
            e -> e.poller(p -> p.fixedDelay().errorChannel(exceptionChannel())))
    

    这样,您将不会影响所有其他轮询端点。

    要将错误处理移至其他线程,您需要考虑将exceptionChannel设置为ExecutorChannel:
    @Bean(name="exceptionChannel")
    MessageChannel exceptionChannel() {
        return MessageChannels.executor(myExecutor)
                .get();
    }
    

    10-07 19:06
    查看更多