问题描述
我正在尝试使用Netty实现UDP服务器。想法是只绑定一次(因此只创建一个 Channel
)。这个 Channel
初始化时只有一个处理程序,它通过 ExecutorService
在多个线程之间调度传入数据报的处理。
I'm trying to implement a UDP server with Netty. The idea is to bind only once (therefore creating only one Channel
). This Channel
is initialized with only one handler that dispatches processing of incoming datagrams among multiple threads via an ExecutorService
.
@Configuration
public class SpringConfig {
@Autowired
private Dispatcher dispatcher;
private String host;
private int port;
@Bean
public Bootstrap bootstrap() throws Exception {
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(dispatcher);
ChannelFuture future = bootstrap.bind(host, port).await();
if(!future.isSuccess())
throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());
return bootstrap;
}
}
@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {
private int workerThreads;
private ExecutorService executorService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
final Channel channel = ctx.channel();
executorService.execute(new Runnable() {
@Override
public void run() {
//Process the packet and produce a response packet (below)
DatagramPacket responsePacket = ...;
ChannelFuture future;
try {
future = channel.writeAndFlush(responsePacket).await();
} catch (InterruptedException e) {
return;
}
if(!future.isSuccess())
log.warn("Failed to write response packet.");
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
executorService = Executors.newFixedThreadPool(workerThreads);
}
}
我有以下问题:
-
channelRead
收到DatagramPacket
Dispatcher
类的方法在被工作线程使用之前是否重复?我想知道在channelRead
方法返回后该数据包是否被销毁,即使工作线程保留了引用。 - 是不是安全地在所有工作线程中共享
Channel
并让他们同时调用writeAndFlush
?
- Should the
DatagramPacket
received by thechannelRead
method of theDispatcher
class be duplicated before being used by the worker thread? I wonder if this packet is destroyed after thechannelRead
method returns, even if a reference is kept by the worker thread. - Is it safe to share the
Channel
among all the worker threads and let them callwriteAndFlush
concurrently?
谢谢!
推荐答案
-
不。如果你需要让对象活得更久,你可以把它变成别的东西,或者使用
ReferenceCountUtil.retain(datagram)
然后ReferenceCountUtil.release(datagram)
一旦你完成它。你也不应该在执行者服务上做await()
,你应该为任何事情注册一个处理程序。
Nope. If you need the object to live longer you either turn it into something else or use
ReferenceCountUtil.retain(datagram)
and thenReferenceCountUtil.release(datagram)
once you're done with it. You also shouldn't be doingawait()
at the executor service as well, you should register a handler for whatever happens.
是的,通道对象是线程安全的,可以从许多不同的线程中调用它们。
Yes, channel objects are thread safe and they can be called from many different threads.
这篇关于具有Netty的多线程UDP服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!