问题描述
https://stackoverflow.com/a/47136941/1776585
在使用 Flux
+ split()
+ FluxMessageChannel
时,我无法让我的集成处理程序在并行线程中运行.
I can't make my integration handler to run in parallel threads while using Flux
+ split()
+ FluxMessageChannel
.
考虑以下片段:
// ...
.handle(message -> Flux.range(0, 10)
.doOnNext(i -> LOG.info("> " + i))
.subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...
所有日志都在一个线程中输出:
All logs are output in one thread:
[ parallel-1] d.a.Application : > 0
[ parallel-1] d.a.Application : -> 0
[ parallel-1] d.a.Application : > 1
[ parallel-1] d.a.Application : -> 1
[ parallel-1] d.a.Application : > 2
[ parallel-1] d.a.Application : -> 2
[ parallel-1] d.a.Application : > 3
[ parallel-1] d.a.Application : -> 3
[ parallel-1] d.a.Application : > 4
[ parallel-1] d.a.Application : -> 4
[ parallel-1] d.a.Application : > 5
[ parallel-1] d.a.Application : -> 5
[ parallel-1] d.a.Application : > 6
[ parallel-1] d.a.Application : -> 6
[ parallel-1] d.a.Application : > 7
[ parallel-1] d.a.Application : -> 7
[ parallel-1] d.a.Application : > 8
[ parallel-1] d.a.Application : -> 8
[ parallel-1] d.a.Application : > 9
[ parallel-1] d.a.Application : -> 9
如何强制多线程处理?
我已经尝试在 Flux
上使用 .parallel().runOn()
,但这只是使获取数据并行,但实际处理仍然在一个线程上运行.
I've tried using .parallel().runOn()
on the Flux
, but that just makes fetching data parallel, but actual handling still runs on one thread.
我也试过 .publishOn(Schedulers.parallel())
在 Flux
上没有效果.
I've also tried .publishOn(Schedulers.parallel())
on the Flux
with no effect.
并且将 ExecutorChannel
或带有执行程序的 Poller
添加到处理程序也无济于事.
And also adding ExecutorChannel
or a Poller
with an executor to a handler didn't help.
推荐答案
这有一些技巧:
.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))
FluxMessageChannel
使用的那些消息将与额外的 ExecutorChannel
并行.
And those messages consumed by the FluxMessageChannel
is going to be paralled by that additional ExecutorChannel
.
我认为您要问的是一个功能请求,可以使提到的 FluxMessageChannel
可配置.并且可以在那里配置这样的 subscribeOn/publishOn
等.
I think what you are asking is like a feature request to make the mentioned FluxMessageChannel
configurable. And such a subscribeOn/publishOn
etc. could be configured there.
请随时就此事提出JIRA!
这篇关于Spring Integration 5.0 + Project Reactor:控制线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!