Netty TCP服务器正在端口8000上运行,接收NMEA格式数据。它使用Marine API库将乱码转换为有意义的信息,需要来自套接字的输入流。

SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();

如何获取正在使用的netty服务器端口的inputstream?

最佳答案

SentenceReader没有接受“流式输入”数据的任何方法,但是通过子类化,可以使其接受数据。
SentenceReader的核心使用了一个DataReader作为其数据,通常这个数据阅读器是从一个单独的线程SentenceReader本身进行轮询的,我们可以修改这个结构以得到我们需要的。
首先,我们用自己的类对SentenceReader进行子类化,为它提供所需的适当的构造函数和方法,并消除启动和停止方法的影响。我们现在提供null作为文件(并希望将来的版本提供直接传入数据读取器的方法)

public class NettySentenceReader extends SentenceReader {
    public NettySentenceReader () {
        super((InputStream)null);
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }
}

我们现在需要在自己的netty处理程序中实现内部类DataReader的所有功能,以复制相同的行为
public class SentenceReaderHandler extends
         SimpleChannelInboundHandler<String> {
    private SentenceFactory factory;
    private SentenceReader parent;

    public SentenceReaderHandler (SentenceReader parent) {
        this.parent = parent;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        if(!ctx.channel().isActive())
            return;
        //ActivityMonitor monitor = new ActivityMonitor(parent);
        this.factory = SentenceFactory.getInstance();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //ActivityMonitor monitor = new ActivityMonitor(parent);
        this.factory = SentenceFactory.getInstance();
    }

    @Override
    // This method will be renamed to `messageReceived` in Netty 5.0.0
    protected void channelRead0(ChannelHandlerContext ctx, String data)
             throws Exception {
        if (SentenceValidator.isValid(data)) {
            monitor.refresh();
            Sentence s = factory.createParser(data);
            parent.fireSentenceEvent(s);
        } else if (!SentenceValidator.isSentence(data)) {
            parent.fireDataEvent(data);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        //monitor.reset();
        parent.fireReadingStopped();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        if(!ctx.channel().isActive())
            return;
        //monitor.reset();
        parent.fireReadingStopped();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        parent.handleException("Data read failed", e);
    }
}

最后,我们需要将此集成到一个网络管道中:
SentenceReader reader = new NettySentenceReader();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    private static final StringDecoder DECODER = new StringDecoder();
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast(DECODER);
        pipeline.addLast(new SentenceReaderHandler(reader));
    }
});

07-24 19:08
查看更多