问题描述
我是 Spring 集成的新手.
I'm new to Spring integration.
使用 Spring 4,只有 java 注释.
Working with Spring 4, Only java anotations.
我现在工作的项目我们在属性文件中设置了 tcp 连接.
The project where i'm working now we set up tcp connection in a property file.
目前,它仅硬编码为 2 个不同的连接,必须将其更改为更动态的方法,我们可以在属性文件中设置可变数量的连接,并能够在运行时添加新连接.
At the momet it is hardcoded to only 2 different connections and it must be changed to a more dynamic approach where we can set up a variable amount of them in the property file and be able to add new ones at runtime.
我知道存在 动态 tcp 客户端示例 并试图以此为基础我的工作.
I'm aware of the existence of dynamic tcp client example and tried to base my work on it.
首先我们为连接设置以下bean:
First we set up the following bean for the connection:
@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
env.getProperty("socket.tcp.nodes[0].ip"),
env.getProperty("socket.tcp.nodes[0].port", Integer.class)
);
tcpNetClientConnectionFactory.setSingleUse(false);
tcpNetClientConnectionFactory.setSoKeepAlive(true);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
tcpNetClientConnectionFactory.setSerializer(by);
tcpNetClientConnectionFactory.setDeserializer(by);
return tcpNetClientConnectionFactory;
}
然后我们有等待发送的适配器:
Then we have the adapter who waits for something to sent:
@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
@Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
return adapter;
}
调用 fromTcp() 时,它会转换消息,以下代码将其发送到另一个应用程序进行进一步处理.
When fromTcp() is called it transforms the message and the following code sends it to another application for further processing.
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
处理消息时,我们必须发送响应.
When the message is procesed we have to send a response.
@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
return tcpSendingMsgHandler;
}
并通过使用网关:
@MessagingGateway()
public interface MessageTcpGateway {
@Gateway(requestChannel = "toTcpCh01")
ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}
我们把它寄回去.
通过示例,我可以理解如何为响应动态创建流.
With the example I can understand how to dynamically create a flow for the response.
但我无法理解如何创建一个公共连接池,然后基于这些连接工厂动态创建侦听适配器和响应适配器,然后在运行时关闭/删除它们.
But I can't fathom how to create a common connection pool and then create listening adapters and response adapters on the fly based on those connectionsfactory and then closing/removing them on runtime.
由于 这个问题
我是否需要为每个适配器创建多个单独的 IntegrationFlow?所以所有的调用和响应都可以异步处理(我可能对异步有误)
Do I need to create multiple separate IntegrationFlow for each adapter? so all the calls and responses can be handled asynchronous ( I may be wrong about the asynchrony)
然后在想要关闭连接时分别处理它们?比如调用close to the TcpReceivingChannelAdapter然后调用TcpSendingMessageHandler,最后注销connectonfactory?
and then handle them separatly when wanting to close a connection? like calling close to the TcpReceivingChannelAdapter and then to TcpSendingMessageHandler and finally deregistering the connectonfactory?
推荐答案
我不这样做 协作通道适配器 您需要为 TcpReceivingChannelAdapter
和 定义单独的
.它确实可以作为单个 IntegrationFlow
>TcpSendingMessageHandlerIntegrationFlow
完成,从 TcpReceivingChannelAdapter
开始,并以 TcpSendingMessageHandler
结束.关键是 IntegrationFlow
本身只是一个对组件引用进行分组的逻辑容器.艰苦的工作真的由您在那里声明的所有组件完成,使用这个 TcpReceivingChannelAdapter
到 TcpSendingMessageHandler
和网关,你真的会异步.
I don't this that for Collaborating Channel Adapters you need separate IntegrationFlow
definitions for the TcpReceivingChannelAdapter
and TcpSendingMessageHandler
. It really can be done as a single IntegrationFlow
starting from the TcpReceivingChannelAdapter
and ending with the TcpSendingMessageHandler
. The point is that IntegrationFlow
per se is just a logical container to group components references. The hard work is really done by all those components you declare there and with this TcpReceivingChannelAdapter
to TcpSendingMessageHandler
and gateway in between you are really going to be async.
请记住,ByteArrayLengthHeaderSerializer
也必须声明为 bean.不确定每个动态流是否需要一个单独的实例,但这里有一个 API 可以从那里执行此操作:
Please, keep in mind that ByteArrayLengthHeaderSerializer
has to be declared as a bean as well. Not sure that you need a separate instance for each dynamic flow, but here is an API to do that from there anyway:
/**
* Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
* application context. Usually it is some support component, which needs an application context.
* For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
* @param bean an additional arbitrary bean to register into the application context.
* @return the current builder instance
*/
IntegrationFlowRegistrationBuilder addBean(Object bean);
这篇关于动态生成具有入站通道和响应通道的 TCP 客户端的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!