在我的应用程序中,我配置一些通道,如下所示:

@Bean
public MessageChannel eventFilterChannel() {
    return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}

@Bean
public MessageChannel processEventChannel() {
    return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}


我正在使用ExecutorChannel并使用我的自定义Executor,如下所示:

@Configuration
@EnableAsync
public class AsyncConfiguration extends AsyncConfigurerSupport {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("MyAppThread");
        executor.initialize();
        return executor;
    }
}


我有以下MessageEndpointeventFilterChannel频道的订户:

@MessageEndpoint
public class MyEventFilter {

    @Filter(inputChannel = "eventFilterChannel", outputChannel = "processEventChannel")
    public boolean filterEvents(final MyEvent myEvent) {

            //filter logic

    }

}


理想情况下,我希望事件过滤器消息端点在使用ExecutorChannel时是多线程的。我想了解这是否是多线程端点的正确实现?

但是,我很怀疑,因为我可以在日志中看到以下内容:

Channel 'application.eventFilterChannel' has 1 subscriber(s).


我的实现是正确的还是可以遵循的标准?

最佳答案

好吧,这里有一些误导。您的eventFilterChannel实际上只有一个订户-您的@Filter。但这确实是多线程的。在多个线程中使用相同的无状态组件。

ExecutorChannel将传入的任务排队,并且它们在池中的线​​程上并行执行。在我们的案例中,故事是关于消息传递的。不确定代码是否可以帮助您,但看起来像:

public final boolean dispatch(final Message<?> message) {
    if (this.executor != null) {
        Runnable task = createMessageHandlingTask(message);
        this.executor.execute(task);
        return true;
    }
    return this.doDispatch(message);
}


Runnable是这样的:

public void run() {
    doDispatch(message);
}
...

handler.handleMessage(message);


handler正是该@Filter的订阅者。
因此,从不同的线程调用相同的方法。由于这是被动且无状态的组件,因此仅将其保留一次并在不同线程中重用就很安全。

另一方面,不合主题:如果您向该频道添加更多订户,则无论如何都不会并行调用它们:默认情况下,它是轮询策略:根据索引选择下一条消息的处理程序。
如果一个处理程序无法处理消息,我们将尝试下一个,依此类推。您可以注入任何其他自定义实现。甚至将其重置为null,以始终从第一个开始。

08-28 19:26