1.ACE反应器框架简介

反应器(Reactor):用于事件多路分离和分派的体系结构模式

对一个文件描述符指定的文件或设备的操作, 有两种工作方式: 阻塞与非阻塞。 在设计服务端程序时,如果采用阻塞模式将会影响整个系统的工作效率,改进方法有如下几个:

  1)每建立一个Socket连接时,同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程的无疑是对系统资源的一种浪费。

  2)服务器端保存一个Socket连接列表,然后对这个列表进行轮询,如果发现某个Socket端口上有数据可读时(读就绪),则调用该socket连接的相应读操作;如果某个端口的Socket连接已经中断,则调用相应的析构方法关闭该端口。这样能充分利用服务器资源,效率得到了很大提高。

  反应器本质上提供一组更高级的编程抽象,简化了事件驱动的分布式应用的设计和实现。除此而外,反应器还将若干不同种类的事件的多路分离集成到易于使用的API中。特别地,反应器对基于定时器的事件、信号事件、基于I/O端口监控的事件和用户定义的通知进行统一地处理。 ACE中的反应器与若干内部和外部组件协同工作。其基本概念是反应器框架检测事件的发生(通过在OS事件多路分离接口上进行侦听),并发出对预登记事件处理器(event handler)对象中的方法的"回调"(callback)。该方法由应用开发者实现,其中含有应用处理此事件的特定代码。

使用ACE的反应器,只需如下几步:

  1. 创建事件处理器,以处理他所感兴趣的某事件。
  2. 在反应器上登记,通知说他有兴趣处理某事件,同时传递他想要用以处理此事件的事件处理器的指针给反应器。

随后反应器框架将自动地:

  1. 在内部维护一些表,将不同的事件类型与事件处理器对象关联起来。
  2. 在用户已登记的某个事件发生时,反应器发出对处理器中相应方法的回调。

反应器模式在ACE中被实现为ACE_Reactor类,它提供反应器框架的功能接口。

如上面所提到的,反应器将事件处理器对象作为服务提供者使用。反应器内部记录某个事件处理器的特定事件的相关回调方法。当这些事件发生时,反应器会创建这种事件和相应的事件处理器的关联。

  1. 事件处理器
    事件处理器就是需要通过轮询发生事件改变的对象列表中的对象,如在上面的例子中就是连接的客户端,每个客户端都可以看成一个事件处理器。
  2. 回调事件
    就是反应器支持的事件,如Socket读就绪,写就绪。拿上面的例子来说,如果某个客户端(事件处理器)在反应器中注册了读就绪事件,当客户端给服务器发送一条消息的时候,就会触发这个客户端的数据可读的回调函数。

在反应器框架中,所有应用特有的事件处理器都必须由ACE_Event_Handler的抽象接口类派生。可以通过重载相应的"handle_"方法实现相关的回调方法。

使用ACE_Reactor基本上有三个步骤:

  1. 创建ACE_Event_Handler的子类,并在其中实现适当的"handle_"方法,以处理你想要此事件处理器为之服务的事件类型。
  2. 通过调用反应器对象的register_handler(),将你的事件处理器登记到反应器。
  3. 在事件发生时,反应器将自动回调相应的事件处理器对象的适当的handle_"方法。

下面我就以一个Socket客户端的例子为例简单的说明反应器的基本用法。

#include <ace/OS.h>
#include <ace/Reactor.h>
#include <ace/SOCK_Connector.h> #include <string>
#include <iostream>
using namespace std; class MyClient:public ACE_Event_Handler
{
public:
bool open()
{
ACE_SOCK_Connector connector;
ACE_INET_Addr addr(,"127.0.0.1");
ACE_Time_Value timeout(,);
if(connector.connect(peer,addr,&timeout) != )
{
cout<<endl<<"connecetd fail";
return false;
}
ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
cout<<endl<<"connecetd ";
return true;
} ACE_HANDLE get_handle(void) const
{
return peer.get_handle();
} int handle_input (ACE_HANDLE fd)
{
int rev=;
ACE_Time_Value timeout(,);
if((rev=peer.recv(buffer,,&timeout))>)
{
buffer[rev]='\0';
cout<<endl<<"rev:\t"<<buffer<<endl;
}
return ;
} private:
ACE_SOCK_Stream peer;
char buffer[];
}; int main(int argc, char *argv[])
{
MyClient client;
client.open(); while(true)
{
ACE_Reactor::instance()->handle_events();
} return ;
}

在这个例子中,客户端连接上服务器后,通过ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注册了一个读就绪的回调函数,当服务器端给客户端发消息的时候,会自动触发handle_input()函数,将接收到的信息打印出来。

05-11 15:11