Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow

Question

I'm attempting to build a websocket messaging app based on the Spring Websocket Demo, running ActiveMQ as the STOMP message broker with Undertow. The application runs fine on insecure connections. However, I'm having difficulty configuring the STOMP broker relay to forward over SSL connections.

As mentioned in the Spring WebSocket docs...

Further, the docs state a dependency on reactor-net, which I have...

The issue is that my current implementation doesn't initialize the NettyTCPClient with SSL, so the ActiveMQ connection fails with an SSLException.

[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED:
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] »
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...

As such, I've tried to work through the Project Reactor docs to set SSL options for the connection, but I haven't been successful.

At this point I've found that the StompBrokerRelayMessageHandler initializes the NettyTCPClient by default in Reactor2TcpClient (/spring-projects/spring-framework/blob/master/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java#L957), yet it doesn't appear to be configurable.

Any help would be appreciated.

SSCCE

spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx
//...
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    private final static String KEYSTORE = "/activemq.jks";
    private final static String KEYSTORE_PASS = "xxx";
    private final static String KEYSTORE_TYPE = "JKS";
    private final static String TRUSTSTORE = "/activemq_certs.jks";
    private final static String TRUSTSTORE_PASS = "xxx";

    private static String getBindLocation() {
        return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public SslBrokerService activeMQBroker() throws Exception {

        final SslBrokerService service = new SslBrokerService();
        service.setPersistent(false);

        KeyManager[] km = SecurityManager.getKeyManager();
        TrustManager[] tm = SecurityManager.getTrustManager();

        service.addSslConnector(getBindLocation(), km, tm, null);
        final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
        service.setDestinations(new ActiveMQDestination[]{topic});

        return service;
    }


    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/welcome").withSockJS();
        registry.addEndpoint("/test").withSockJS();
    }

    private static class SecurityManager {
        // elided...
    }

}


@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
    ...
    @Bean
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
      StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
      ConfigurationReader reader = new StompClientDispatcherConfigReader();
      Environment environment = new Environment(reader).assignErrorJournal();
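      // Note: the broker's SSL connector in the SSCCE listens on 8442; 8443 is the Undertow server.port.
      TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443));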
      handler.setTcpClient(client);
      return handler;
    }
}
private static class StompTcpClientSpecFactory
        implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

    private final String host;
    private final int port;
    private final String KEYSTORE = "src/main/resources/tcpclient.jks";
    private final String KEYSTORE_PASS = "xxx";
    private final String KEYSTORE_TYPE = "JKS";
    private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
    private final String TRUSTSTORE_PASS = "xxx";
    private final String TRUSTSTORE_TYPE = "JKS";
    private final Environment environment;

    private final SecurityManager tcpManager = new SecurityManager
            .SSLBuilder(KEYSTORE, KEYSTORE_PASS)
            .keyStoreType(KEYSTORE_TYPE)
            .trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
            .trustStoreType(TRUSTSTORE_TYPE)
            .build();

    public StompTcpClientSpecFactory(Environment environment, String host, int port) {
        this.environment = environment;
        this.host = host;
        this.port = port;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {

        return tcpClientSpec
                .ssl(new SslOptions()
                        .sslProtocol("TLS")
                        .keystoreFile(tcpManager.getKeyStore())
                        .keystorePasswd(tcpManager.getKeyStorePass())
                        .trustManagers(tcpManager::getTrustManager)
                        .trustManagerPasswd(tcpManager.getTrustStorePass()))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .env(this.environment)
                .dispatcher(this.environment.getCachedDispatchers("StompClient").get())
                .connect(this.host, this.port);
    }
}

Recommended answer

The StompBrokerRelayMessageHandler has a tcpClient property you can set. However, it looks like we don't expose that through the WebSocketMessageBrokerConfigurer setup.

You can remove @EnableWebSocketMessageBroker and extend DelegatingWebSocketMessageBrokerConfiguration instead. It's effectively the same, but you're now extending directly from the configuration class that provides all the beans.

This allows you to override the stompBrokerRelayMessageHandler() bean and set its TcpClient property directly. Just make sure the overriding method is marked with @Bean.
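Both the broker's SSL connector and a custom TCP client need key and trust managers, and the question elides the helper that produces them. The following is a minimal sketch of one way to build them with the standard javax.net.ssl API only. The class name SslMaterial and its method names are hypothetical (they are not part of Spring, Reactor, or ActiveMQ), and it assumes the keystores are readable from a file path rather than from the classpath.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper for illustration; not the SecurityManager from the question. */
final class SslMaterial {

    private SslMaterial() {}

    /** Loads a keystore of the given type (for example "JKS") from a file path. */
    static KeyStore load(String path, char[] password, String type) throws Exception {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            store.load(in, password);
        }
        return store;
    }

    /** Builds key managers from a keystore holding this side's private key. */
    static KeyManager[] keyManagers(KeyStore keyStore, char[] keyPassword) throws Exception {
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyPassword);
        return kmf.getKeyManagers();
    }

    /** Builds trust managers from a keystore holding the trusted certificates. */
    static TrustManager[] trustManagers(KeyStore trustStore) throws Exception {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        return tmf.getTrustManagers();
    }

    /** Combines both into a TLS context; a null SecureRandom selects the default. */
    static SSLContext context(KeyManager[] km, TrustManager[] tm) throws Exception {
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(km, tm, null);
        return ctx;
    }
}
```

The resulting arrays have the shape that the question's service.addSslConnector(getBindLocation(), km, tm, null) call expects. Whether the TCP client can consume them the same way depends on the SSL options of the Reactor version in use, which this sketch does not cover.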


07-24 09:39