我有三个名为LWA1,LWA2和LWA3的进程。每个服务器都有一个服务器,LWA1的端口为55555,LWA2的端口为55556,LWA3的端口为55557。
此外,每个进程都有一个客户端,以便连接到其他进程。
每个进程都应该能够对其他进程进行读写。所以:
LWA1应该向LWA2和LWA3读写
LWA2应该向LWA1和LWA3读写
LWA3应该向LWA1和LWA2读写
目前,每个进程执行两次写入,但仅接收一条消息。每个过程的输出如下(选项卡式的打印属于客户端,选项卡式的打印属于服务器)。
LWA1:
Setting up server with port: 55555
Server configured.
Opening sockets to port 55556 and port 55557
Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted
LWA2:
Setting up server with port: 55556
Server configured.
Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}
LWA3:
Setting up server with port: 55557
Server configured.
Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}
如您所见,每个客户端都向另一个客户端写入一个LamportRequest,但是其他两个客户端仅收到一条消息。为什么其他消息没有显示低谷?
我怀疑这可能与服务器中的密钥有关,但不知道可能是什么。另外,我还不太了解它们。如我错了请纠正我:
每个与Selector的连接都用一个不同的(SelectableChannel)密钥表示,因此服务器LWA1中的Iterator(例如)应仅具有(因此,仅侦听事件)两个密钥,一个用于LWA2,另一个用于LWA3,对?我尝试在keyAccept方法中的每个键上附加和整数以区分它们,效果很好,但是当在keyRead方法中打印附加的整数时,它显示为空。该方法中的密钥是新方法吗?第三把钥匙突然出现了吗?
额外的问题:我应该在单个线程中实现此结构。目前,我使用两种,一种用于服务器,一种用于客户端。一旦开始运作,关于如何统一它们的任何技巧?
------------------代码-----------------
服务器(为阅读目的而简化)如下:
public TalkToBrotherSocket(int clock, int port) {
this.port = port;
this.clock = clock;
try {
setServer();
System.out.println("Server configured.\n");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
// Wait for an event one of the registered channels
selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check if they key is ready to accept a new socket connection
if (key.isAcceptable()) {
keyAccept(key);
System.out.println("Key accepted");
} else if (key.isReadable()){
System.out.println("Reading data from server");
keyRead(key);
} else if (key.isWritable()){
System.out.println("Writting data from server");
keyWrite(key); //unused at the moment
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void keyRead(SelectionKey key) throws IOException {
// Create a SocketChannel to read the request
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
buffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(buffer);
} catch (IOException e) {
System.out.println("Closing socket");
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
System.out.println("Shutting down socket");
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
System.out.println("I read: " + new String(buffer.array()).trim());
}
private void keyAccept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
//Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void setServer() throws IOException {
// Create a new selector
selector = Selector.open();
// Create a new non-blocking server socket channel
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
serverChannel.bind(new InetSocketAddress("localhost", port));
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
客户端(为阅读目的而简化)如下:
public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
this.process = process;
this.clock = clock;
this.id = id;
try {
System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
firstBuffer = ByteBuffer.allocate(1024);
secondBuffer = ByteBuffer.allocate(1024);
sendRequests();
} catch (IOException e) {
e.printStackTrace();
}
}
private void sendRequests() {
LamportRequest lamportRequest = new LamportRequest(clock, process, id);
firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
System.out.println("\tSending lamport request: " + converted);
try {
firstClient.write(firstBuffer);
secondClient.write(secondBuffer);
firstBuffer.clear();
}
初始化如下:
System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();
new NIOClient(clock, firstPort, secondPort, id, process);
最佳答案
在@ user207421的评论之后,我添加了新的第二个缓冲区。对sendRequests方法的更改已被编辑到原始帖子中。