Problem description
I have a question very similar to this one: How to create a Spring Reactor Flux from a ActiveMQ queue?
The one difference is that the messages come from an HTTP endpoint rather than a JMS queue. The problem is that the message channel is not populated for some reason, or it is not picked up by Flux.from(). The log entries show that a GenericMessage is created from the HTTP integration flow with the path variable as its payload, but it does not seem to be enqueued or published to a channel. I tried both .channel(MessageChannels.queue()) and .channel(MessageChannels.publishSubscribe()); it makes no difference, and the event stream is empty. Here is the code:
@Bean
public Publisher<Message<String>> httpReactiveSource() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("/eventmessage/{id}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .payloadExpression("#pathVariables.id")
            )
            .channel(MessageChannels.queue())
            .log(LoggingHandler.Level.DEBUG)
            .log()
            .toReactivePublisher();
}

@GetMapping(value = "eventmessagechannel/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id) {
    return Flux.from(httpReactiveSource())
            .map(Message::getPayload);
}
UPDATE1:
build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.0.M2'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-freemarker')
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.springframework.boot:spring-boot-starter-webflux')
    compile('org.springframework.integration:spring-integration-http')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')
}
UPDATE2:
It works when @SpringBootApplication and @RestController are defined in one file, but stops working when @SpringBootApplication and @RestController are in separate files.
TestApp.java
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestApp {

    public static void main(String[] args) {
        SpringApplication.run(TestApp.class, args);
    }
}
TestController.java
package com.example.controller;

import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;

@RestController
public class TestController {

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows
                .from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }
}
Recommended answer
This works for me:
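import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    // POST /message/{id} produces a message whose payload is the {id} path variable.
    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows
                .from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

    // GET /events streams the payloads of those messages as Server-Sent Events.
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }
}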
I have these dependencies in the POM:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
I run the app and use two terminals. In the first one I run:
curl http://localhost:8080/events
to listen for SSE.
In the second one I perform this:
curl -X POST http://localhost:8080/message/foo
curl -X POST http://localhost:8080/message/bar
curl -X POST http://localhost:8080/message/666
The first terminal then responds with:
data:foo
data:bar
data:666
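The data: prefix in that output is the Server-Sent Events framing: each payload is sent as a data field, and on the wire an event is terminated by a blank line. The following is a minimal plain-Java sketch of that framing for single-line payloads only. The class and method names (SseFrameSketch, toSseEvent, toSseStream) are hypothetical and are not part of Spring; the framework does this encoding for you.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only: Spring performs this encoding itself.
final class SseFrameSketch {

    // Formats one single-line payload as an SSE event: a "data:" field plus a blank line.
    static String toSseEvent(String payload) {
        return "data:" + payload + "\n\n";
    }

    // Concatenates the events in order, the way they would appear on the response stream.
    static String toSseStream(List<String> payloads) {
        return payloads.stream()
                .map(SseFrameSketch::toSseEvent)
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        // Same payloads as the three POST requests above.
        System.out.print(toSseStream(Arrays.asList("foo", "bar", "666")));
    }
}
```

A payload that spans several lines would need one data: field per line, which this sketch deliberately does not handle.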
Note that we don't need the spring-boot-starter-webflux dependency. Flux to SSE works well with regular MVC on the Servlet container.
Spring Integration will support WebFlux soon, too: https://jira.spring.io/browse/INT-4300. You will then be able to configure something like:
IntegrationFlows
.from(Http.inboundReactiveGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
and rely fully on WebFlux, without any Servlet container dependencies.
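Regarding UPDATE2 in the question: a likely explanation, based on Spring's documented @Bean semantics rather than on a test of this exact project, is that a @Bean method declared in a plain @RestController is not proxied. A direct call to httpReactiveSource() from eventMessages() would then build a fresh, unregistered flow instead of returning the bean that is wired to the HTTP endpoint. In a @SpringBootApplication class, which is a full @Configuration class, the same call is intercepted and returns the shared bean. The plain-Java sketch below only models that difference; Source, demo and LiteModeSketch are hypothetical stand-ins, not Spring types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the suspected cause; none of these types come from Spring.
final class LiteModeSketch {

    // Hypothetical stand-in for the publisher returned by toReactivePublisher().
    static final class Source {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String payload) {
            subscribers.forEach(s -> s.accept(payload));
        }
    }

    // Plays the role of the @Bean method: every direct call builds a new object.
    static Source httpReactiveSource() {
        return new Source();
    }

    // Returns what the controller receives when "foo" is posted to the registered source.
    static List<String> demo(boolean shareInstance) {
        Source registered = httpReactiveSource(); // the instance wired to the HTTP endpoint
        Source seenByController = shareInstance ? registered : httpReactiveSource();

        List<String> received = new ArrayList<>();
        seenByController.subscribe(received::add);
        registered.send("foo");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // shared instance: the payload arrives
        System.out.println(demo(false)); // fresh instance: nothing arrives
    }
}
```

If that is indeed the cause, injecting the Publisher<Message<String>> bean into the controller (for example through a constructor parameter) instead of calling httpReactiveSource() directly, or moving the @Bean method into a @Configuration class, should restore the shared instance.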