Netty TCP服务器正在端口8000
上运行,接收NMEA
格式数据。它使用Marine API库将乱码转换为有意义的信息,需要来自套接字的输入流。
SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();
如何获取正在使用的netty服务器端口的inputstream?
最佳答案
SentenceReader
没有接受“流式输入”数据的任何方法,但是通过子类化,可以使其接受数据。SentenceReader
的核心使用了一个DataReader
作为其数据,通常这个数据阅读器是从一个单独的线程SentenceReader
本身进行轮询的,我们可以修改这个结构以得到我们需要的。
首先,我们用自己的类对SentenceReader
进行子类化,为它提供所需的适当的构造函数和方法,并消除启动和停止方法的影响。我们现在提供null
作为文件(并希望将来的版本提供直接传入数据读取器的方法)
public class NettySentenceReader extends SentenceReader {
public NettySentenceReader () {
super((InputStream)null);
}
@Override
public void start() {
}
@Override
public void stop() {
}
}
我们现在需要在自己的netty处理程序中实现内部类
DataReader
的所有功能,以复制相同的行为public class SentenceReaderHandler extends
SimpleChannelInboundHandler<String> {
private SentenceFactory factory;
private SentenceReader parent;
public SentenceReaderHandler (SentenceReader parent) {
this.parent = parent;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
//ActivityMonitor monitor = new ActivityMonitor(parent);
this.factory = SentenceFactory.getInstance();
}
@Override
// This method will be renamed to `messageReceived` in Netty 5.0.0
protected void channelRead0(ChannelHandlerContext ctx, String data)
throws Exception {
if (SentenceValidator.isValid(data)) {
monitor.refresh();
Sentence s = factory.createParser(data);
parent.fireSentenceEvent(s);
} else if (!SentenceValidator.isSentence(data)) {
parent.fireDataEvent(data);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
if(!ctx.channel().isActive())
return;
//monitor.reset();
parent.fireReadingStopped();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
parent.handleException("Data read failed", e);
}
}
最后,我们需要将此集成到一个网络管道中:
SentenceReader reader = new NettySentenceReader();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
private static final StringDecoder DECODER = new StringDecoder();
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(DECODER);
pipeline.addLast(new SentenceReaderHandler(reader));
}
});