前言
反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:
- Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。
- Handlers处理器的职责:非阻塞的执行业务处理逻辑。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等
1. 单线程反应器
IO多路复用使用了NIO模型。
Reactor模式使用了IO多路复用的机制。
/反应器
class EchoReactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
EchoReactor() throws IOException {
//...获取选择器、开启serverSocket服务监听通道
//Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress("127.0.0.1",8080));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理.第一步,注册serverSocket(监听)的accept事件
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//将新连接处理器作为附件,绑定到sk选择键
// 绑定AcceptorHandler新连接处理器到selectKey;类似对象的set方法
sk.attach(new AcceptorHandler());
}
//轮询和分发事件
public void run() {
try {
while (! Thread.interrupted()) {
// 返回注册在selector中等待IO操作(及有事件发生)channel的selectionKey
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
//反应器负责dispatch收到的事件
SelectionKey sk = it.next();
//分发
dispatch(sk);
}
selected.clear();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler!= null) {
handler.run();
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
//处理连接
SocketChannel channel = serverSocket.accept();
if (channel!= null)
// 构造方法中绑定。处理真实的逻辑
new EchoHandler(selector, channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(new EchoReactor()).start();
}
}
class EchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//取得选择键,再设置感兴趣的IO事件
//将新的SocketChannel传输通道,注册到了反应器Reactor类的同一个选择器中。
// 这样保证了Reactor类和Handler类在同一个线程中执行
sk = channel.register(selector, 0);
//将Handler自身作为选择键的附件
//Channel传输通道注册完成后,将EchoHandler自身作为附件,attach到了选择键中。
// 这样,在Reactor类分发事件(选择键)时,能执行到EchoHandler的run方法
sk.attach(this);
//注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写入模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
System.out.println(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读取模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了,这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
1.1 单线程反应器问题
当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,如果被阻塞的Handler不仅仅负责输入和输出处理的业务,还包括负责连接监听的AcceptorHandler处理器。这个是非常严重的问题
2 多线程反应器
- 将负责业务处理的线程单独放入线程池中,即将业务处理的线程和连接监听、IO查询的线程隔离
- 如何服务器为多核CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程,即工作线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器(类似多个单线程的多路复用)。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力
//多线程反应器
class MultiThreadEchoServerReactor {
ServerSocketChannel serverSocket;
AtomicInteger next = new AtomicInteger(0);
Selector bossSelector = null;
//selectors集合,引入多个selector选择器
Selector[] workSelectors = new Selector[2];
Reactor bossReactor = null;
//引入多个子反应器
Reactor[] workReactors = null;
MultiThreadEchoServerReactor() throws IOException {
//初始化多个selector选择器
bossSelector = Selector.open();// 用于监听新连接事件
workSelectors[0] = Selector.open(); // 用于监听read、write事件
workSelectors[1] = Selector.open(); // 用于监听read、write事件
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);//非阻塞
//bossSelector,负责监控新连接事件, 将 serverSocket注册到bossSelector
SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
//绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
sk.attach(new AcceptorHandler());
//bossReactor反应器,处理新连接的bossSelector
bossReactor = new Reactor(bossSelector);
//第一个子反应器,一子反应器负责一个worker选择器
Reactor workReactor1 = new Reactor(workSelectors[0]);
//第二个子反应器,一子反应器负责一个worker选择器
Reactor workReactor2 = new Reactor(workSelectors[1]);
workReactors = new Reactor[]{workReactor1, workReactor2};
}
private void startService() {
// 一子反应器对应一条线程
new Thread(bossReactor).start(); //只负责连接的监听
new Thread(workReactors[0]).start();
new Thread(workReactors[1]).start();
}
//反应器
//连接|工作
class Reactor implements Runnable {
//每条线程负责一个选择器的查询
final Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) {
//单位为毫秒
selector.select(1000);
//连接和工作有不同的选择器
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (null == selectedKeys || selectedKeys.size() == 0) {
continue;
}
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
//不同的选择器,执行附件里面不同的逻辑
dispatch(sk);
}
selectedKeys.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
//取出附件,接下来去执行
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
//执行逻辑
if (handler != null) {
handler.run();
}
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一个新的连接");
//连接成功了,可能有数据过来,放入工作选择器,来监听
if (channel != null) {
int index = next.get();
Logger.info("选择器的编号:" + index);
Selector selector = workSelectors[index];
//连接的通道 放到选择器中
new MultiThreadEchoHandler(selector, channel);
}
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == workSelectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//唤醒 查询线程,使得OP_READ生效
selector.wakeup();
Logger.info("新的连接 注册完成");
}
public void run() {
//异步任务,在独立的线程池中执行
//提交数据传输任务到线程池
//使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//异步任务,不在Reactor线程中执行
//数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
注意:
使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
pool.execute(new AsyncTask());
。工作线程也是有限的,就几个,是非阻塞的,真正的去完成数据传输这种操作,还是新建线程去完成,不然整个服务不就变成几个工作线程了,工作线程来查询连接是否有数据需要处理,然后用线程池去处理。
引用:《Java高并发核心编程(卷1)》一书。做了更多的注释