具体实现原理和模型图,请参考:Reactor模型介绍
单Reactor单线程实现,请参考:基于NIO-Reactor的介绍和单Reactor单线程模型的实现
在单Reactor单线程模型中,还有可优化的地方,那就是dispatch每分发一个事件,无论是ReadHandler和WriteHander中数据数据是同步处理的,以ReadHander举例:
class ReadHandler{ private SelectionKey selectionKey; public ReadHandler(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public void run() { try { SocketChannel sc = (SocketChannel) selectionKey.channel(); //处理读请求 doRead(sc); //处理完读请求,将通道注册为写 Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_WRITE); }catch (Exception e){ e.printStackTrace(); } } private void doRead(SocketChannel ssc) { System.out.println("读取数据,然后做一些数据处理"); } }
在ReadHandler中,doRead(sc),方法是同步的,而一般读到一个请求要经历解析请求信息,然后交到对应的service处理业务等等操作,非常耗时,此时将doRead进行异步处理,将能提高服务器资源的使用,提高性能。
所以单Reactor单线程模型的实现,就是将单线程模型中的Handler中的数据处理异步处理,并引入线程池管理这些线程,这样dispatch将专注于事件分发,而各自handler专注于各自的事件处理。
/** * 单Reactor多线程模型 */ public class OneReactorMultiThreadMode { public static void main(String[] args) { /** * 初始化一个线程池,然后启动MultiReacor */ ThreadPool.getPool().init(3); new MultiReactor(8089).run(); } }
class MultiReactor implements Runnable{ private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; public MultiReactor(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); SelectionKey sk = servChannel.register(selector, SelectionKey.OP_ACCEPT); stop=false; System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { disptach(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } } private void disptach(SelectionKey key) { if(key.isValid()){ if(key.isAcceptable()){ new MultiAcceptor(key).run(); } if (key.isReadable()){ new MultiReadHandler(key).run(); } if(key.isWritable()){ new MultiWriteHandler(key).run(); } } } }
class MultiAcceptor { private SelectionKey selectionKey; public MultiAcceptor(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public void run() { try { ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); }catch (Exception e){ e.printStackTrace(); } } }
class MultiReadHandler { private SelectionKey selectionKey; public MultiReadHandler(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public void run() { try { final SocketChannel sc = (SocketChannel) selectionKey.channel(); //使用线程池,异步处理读请求 ThreadPool.getPool().submit(new Runnable() { @Override public void run() { doRead(sc); } }); //处理完读请求,将通道注册为写 Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_WRITE); }catch (Exception e){ e.printStackTrace(); } } private void doRead(SocketChannel ssc) { System.out.println("读取数据,然后做一些数据处理"); } }
class MultiWriteHandler { private SelectionKey selectionKey; public MultiWriteHandler(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public void run() { try { final SocketChannel sc = (SocketChannel) selectionKey.channel(); //使用线程池,异步处理写请求 ThreadPool.getPool().submit(new Runnable() { @Override public void run() { doWrite(sc); } }); //写完后,将通道注册为读 Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); }catch (Exception e){ e.printStackTrace(); } } private void doWrite(SocketChannel sc) { System.out.println("处理写。。。"); } }
/** * 示例-线程池 */ class ThreadPool{ public static final ThreadPool pool = new ThreadPool(); private boolean init=false; private static ExecutorService executorService; private ThreadPool(){}; public synchronized void init(int size){ if(!init){ executorService=Executors.newFixedThreadPool(size); init=true; }else { System.out.println("the thread pool had inited"); } } public static ThreadPool getPool(){ return pool; } public void submit(Runnable runnable){ if(init){ executorService.submit(runnable); }else { throw new RuntimeException("the thread pool is not inited"); } } }