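I am trying to communicate with an external TCP server using a TcpOutboundGateway and a client TcpConnectionFactory. In my scenario, each connection should be associated with a different thread (the connection on each thread may be used for multiple requests/responses).
So I used the ThreadAffinityClientConnectionFactory from this topic: Spring Integration tcp client multiple connections
It worked fine until I tried to open more than 4 concurrent connections; the fifth (and any later) connection fails with a timeout.
I found that org.springframework.integration.ip.tcp.TcpOutboundGateway acquires a semaphore in its handleRequestMessage method before obtaining the connection, so I overrode TcpOutboundGateway as follows: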

public class NoSemaphoreTcpOutboundGateway extends TcpOutboundGateway {

    private volatile AbstractClientConnectionFactory connectionFactory;
    private final Map<String, NoSemaphoreTcpOutboundGateway.AsyncReply> pendingReplies = new ConcurrentHashMap<>();

    @Override
    public boolean onMessage(Message<?> message) {
        String connectionId = (String)message.getHeaders().get("ip_connectionId");
        if(connectionId == null) {
            this.logger.error("Cannot correlate response - no connection id");
            this.publishNoConnectionEvent(message, (String)null, "Cannot correlate response - no connection id");
            return false;
        }

        if(this.logger.isTraceEnabled()) {
            this.logger.trace("onMessage: " + connectionId + "(" + message + ")");
        }

        NoSemaphoreTcpOutboundGateway.AsyncReply reply = (NoSemaphoreTcpOutboundGateway.AsyncReply)this.pendingReplies.get(connectionId);
        if(reply == null) {
            if(message instanceof ErrorMessage) {
                return false;
            } else {
                String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
                this.logger.error(errorMessage);
                this.publishNoConnectionEvent(message, connectionId, errorMessage);
                return false;
            }
        } else {
            reply.setReply(message);
            return false;
        }

    }

    @Override
    protected Message handleRequestMessage(Message<?> requestMessage) {
        connectionFactory = (AbstractClientConnectionFactory) this.getConnectionFactory();
        Assert.notNull(this.getConnectionFactory(), this.getClass().getName() + " requires a client connection factory");

        TcpConnection connection = null;
        String connectionId = null;

        Message var7;
        try {
            /*if(!this.isSingleUse()) {
                this.logger.debug("trying semaphore");
                if(!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
                    throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
                }

                haveSemaphore = true;
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("got semaphore");
                }
            }*/

            connection = this.getConnectionFactory().getConnection();
            NoSemaphoreTcpOutboundGateway.AsyncReply e = new NoSemaphoreTcpOutboundGateway.AsyncReply(10000);
            connectionId = connection.getConnectionId();
            this.pendingReplies.put(connectionId, e);
            if(this.logger.isDebugEnabled()) {
                this.logger.debug("Added pending reply " + connectionId);
            }

            connection.send(requestMessage);

            // the connection may be closed after send (in an interceptor) if this is a disconnect message
            if (!connection.isOpen())
                return null;

            Message replyMessage = e.getReply();
            if(replyMessage == null) {
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("Remote Timeout on " + connectionId);
                }

                this.connectionFactory.forceClose(connection);
                throw new MessageTimeoutException(requestMessage, "Timed out waiting for response");
            }

            if(this.logger.isDebugEnabled()) {
                this.logger.debug("Response " + replyMessage);
            }

            var7 = replyMessage;
        } catch (Exception var11) {
            this.logger.error("Tcp Gateway exception", var11);
            if(var11 instanceof MessagingException) {
                throw (MessagingException)var11;
            }

            throw new MessagingException("Failed to send or receive", var11);
        } finally {
            if(connectionId != null) {
                this.pendingReplies.remove(connectionId);
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("Removed pending reply " + connectionId);
                }
            }
        }
        return var7;
    }

    private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
        ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
        if(applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new TcpConnectionFailedCorrelationEvent(this, connectionId, new MessagingException(message, errorMessage)));
        }
    }

    private final class AsyncReply {
        private final CountDownLatch latch;
        private final CountDownLatch secondChanceLatch;
        private final long remoteTimeout;
        private volatile Message<?> reply;

        private AsyncReply(long remoteTimeout) {
            this.latch = new CountDownLatch(1);
            this.secondChanceLatch = new CountDownLatch(1);
            this.remoteTimeout = remoteTimeout;
        }

        public Message<?> getReply() throws Exception {
            try {
                if(!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
                    return null;
                }
            } catch (InterruptedException var2) {
                Thread.currentThread().interrupt();
            }
            for(boolean waitForMessageAfterError = true; this.reply instanceof ErrorMessage; waitForMessageAfterError = false) {
                if(!waitForMessageAfterError) {
                    if(this.reply.getPayload() instanceof MessagingException) {
                        throw (MessagingException)this.reply.getPayload();
                    }

                    throw new MessagingException("Exception while awaiting reply", (Throwable)this.reply.getPayload());
                }
                NoSemaphoreTcpOutboundGateway.this.logger.debug("second chance");
                this.secondChanceLatch.await(2L, TimeUnit.SECONDS);
            }
            return this.reply;
        }

        public void setReply(Message<?> reply) {
            if(this.reply == null) {
                this.reply = reply;
                this.latch.countDown();
            } else if(this.reply instanceof ErrorMessage) {
                this.reply = reply;
                this.secondChanceLatch.countDown();
            }
        }
    }
}

The Spring context configuration is as follows:
@Configuration
@ImportResource("gateway.xml")
public class Conf {

    // Bean method parameters are resolved by the container, so @Autowired is not needed here.
    @Bean
    @ServiceActivator(inputChannel = "clientOutChannel")
    public NoSemaphoreTcpOutboundGateway noSemaphoreTcpOutboundGateway(ThreadAffinityClientConnectionFactory cf, DirectChannel clientInChannel) {
        NoSemaphoreTcpOutboundGateway gw = new NoSemaphoreTcpOutboundGateway();
        gw.setConnectionFactory(cf);
        gw.setReplyChannel(clientInChannel);
        gw.setRequestTimeout(10000);
        return gw;
    }
}

 <int-ip:tcp-connection-factory
        id="delegateCF"
        type="client"
        host="${remoteService.host}"
        port="${remoteService.port}"
        single-use="true"
        lookup-host="false"
        ssl-context-support="sslContext"
        deserializer="clientDeserializer"
        serializer="clientSerializer"
        interceptor-factory-chain="clientLoggingTcpConnectionInterceptorFactory"
        using-nio="false"/>

delegateCF is passed to the ThreadAffinityClientConnectionFactory constructor.
So, the question is:
In terms of concurrency, is it OK to use NoSemaphoreTcpOutboundGateway together with ThreadAffinityClientConnectionFactory?

Best answer

It looks like you are on the right track, but I don't think you need a custom gateway. The TcpOutboundGateway logic is based on:

if (!this.isSingleUse) {
            logger.debug("trying semaphore");
            if (!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
                throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
            }
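
The effect of that guard can be illustrated in isolation. The following is a minimal sketch, not the Spring Integration source: the class SemaphoreGateDemo, its method names, and the single-permit fair semaphore are all assumptions made for illustration. It only shows that a caller waits (and can time out) when the gate is not single-use, and passes straight through when it is.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a gateway that guards a shared connection.
public class SemaphoreGateDemo {

    // Assumption: one permit, fair ordering. The real permit count is not shown in the excerpt.
    private final Semaphore semaphore = new Semaphore(1, true);
    private final boolean singleUse;

    public SemaphoreGateDemo(boolean singleUse) {
        this.singleUse = singleUse;
    }

    /** Returns true if the caller may proceed within timeoutMillis. */
    public boolean tryEnter(long timeoutMillis) {
        if (singleUse) {
            return true; // no shared connection, so nothing to guard
        }
        try {
            return semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Gives the permit back; a no-op in single-use mode. */
    public void leave() {
        if (!singleUse) {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        SemaphoreGateDemo shared = new SemaphoreGateDemo(false);
        System.out.println(shared.tryEnter(100)); // true: permit was free
        System.out.println(shared.tryEnter(100)); // false: permit still held, timed out

        SemaphoreGateDemo single = new SemaphoreGateDemo(true);
        System.out.println(single.tryEnter(100)); // true
        System.out.println(single.tryEnter(100)); // true: the guard is skipped
    }
}
```

In this sketch the second tryEnter on the shared gate fails because leave() was never called, which mirrors a request timing out while another request still holds the gate.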

Also, look at Gary's solution, which configures the delegate so that the semaphore does not come into play:
@Bean
public TcpNetClientConnectionFactory delegateCF() {
    TcpNetClientConnectionFactory clientCF = new TcpNetClientConnectionFactory("localhost", 1234);
    clientCF.setSingleUse(true); // so each thread gets its own connection
    return clientCF;
}

@Bean
public ThreadAffinityClientConnectionFactory affinityCF() {
    return new ThreadAffinityClientConnectionFactory(delegateCF());
}

Note the comment in the code. All you need is the ThreadAffinityClientConnectionFactory wrapping that single-use delegate.
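
The thread-affinity idea itself can be sketched without Spring. This is only an illustration under stated assumptions: ThreadAffinityDemo, PerThread, and connectionsOpenedBy are hypothetical names, and a String stands in for a real connection. It is not how ThreadAffinityClientConnectionFactory is implemented; it only shows a ThreadLocal handing each thread its own instance and reusing it for repeated requests on that thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ThreadAffinityDemo {

    /** Hypothetical per-thread holder: the first get() on a thread creates the value, later calls reuse it. */
    public static final class PerThread<T> {
        private final ThreadLocal<T> holder;

        public PerThread(Supplier<T> factory) {
            this.holder = ThreadLocal.withInitial(factory);
        }

        public T get() {
            return holder.get();
        }

        /** Drops the current thread's value, e.g. after its connection is closed. */
        public void release() {
            holder.remove();
        }
    }

    /** Starts threadCount threads that each ask for a "connection" twice; returns how many were created. */
    public static int connectionsOpenedBy(int threadCount) {
        AtomicInteger opened = new AtomicInteger();
        PerThread<String> connections =
                new PerThread<>(() -> "conn-" + opened.incrementAndGet());

        Thread[] workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            workers[i] = new Thread(() -> {
                connections.get(); // first request on this thread creates the value
                connections.get(); // second request reuses it
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return opened.get();
    }

    public static void main(String[] args) {
        // Five threads, two requests each: one connection per thread.
        System.out.println("connections opened: " + connectionsOpenedBy(5));
    }
}
```

Because each thread only ever touches its own value, the threads never contend for a shared connection, which is the property the question relies on.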

Regarding java - Custom concurrent TcpOutboundGateway in Spring Integration, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41166718/
