MessagePublishingErrorHandler

MessagePublishingErrorHandler

本文介绍了Spring集成流程中的错误处理实践的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 spring 集成流程,涉及异步执行、从网关返回值到控制器、返回值后继续集成流程.

I have a spring integration flow that involves async execution, returning value from gateway to controller, continuing integration flow after returning a value.

这里是网关:

@MessagingGateway
public interface GW {

    @Gateway(requestChannel = "f.input")
    Task input(Collection<MessengerIncomingRequest> messages);

}

这里是流程:

@Bean
IntegrationFlow jFlow() {
        return IntegrationFlows.from(
        MessageChannels.executor("f.input", executor()))
        .split()
        .channel(MessageChannels.executor(executor()))
        .transform(transformer)
        .channel(routerChannel())
        .get();
}

@Bean
ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        ...
        return pool;
}

@Bean
MessageChannel routerChannel() {
        return MessageChannels
        .publishSubscribe("routerChannel", executor())
        .get();
}

@Bean
IntegrationFlow routerChannelFlow() {
        return IntegrationFlows
        .from(routerChannel())
        .publishSubscribeChannel(s -> s
        .subscribe(f -> f.bridge(null))
        .subscribe(process()))
        .get();
}

@Bean
IntegrationFlow process() {
        return f ->
        f.route(p -> p.getKind().name(),
        m -> m.suffix("Channel")
        .channelMapping(TaskKind.CREATE.name(), "create")
        ....
}

@Bean
IntegrationFlow createFlow() {
        return IntegrationFlows.from(
        MessageChannels.direct("createChannel"))
        .handle(routerService)
        .get();
}

如何为整个流程定义错误处理程序?最佳做法是什么?我知道我可以为网关方法调用放置一个 try/catch 块,但它只会捕获 jFlow 流中发生的异常,用于处理 channel(routerChannel()).

How can I define an error handler for the whole flow? What are the best practices? I know I can put a try/catch block for the gateway method call, but it will only catch exceptions that occur in jFlow flow for everything that comes before channel(routerChannel()).

我如何处理其余流程的错误?还是整个流程?

How can I handle errors for the rest of the flow? Or for the entire flow?

更新

我为 publishSubscribeChannel 添加了错误处理程序

I added error handler for publishSubscribeChannel

@Bean
IntegrationFlow routerChannelFlow() {
    return IntegrationFlows
            .from(routerChannel())
            .publishSubscribeChannel(s -> s
                    .subscribe(f -> f.bridge(null))
                    .subscribe(process())
                    .errorHandler(errorHandler))
            .get();
}

但它似乎没有帮助,因为如果出现异常,我会收到以下错误:

but it does not seem to help, because in case of exception I get the following error:

cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:

并且我的错误处理程序没有被调用.

and my error handler does not get called.

更新

根据加里的回答,我尝试了这段代码:

According to Gary's answer I tried this code:

@Bean
IntegrationFlow jFLow() {
    return IntegrationFlows.from(
            MessageChannels.executor("f.input", executor()))
            .split()
            .channel(MessageChannels.executor(executor()))
            .transform(transformer)
            .channel(routerChannel())
            .get();
}

@Bean
IntegrationFlow exceptionOrErrorFlow() {
    return IntegrationFlows.from(
            MessageChannels.direct("exceptionChannel"))
            .handle(errorHandler, "handleError")
            .get();
}

    @Bean
MessageChannel exceptionChannel() {
    return MessageChannels.direct("exceptionChannel")
            .get();
}

@Bean
IntegrationFlow process() {
        return f ->
        f.enrichHeaders((spec) ->
                    spec.header("errorChannel", "exceptionChannel", true))
        f.route(p -> p.getKind().name(),
        m -> m.suffix("Channel")
        .channelMapping(TaskKind.CREATE.name(), "create")
        ....
}

@MessagingGateway(errorChannel = "exceptionChannel")

再次编辑后,我将 exceptionChannel 添加到网关,并将丰富的标头移动到我的流程的第二个分支(异步).如果在流程的同步部分抛出异常,则静止控制器将被阻塞.

After another edit I added exceptionChannel to the gateway, and moved enriching header to the second leg (async) of my flow. Still controller gets blocked if exception is throw in the synchronous part of the flow.

推荐答案

首先,让我解释一下网关是如何工作的——它应该有助于理解下面的解决方案.

First of all, let me explain how the gateway works - it should help with understanding the solution below.

请求消息获得一个唯一的临时回复通道,该通道被添加为 replyChannel 标头.即使网关有一个显式的 replyChannel,它也只是桥接到请求的 replyChannel - 这就是网关如何将回复与请求相关联.

The request message gets a unique temporary reply channel which is added as the replyChannel header. Even if the gateway has an explicit replyChannel, that is simply bridged to the request's replyChannel - that's how the gateway correlates the reply to the request.

现在,网关还将请求的 errorChannel 标头设置为相同的回复通道.这样,即使流是异步的,异常也可以路由回网关并抛出给调用者或路由到网关的错误通道(如果指定).此路由由连接到 ErrorHandlingTaskExecutorMessagePublishingErrorHandler 执行,后者包装了您的执行程序.

Now, the gateway also sets the request's errorChannel header to the same reply channel. That way, even if the flow is asynchronous, an exception can be routed back to the gateway and either thrown to the caller or routed to the gateway's error channel (if specified). This routing is performed by a MessagePublishingErrorHandler which is wired into a ErrorHandlingTaskExecutor, which wraps your executor.

由于您将结果返回给网关然后继续;该网关交互被花费",并且没有任何东西会收到发送到 replyChannel 标头的消息(包括异常).因此,您会看到日志消息.

Since you are returning a result to the gateway and then continuing; that gateway interaction is "spent" and nothing will ever receive a message (including an exception) sent to the replyChannel header. Hence the log message you are seeing.

因此,一种解决方案是修复发送到独立流的消息上的 errorChannel 标头.使用 .enrichHeaders 替换(确保将 overwrite 设置为 true)网关设置的 errorChannel 标头.这应该在流程中尽快完成,以便任何异常都将路由到该通道(然后您可以在那里订阅您的错误处理程序).

So, one solution is to fix up the errorChannel header on the message sent to the independent flow. Use .enrichHeaders to replace (be sure to set overwrite to true) the errorChannel header that was set up by the gateway. This should be done as soon as possible in the flow so any exceptions will be routed to that channel (and then you can subscribe your error handler there).

另一种解决方案是连接您自己的错误处理执行器,在其 MessagePublishingErrorHandler 上显式设置 defaultErrorChannel 并删除 errorChannel 标头.

An alternative solution is to wire up your own error handling executor, explicitly setting a defaultErrorChannel on its MessagePublishingErrorHandler and remove the errorChannel header.

异步错误路由首先寻找一个header;如果存在,错误消息将路由到那里;如果没有标头并且 MPEH 没有默认错误通道;消息将被路由到默认的 errorChannel,其中(通常)订阅了 LoggingChannelAdapter.默认的 errorChannel 是一个发布/订阅频道,因此您可以订阅其他端点.

The async error routing first looks for a header; if present, the error message is routed there; if there's no header and the MPEH has no default error channel; the message will be routed to the default errorChannel to which (normally) there is a LoggingChannelAdapter subscribed. The default errorChannel is a pub/sub channel so you can subscribe other endpoints to it.

编辑

您正在发布/订阅之前更改频道.

You are changing the channel before the pub/sub.

获得至少一个对网关的响应很重要;您应该在 pub/sub 的一条腿上单独保留错误通道,并在第二条腿上更新它.这样,第一站的异常将被抛出给调用者(如果您想在网关中执行某些操作,例如路由到您的异常处理程序,您可以向网关添加一个 errorChannel).您必须只更新第二条腿的标头,以便其异常直接进入您的错误处理程序.

It's important to get at least one response to the gateway; you should leave the error channel alone on one leg of the pub/sub and update it on the second leg. That way, an exception on the first leg will be thrown to the caller (you can add an errorChannel to the gateway if you want to take some action there, such as routing to your exception handler). You must only update the header on the second leg so that its exceptions go straight to your error handler.

如果您将网关上的 errorChannel 设置为您的 exceptionChannel,那么两条腿上的异常都会去那里.

If you set the errorChannel on the gateway to your exceptionChannel then exceptions on both legs will go there.

这篇关于Spring集成流程中的错误处理实践的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-07 04:17