本文介绍了Spring Integration 5.0 + Project Reactor:控制线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

https://stackoverflow.com/a/47136941/1776585

在使用 Flux + split() + FluxMessageChannel 时,我无法让我的集成处理程序在并行线程中运行.

I can't make my integration handler to run in parallel threads while using Flux + split() + FluxMessageChannel.

考虑以下片段:

// ...
.handle(message -> Flux.range(0, 10)
    .doOnNext(i -> LOG.info("> " + i))
    .subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...

所有日志都在一个线程中输出:

All logs are output in one thread:

[     parallel-1] d.a.Application    : > 0
[     parallel-1] d.a.Application    :  -> 0
[     parallel-1] d.a.Application    : > 1
[     parallel-1] d.a.Application    :  -> 1
[     parallel-1] d.a.Application    : > 2
[     parallel-1] d.a.Application    :  -> 2
[     parallel-1] d.a.Application    : > 3
[     parallel-1] d.a.Application    :  -> 3
[     parallel-1] d.a.Application    : > 4
[     parallel-1] d.a.Application    :  -> 4
[     parallel-1] d.a.Application    : > 5
[     parallel-1] d.a.Application    :  -> 5
[     parallel-1] d.a.Application    : > 6
[     parallel-1] d.a.Application    :  -> 6
[     parallel-1] d.a.Application    : > 7
[     parallel-1] d.a.Application    :  -> 7
[     parallel-1] d.a.Application    : > 8
[     parallel-1] d.a.Application    :  -> 8
[     parallel-1] d.a.Application    : > 9
[     parallel-1] d.a.Application    :  -> 9

如何强制多线程处理?

我已经尝试在 Flux 上使用 .parallel().runOn(),但这只是使获取数据并行,但实际处理仍然在一个线程上运行.

I've tried using .parallel().runOn() on the Flux, but that just makes fetching data parallel, but actual handling still runs on one thread.

我也试过 .publishOn(Schedulers.parallel())Flux 上没有效果.

I've also tried .publishOn(Schedulers.parallel()) on the Flux with no effect.

并且将 ExecutorChannel 或带有执行程序的 Poller 添加到处理程序也无济于事.

And also adding ExecutorChannel or a Poller with an executor to a handler didn't help.

推荐答案

这有一些技巧:

.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))

FluxMessageChannel 使用的那些消息将与额外的 ExecutorChannel 并行.

And those messages consumed by the FluxMessageChannel is going to be paralled by that additional ExecutorChannel.

我认为您要问的是一个功能请求,可以使提到的 FluxMessageChannel 可配置.并且可以在那里配置这样的 subscribeOn/publishOn 等.

I think what you are asking is like a feature request to make the mentioned FluxMessageChannel configurable. And such a subscribeOn/publishOn etc. could be configured there.

请随时就此事提出JIRA

这篇关于Spring Integration 5.0 + Project Reactor:控制线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 06:09