本文介绍了Netty Nio Java中的通信的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在Netty nio中创建一个具有两个客户端和一个服务器的通信系统。更具体地说,首先,我希望当两个客户端与服务器连接时从服务器发送消息,然后在这两个客户端之间交换数据。我正在使用此示例提供的。我对代码的修改可以在这里找到:

I want to create a communication system with two clients and a server in Netty nio. More specifically, firstly, I want when two clients are connected with the server to send a message from the server and after that to be able to exchnage data between the two clients. I am using the code provided from this example. My modifications in the code can be found here: link

当第一个客户端连接时,serverHandler中的channelRead似乎可以正常工作,因此它始终返回1,但是当连接第二个客户端时,它不会变为2。如何从服务器正确检查两个客户端都连接到服务器时?如何从客户端的主要功能动态读取此值?那么,让两个客户端进行通信的最佳方法是什么?

It seems that the channelRead in the serverHandler works when the first client is connceted so it always return 1 but when a second client is connected does not change to 2. How can I check properly from the server when both clients are connected to the server? How can I read this value dynamically from my main function of the Client? Then which is the best way to let both clients communicate?

EDIT1 :显然,客户端服务正在运行并直接关闭,因此每个我正在运行新的NettyClient的时间已连接,但此后连接已关闭。因此,计数器始终从零到1变动。正如我在下面的评论中所建议的那样,我在同一端口上使用telnet对其进行了测试,但是计数器似乎正常增加,但是使用NettyClient服务编号。

Apparently it seems that the client service is running and close directly so every time that I am running a new NettyClient is connected but the connection is closed after that. So the counter is always chnages from zero to one. As I was advised in the below comments I tested it using telnet in the same port and the counter seems to increasing normally, however, with the NettyClient service no.

EDIT2:似乎我遇到的问题来自中的 future.addListener(ChannelFutureListener.CLOSE); 。 ProcessingHandler类中的code>。当我发表评论时,代码似乎可以正常工作。但是,不确定将其注释掉会带来什么后果。而且,我想从客户端的主要功能中检查返回消息何时是特定的两个。我该如何创建一种方法来等待服务器发出特定消息,同时又阻止主要功能。

It seems that the issue I got was from future.addListener(ChannelFutureListener.CLOSE); which was in channelRead in the ProcessingHandler class. When I commented it that out it seems that the code works. However, am not sure what are the consequences of commented that out. Moreover, I want from my main function of the client to check when the return message is specific two. How, could I create a method that waits for a specific message from server and meanwhile it blocks the main functionality.

 static EventLoopGroup workerGroup = new NioEventLoopGroup();
 static Promise<Object> promise = workerGroup.next().newPromise(); 
 public static void callClient() throws Exception {
    String host = "localhost";
    int port = 8080;
    try {
        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
            }
        });
        ChannelFuture f = b.connect(host, port).sync();
    } finally {
        //workerGroup.shutdownGracefully();
    }
}

我想在主函数中调用方法并返回结果,当结果为2时继续主要功能。但是,我无法在一段时间内调用callClient,因为它将在同一个客户端上运行多次。

I want inside the main function to call the method and return the result and when it is 2 to continue with the main functionality. However, I cannot call callClient inside the while since it will run multiple times the same client.

   callBack();
    while (true) {
        Object msg = promise.get();
        System.out.println("Case1: the connected clients is not two");
        int ret = Integer.parseInt(msg.toString());
        if (ret == 2){
            break;
        }
    }
   System.out.println("Case2: the connected clients is two");
   // proceed with the main functionality

如何为第一个更新Promise变量客户。当我运行两个客户端时,对于第一个客户端,我总是收到以下消息:

How can I update the promise variable for the first client. When I run two clients, for the first client I always received the message :

似乎承诺没有正常更新,而对于第二个客户,我总是收到:

seems that the promise is not updated normally, while for the second client I always received the:


推荐答案

因为孩子 ChannelInitializer 每个新的频道(客户端)都会被调用。在这里,您正在创建 ProcessingHandler 的新实例,因此每个通道请参见自己的 ChannelGroup 。

Because child ChannelInitializer is called for every new Channel (client). There you are creating new instance of ProcessingHandler, so every channel see its own instance of ChannelGroup.

解决方案1-频道属性

使用并将其与 Channel

在某处创建属性(例如在 Constants 类内部):

Create attribute somewhere (let's say inside Constants class):

public static final AttributeKey<ChannelGroup> CH_GRP_ATTR = 
       AttributeKey.valueOf(SomeClass.class.getName());

现在,创建ChannelGroup,所有 ProcessingHandler实例都将使用 code>:

Now, create ChannelGroup which will be used by all instances of ProcessingHandler:

final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

在NettyServer中更新您的孩子 ChannelInitializer : / p>

Update your child ChannelInitializer in NettyServer :

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
        new RequestDecoder(), 
        new ResponseDataEncoder(), 
        new ProcessingHandler());

    ch.attr(Constants.CH_GRP_ATTR).set(channels);
}

现在,您可以像这样在处理程序中访问ChannelGroup的实例:

Now you can access instance of ChannelGroup inside your handlers like this:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    final ChannelGroup channels = ctx.channel().attr(Constants.CH_GRP_ATTR).get();
    channels.add(ctx.channel());

这将起作用,因为每次新客户端连接时,ChannelInitializer都将使用与<$相同的引用来调用c $ c> ChannelGroup 。

This will work, because every time new client connects, ChannelInitializer will be called with same reference to ChannelGroup.

解决方案2-静态字段

如果将 ChannelGroup 声明为静态,则所有类实例将查看相同的 ChannelGroup 实例:

If you declare ChannelGroup as static, all class instances will see same ChannelGroup instance:

private static final ChannelGroup channels =
     new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

解决方案3-传播共享实例

将参数引入 ProcessingHandler 的构造函数中:

Introduce parameter into constructor of ProcessingHandler:

private final ChannelGroup channels;
public ProcessingHandler(ChannelGroup chg) {
    this.channels = chg;
}

现在,在NettyServer类中,创建 ChannelGroup的实例并将其传播到ProcessingHandler构造函数:

Now, inside your NettyServer class create instance of ChannelGroup and propagate it to ProcessingHandler constructor:

final ChannelGroup channels = new 
      DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
        new RequestDecoder(), 
        new ResponseDataEncoder(), 
        new ProcessingHandler(channels)); // <- here
}

我个人会选择第一个解决方案,因为

Personally, I would choose first solution, because


  • 它显然将ChannelGroup与Channel上下文关联

  • 您可以在其他处理程序中访问同一ChannelGroup
  • >
  • 您可以有多个服务器实例(在同一JVM中在不同端口上运行)

  • It clearly associate ChannelGroup with Channel context
  • You can access same ChannelGroup in other handlers
  • You can have multiple instances of server (running on different port, within same JVM)

这篇关于Netty Nio Java中的通信的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-18 05:35