(1)客户端启动类
package test3; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

/**
 * Console-driven WebSocket client built on Netty.
 *
 * <p>Opens a connection to a {@code ws://} or {@code wss://} endpoint, performs the
 * WebSocket handshake, and then forwards every line typed on standard input to the
 * server until end-of-input or the user types {@code bye}.
 */
public final class WebSocketClient {

    /** Default endpoint; can be overridden with the {@code url} system property. */
    static final String URL = System.getProperty("url", "ws://127.0.0.1:5688/ws");

    /**
     * Connects to the given WebSocket URL and relays console input to it.
     *
     * <p>Console commands (matched case-insensitively):
     * <ul>
     *   <li>{@code bye} - sends a close frame, waits for the channel to close, and returns;</li>
     *   <li>{@code ping} - sends a ping frame with a fixed four-byte payload;</li>
     *   <li>anything else - sent verbatim as a text frame.</li>
     * </ul>
     * The method blocks until standard input is exhausted or {@code bye} is entered.
     * If the URL scheme is neither {@code ws} nor {@code wss}, an error is printed to
     * {@code System.err} and the method returns without connecting.
     *
     * @param URL     endpoint to connect to (shadows the static default of the same name);
     *                a missing scheme defaults to {@code ws}, a missing host to
     *                {@code 127.0.0.1}, and a missing port to 80 (ws) or 443 (wss)
     * @param clients not used by this method
     * @param ctx     handed to the {@link WebSocketClientHandler} constructor
     * @throws Exception if the URL is malformed, the connection or handshake fails,
     *                   or reading from the console fails
     */
    public void connect(String URL, ChannelGroup clients, ChannelHandlerContext ctx) throws Exception {
        URI uri = new URI(URL);
        String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
        final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();

        final boolean ssl = "wss".equalsIgnoreCase(scheme);
        if (!ssl && !"ws".equalsIgnoreCase(scheme)) {
            System.err.println("Only WS(S) is supported.");
            return;
        }

        // Only ws/wss reach this point, so the default port is fully determined by `ssl`.
        final int port = uri.getPort() != -1 ? uri.getPort() : (ssl ? 443 : 80);

        // NOTE(security): InsecureTrustManagerFactory accepts every certificate without
        // verification. That is acceptable for local testing only; supply a real trust
        // manager before pointing this client at anything that matters.
        final SslContext sslCtx = ssl
                ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
                : null;

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            final WebSocketClientHandler handler = new WebSocketClientHandler(ctx,
                    WebSocketClientHandshakerFactory.newHandshaker(
                            uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));

            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    if (sslCtx != null) {
                        p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                    }
                    p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
                }
            });

            // Connect with the null-safe `host` so the 127.0.0.1 fallback actually applies
            // (the SSL handler above already uses the same value).
            Channel ch = b.connect(host, port).sync().channel();
            handler.handshakeFuture().sync();

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            String msg;
            while ((msg = console.readLine()) != null) {
                // equalsIgnoreCase avoids the default-locale dependence of toLowerCase()
                // (e.g. "PING" does not lower-case to "ping" under a Turkish locale).
                if ("bye".equalsIgnoreCase(msg)) {
                    ch.writeAndFlush(new CloseWebSocketFrame());
                    ch.closeFuture().sync();
                    break;
                }
                WebSocketFrame frame;
                if ("ping".equalsIgnoreCase(msg)) {
                    frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
                } else {
                    frame = new TextWebSocketFrame(msg);
                }
                ch.writeAndFlush(frame);
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}
(2)客户端处理器类
package test3; import java.time.LocalDateTime; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import test.SocketHandlerInitializer;

/**
 * Inbound handler for the client side of a WebSocket connection.
 *
 * <p>Starts the WebSocket handshake when the channel becomes active, completes
 * {@link #handshakeFuture()} once the handshake succeeds or fails, and afterwards
 * logs incoming text, pong and close frames to standard output.
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Context supplied by the creator of this handler; stored but not read in this class. */
    private final ChannelHandlerContext channel;
    private final WebSocketClientHandshaker handshaker;
    private Channel outboundChannel;
    private ChannelPromise handshakeFuture;

    /**
     * @param channel    context supplied by the caller; only stored
     * @param handshaker handshaker used to open and finish the WebSocket handshake
     */
    public WebSocketClientHandler(ChannelHandlerContext channel, WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
        this.channel = channel;
    }

    /**
     * Returns the future that completes when the handshake has succeeded or failed.
     * It is {@code null} until this handler has been added to a pipeline.
     */
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    /** Creates the handshake promise as soon as the handler joins a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("handlerAdded");
        handshakeFuture = ctx.newPromise();
    }

    /** Sends the opening handshake request once the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
        handshaker.handshake(ctx.channel());
    }

    /**
     * Logs the disconnect and, if the handshake never finished, fails the handshake
     * promise so that callers blocked on {@link #handshakeFuture()} are released
     * instead of waiting forever.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("WebSocket Client 链接失败!");
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(
                    new IllegalStateException("Channel closed before the WebSocket handshake completed"));
        }
    }

    /**
     * Handles one inbound message: finishes the handshake if it is still pending,
     * otherwise logs the received WebSocket frame.
     *
     * @throws IllegalStateException if a full HTTP response arrives after the handshake
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead0");
        Channel ch = ctx.channel();

        // Until the handshake is complete, the message is treated as the HTTP upgrade response.
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                System.out.println("WebSocket Client connected!");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                System.out.println("WebSocket Client failed to connect");
                handshakeFuture.setFailure(e);
            }
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus()
                    + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            System.out.println("WebSocket Client received message: " + textFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            System.out.println("WebSocket Client received pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("WebSocket Client received closing");
            ch.close();
        }
    }

    /**
     * Broadcasts an acknowledgement for {@code msg} to the {@code clients} group and
     * opens a second connection to 127.0.0.1:5688 on the inbound channel's event loop.
     * Not invoked anywhere in this class at present.
     *
     * <p>NOTE(review): {@code msg.toString()} is the frame's string form, which may not
     * be the bare text payload - confirm which one the acknowledgement should carry.
     */
    private void response(ChannelHandlerContext ctx, final WebSocketFrame msg) {
        String content = msg.toString();
        clients.writeAndFlush(new TextWebSocketFrame("[服务器收到相应]" + LocalDateTime.now() + "接受萨达到消息, 消息为:" + content));

        final Channel inboundChannel = ctx.channel();
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop()).channel(ctx.channel().getClass())
                .handler(new SocketHandlerInitializer(inboundChannel));

        ChannelFuture f = b.connect("127.0.0.1", 5688);
        outboundChannel = f.channel();
        // The frame is not used after this point, so it is deliberately NOT retained:
        // a retain() without a matching release() would leak the underlying buffer.
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("isSuccess:true");
                    outboundChannel.writeAndFlush("2222222222");
                } else {
                    System.out.println("isSuccess:false");
                    inboundChannel.close();
                }
            }
        });
    }

    /** Prints the error, fails a still-pending handshake with it, and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}
(3)测试用的服务端代码
package com.googosoft.websocket; import java.io.IOException;
import javax.websocket.DecodeException;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

/**
 * Test WebSocket endpoint published at {@code /echo}.
 *
 * <p>Every incoming text message is logged to standard output and answered with a
 * fixed reply; errors are printed to the console.
 */
@ServerEndpoint("/echo")
public class EchoServer {

    /** Fixed text sent back for every received message. */
    private static final String REPLY = "This is the first server message";

    /** Called when a session opens; intentionally does nothing. */
    @OnOpen
    public void initSession(Session session) {
    }

    /**
     * Logs the received text and sends the fixed reply on the same session.
     *
     * @param message text received from the client
     * @param session session the message arrived on
     * @throws IOException if sending the reply fails
     */
    @OnMessage
    public void onMessage(String message, Session session)
            throws IOException, InterruptedException {
        System.out.println("Received: " + message);
        session.getBasicRemote().sendText(REPLY);
    }

    /**
     * Prints the stack trace, then a one-line summary that distinguishes decoding
     * failures from all other errors.
     *
     * @param thw the error reported for this endpoint
     */
    @OnError
    public void handleError(Throwable thw) {
        thw.printStackTrace();
        if (!(thw instanceof DecodeException)) {
            System.out.println("Server WebSocket error: " + thw.getMessage());
            return;
        }
        DecodeException decodeFailure = (DecodeException) thw;
        System.out.println("Error decoding incoming message: " + decodeFailure.getText());
    }

    /** Called when a session closes; intentionally does nothing. */
    @OnClose
    public void processClose(Session session) {
    }
}
在测试的时候,我把服务端的代码放在了一个 Web 项目里面充当服务端;
客户端用的是普通的 Java 项目,在 main 方法里面建立连接,实现通信。