问题描述
我正在尝试创建一个带有TCP入站终结点的m子流,该终结点是侦听端口的TCP服务器.识别出成功的客户端连接后,在接收到来自客户端的任何请求之前,我需要向套接字中写入一条消息(该消息使客户端知道我正在侦听),然后客户端才能向我发送其他请求.这是我使用示例Java程序执行的操作:
I am trying to create a mule flow with a TCP inbound endpoint which is a TCP server that listens to a port. When a successful client connection is identified, before receiving any request from the client, I need to write a message into the socket (which lets the client know that I am listening), only after which the client sends me further requests. This is how I do it with a sample java program :
import java.net.*;
import java.io.*;
public class TCPServer
{
public static void main(String[] args) throws IOException
{
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(4445);
}
catch (IOException e)
{
System.err.println("Could not listen on port: 4445.");
System.exit(1);
}
Socket clientSocket = null;
System.out.println ("Waiting for connection.....");
try {
clientSocket = serverSocket.accept();
}
catch (IOException e)
{
System.err.println("Accept failed.");
System.exit(1);
}
System.out.println ("Connection successful");
System.out.println ("Sending output message - .....");
//Sending a message to the client to indicate that the server is active
PrintStream pingStream = new PrintStream(clientSocket.getOutputStream());
pingStream.print("Server listening");
pingStream.flush();
//Now start listening for messages
System.out.println ("Waiting for incoming message - .....");
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String inputLine;
while ((inputLine = in.readLine()) != null)
{
System.out.println ("Server: " + inputLine);
out.println(inputLine);
if (inputLine.equals("Bye."))
break;
}
out.close();
in.close();
clientSocket.close();
serverSocket.close();
}
}
我试图将Mule的TCP入站端点用作服务器,但无法看到如何从客户端识别成功的连接以触发出站消息.仅当从客户端发送消息时才触发该流.有没有办法可以扩展Mule TCP连接器的功能并具有可以满足上述要求的侦听器?
I have tried to use Mule's TCP inbound endpoint as a server, but I am not able to see how I can identify a successful connection from the client, inorder to trigger the outbound message. The flow gets triggered only when a message is sent across from the client. Is there a way I can extend the functionality of the Mule TCP connector and have a listener which could do the above requirement?
根据提供的答案,这就是我实施此方法的方式-
Based on the answer provided, this is how I implemented this -
public class TCPMuleOut extends TcpMessageReceiver {
boolean InitConnection = false;
Socket clientSocket = null;
public TCPMuleOut(Connector connector, FlowConstruct flowConstruct,
InboundEndpoint endpoint) throws CreateException {
super(connector, flowConstruct, endpoint);
}
protected Work createWork(Socket socket) throws IOException {
return new MyTcpWorker(socket, this);
}
protected class MyTcpWorker extends TcpMessageReceiver.TcpWorker {
public MyTcpWorker(Socket socket, AbstractMessageReceiver receiver)
throws IOException {
super(socket, receiver);
// TODO Auto-generated constructor stub
}
@Override
protected Object getNextMessage(Object resource) throws Exception {
if (InitConnection == false) {
clientSocket = this.socket;
logger.debug("Sending logon message");
PrintStream pingStream = new PrintStream(
clientSocket.getOutputStream());
pingStream.print("Log on message");
pingStream.flush();
InitConnection = true;
}
long keepAliveTimeout = ((TcpConnector) connector)
.getKeepAliveTimeout();
Object readMsg = null;
try {
// Create a monitor if expiry was set
if (keepAliveTimeout > 0) {
((TcpConnector) connector).getKeepAliveMonitor()
.addExpirable(keepAliveTimeout,
TimeUnit.MILLISECONDS, this);
}
readMsg = protocol.read(dataIn);
// There was some action so we can clear the monitor
((TcpConnector) connector).getKeepAliveMonitor()
.removeExpirable(this);
if (dataIn.isStreaming()) {
}
return readMsg;
} catch (SocketTimeoutException e) {
((TcpConnector) connector).getKeepAliveMonitor()
.removeExpirable(this);
System.out.println("Socket timeout");
} finally {
if (readMsg == null) {
// Protocols can return a null object, which means we're
// done
// reading messages for now and can mark the stream for
// closing later.
// Also, exceptions can be thrown, in which case we're done
// reading.
dataIn.close();
InitConnection = false;
logger.debug("Client closed");
}
}
return null;
}
}
}
TCP连接器如下:
<tcp:connector name="TCP" doc:name="TCP connector"
clientSoTimeout="100000" receiveBacklog="0" receiveBufferSize="0"
sendBufferSize="0" serverSoTimeout="100000" socketSoLinger="0"
validateConnections="true" keepAlive="true">
<receiver-threading-profile
maxThreadsActive="5" maxThreadsIdle="5" />
<reconnect-forever />
<service-overrides messageReceiver="TCPMuleOut" />
<tcp:direct-protocol payloadOnly="true" />
</tcp:connector>
推荐答案
您要尝试执行的操作有些困难,但并非不可能.消息由org.mule.transport.tcp.TcpMessageReceiver
类接收,并且此类始终使用输入流中的数据来创建注入流中的消息.但是,您可以扩展接收器,并通过在流的tcp连接器中添加service-overrides
标记(指示此处)并替换messageReceiver
元素.在扩展的接收器中,应更改TcpWorker.getNextMessage
方法,以便在从输入流中读取之前发送确认消息.HTH,Marcos.
What you're trying to do is a little difficult to accomplish but not impossible. The messages are received by the org.mule.transport.tcp.TcpMessageReceiver
class, and this class always consumes the data in the input stream to create the message that injects in the flow.However, you could extend that receiver and instruct the TCP module to use yours by adding a service-overrides
tag in your flow's tcp connector (documented here) and replacing the messageReceiver
element.In your extended receiver you should change the TcpWorker.getNextMessage
method in order to send the ack message before read from the input stream.HTH, Marcos.
这篇关于Mule中的TCP Server配置-写入客户端套接字的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!