在我的应用程序中,我配置一些通道,如下所示:
@Bean
public MessageChannel eventFilterChannel() {
return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}
@Bean
public MessageChannel processEventChannel() {
return new ExecutorChannel(asyncConfiguration.getAsyncExecutor());
}
我正在使用
ExecutorChannel
并使用我的自定义Executor
,如下所示:@Configuration
@EnableAsync
public class AsyncConfiguration extends AsyncConfigurerSupport {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("MyAppThread");
executor.initialize();
return executor;
}
}
我有以下
MessageEndpoint
是eventFilterChannel
频道的订户:@MessageEndpoint
public class MyEventFilter {
@Filter(inputChannel = "eventFilterChannel", outputChannel = "processEventChannel")
public boolean filterEvents(final MyEvent myEvent) {
//filter logic
}
}
理想情况下,我希望事件过滤器消息端点在使用
ExecutorChannel
时是多线程的。我想了解这是否是多线程端点的正确实现?但是,我很怀疑,因为我可以在日志中看到以下内容:
Channel 'application.eventFilterChannel' has 1 subscriber(s).
我的实现是正确的还是可以遵循的标准?
最佳答案
好吧,这里有一些误导。您的eventFilterChannel
实际上只有一个订户-您的@Filter
。但这确实是多线程的。在多个线程中使用相同的无状态组件。ExecutorChannel
将传入的任务排队,并且它们在池中的线程上并行执行。在我们的案例中,故事是关于消息传递的。不确定代码是否可以帮助您,但看起来像:
public final boolean dispatch(final Message<?> message) {
if (this.executor != null) {
Runnable task = createMessageHandlingTask(message);
this.executor.execute(task);
return true;
}
return this.doDispatch(message);
}
Runnable
是这样的:public void run() {
doDispatch(message);
}
...
handler.handleMessage(message);
该
handler
正是该@Filter
的订阅者。因此,从不同的线程调用相同的方法。由于这是被动且无状态的组件,因此仅将其保留一次并在不同线程中重用就很安全。
另一方面,不合主题:如果您向该频道添加更多订户,则无论如何都不会并行调用它们:默认情况下,它是轮询策略:根据索引选择下一条消息的处理程序。
如果一个处理程序无法处理消息,我们将尝试下一个,依此类推。您可以注入任何其他自定义实现。甚至将其重置为
null
,以始终从第一个开始。