本文介绍了具有Netty的多线程UDP服务器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Netty实现UDP服务器。想法是只绑定一次(因此只创建一个 Channel )。这个 Channel 初始化时只有一个处理程序,它通过 ExecutorService 在多个线程之间调度传入数据报的处理。

I'm trying to implement a UDP server with Netty. The idea is to bind only once (therefore creating only one Channel). This Channel is initialized with only one handler that dispatches processing of incoming datagrams among multiple threads via an ExecutorService.

@Configuration
public class SpringConfig {

    @Autowired
    private Dispatcher dispatcher;

    private String host;

    private int port;

    @Bean
    public Bootstrap bootstrap() throws Exception {
        Bootstrap bootstrap = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(dispatcher);

        ChannelFuture future = bootstrap.bind(host, port).await();
        if(!future.isSuccess())
            throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());

        return bootstrap;
    }
}

@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {

    private int workerThreads;

    private ExecutorService executorService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DatagramPacket packet = (DatagramPacket) msg;

        final Channel channel = ctx.channel();

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //Process the packet and produce a response packet (below)
                DatagramPacket responsePacket = ...;

                ChannelFuture future;
                try {
                    future = channel.writeAndFlush(responsePacket).await();
                } catch (InterruptedException e) {
                    return;
                }
                if(!future.isSuccess())
                    log.warn("Failed to write response packet.");
            }
        });
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        executorService = Executors.newFixedThreadPool(workerThreads);
    }
}

我有以下问题:


  1. channelRead 收到 DatagramPacket Dispatcher 类的方法在被工作线程使用之前是否重复?我想知道在 channelRead 方法返回后该数据包是否被销毁,即使工作线程保留了引用。

  2. 是不是安全地在所有工作线程中共享 Channel 并让他们同时调用 writeAndFlush

  1. Should the DatagramPacket received by the channelRead method of the Dispatcher class be duplicated before being used by the worker thread? I wonder if this packet is destroyed after the channelRead method returns, even if a reference is kept by the worker thread.
  2. Is it safe to share the Channel among all the worker threads and let them call writeAndFlush concurrently?

谢谢!

推荐答案


  1. 不。如果你需要让对象活得更久,你可以把它变成别的东西,或者使用 ReferenceCountUtil.retain(datagram)然后 ReferenceCountUtil.release(datagram) 一旦你完成它。你也不应该在执行者服务上做 await(),你应该为任何事情注册一个处理程序。

  1. Nope. If you need the object to live longer you either turn it into something else or use ReferenceCountUtil.retain(datagram) and then ReferenceCountUtil.release(datagram) once you're done with it. You also shouldn't be doing await() at the executor service as well, you should register a handler for whatever happens.

是的,通道对象是线程安全的,可以从许多不同的线程中调用它们。

Yes, channel objects are thread safe and they can be called from many different threads.

这篇关于具有Netty的多线程UDP服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-16 21:31
查看更多