我有三个名为LWA1,LWA2和LWA3的进程。每个服务器都有一个服务器,LWA1的端口为55555,LWA2的端口为55556,LWA3的端口为55557。
此外,每个进程都有一个客户端,以便连接到其他进程。

每个进程都应该能够对其他进程进行读写。所以:


LWA1应该向LWA2和LWA3读写
LWA2应该向LWA1和LWA3读写
LWA3应该向LWA1和LWA2读写


目前,每个进程执行两次写入,但仅接收一条消息。每个过程的输出如下(选项卡式的打印属于客户端,选项卡式的打印属于服务器)。

LWA1:

Setting up server with port: 55555
Server configured.

    Opening sockets to port 55556 and port 55557
    Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted


LWA2:

Setting up server with port: 55556
Server configured.

    Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}


LWA3:

Setting up server with port: 55557
Server configured.

    Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}


如您所见,每个客户端都向另一个客户端写入一个LamportRequest,但是其他两个客户端仅收到一条消息。为什么其他消息没有显示低谷?

我怀疑这可能与服务器中的密钥有关,但不知道可能是什么。另外,我还不太了解它们。如我错了请纠正我:

每个与Selector的连接都用一个不同的(SelectableChannel)密钥表示,因此服务器LWA1中的Iterator(例如)应仅具有(因此,仅侦听事件)两个密钥,一个用于LWA2,另一个用于LWA3,对?我尝试在keyAccept方法中的每个键上附加和整数以区分它们,效果很好,但是当在keyRead方法中打印附加的整数时,它显示为空。该方法中的密钥是新方法吗?第三把钥匙突然出现了吗?

额外的问题:我应该在单个线程中实现此结构。目前,我使用两种,一种用于服务器,一种用于客户端。一旦开始运作,关于如何统一它们的任何技巧?

------------------代码-----------------

服务器(为阅读目的而简化)如下:

public TalkToBrotherSocket(int clock, int port) {
    this.port = port;
    this.clock = clock;

    try {
        setServer();
        System.out.println("Server configured.\n");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void run() {
    while (true) {
        try {
            // Wait for an event one of the registered channels
            selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check if they key is ready to accept a new socket connection
                if (key.isAcceptable()) {
                    keyAccept(key);
                    System.out.println("Key accepted");
                } else if (key.isReadable()){
                    System.out.println("Reading data from server");
                    keyRead(key);
                } else if (key.isWritable()){
                    System.out.println("Writting data from server");
                    keyWrite(key); //unused at the moment
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


private void keyRead(SelectionKey key) throws IOException {
    // Create a SocketChannel to read the request
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    buffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(buffer);
    } catch (IOException e) {
        System.out.println("Closing socket");
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        System.out.println("Shutting down socket");
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    System.out.println("I read: " + new String(buffer.array()).trim());
}

private void keyAccept(SelectionKey key) throws IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    //Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(selector, SelectionKey.OP_READ);
}

private void setServer() throws IOException {
    // Create a new selector
    selector = Selector.open();

    // Create a new non-blocking server socket channel
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    serverChannel.bind(new InetSocketAddress("localhost", port));

    // Register the server socket channel, indicating an interest in
    // accepting new connections
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}


客户端(为阅读目的而简化)如下:

public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
    this.process = process;
    this.clock = clock;
    this.id = id;

    try {
        System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
        firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
        secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
        firstBuffer = ByteBuffer.allocate(1024);
        secondBuffer = ByteBuffer.allocate(1024);
        sendRequests();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

private void sendRequests() {
    LamportRequest lamportRequest = new LamportRequest(clock, process, id);
    firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
    System.out.println("\tSending lamport request: " + converted);
    try {
        firstClient.write(firstBuffer);
        secondClient.write(secondBuffer);
        firstBuffer.clear();
}


初始化如下:

System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();

new NIOClient(clock, firstPort, secondPort, id, process);

最佳答案

在@ user207421的评论之后,我添加了新的第二个缓冲区。对sendRequests方法的更改已被编辑到原始帖子中。

07-24 09:30