一.异步IO模型(asynchronous IO)

(1)什么是异步I/O

异步I/O(asynchronous I/O)由POSIX规范定义。演变成当前POSIX规范的各种早起标准所定义的实时函数中存在的差异已经取得一致。一般地说,这些函数的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。这种模型与前一节介绍的信号驱动模型的主要区别在于:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。

示意图如下:

windows 异步通知I/O模型与重叠I/O模型-LMLPHP

我们调用aio_read函数(POSIX异步I/O函数以aio_或lio_开头),给内核传递描述符、缓冲区指针、缓冲区大小(与read相同的三个参数)和文件偏移(与lseek类似),并告诉内核当整个操作完成时如何通知我们。该系统调用立即返回,并且在等待I/O完成期间,我们的进程不被阻塞。本例子中我们假设要求内核在操作完成时产生某个信号,该信号直到数据已复制到应用进程缓冲区才产生,这一点不同于信号驱动I/O模型。

windows 异步通知I/O模型与重叠I/O模型-LMLPHP

windows 异步通知I/O模型与重叠I/O模型-LMLPHP

(2)运用到的函数讲解--WSAEventSelect模型

  函数过程:

  1. 初始化网络环境,创建一个监听的socket,然后进行connect操作。接下来WSACreateEvent()创建一个网络事件对象,其声明如下:

    WSAEVENT WSACreateEvent(void); //返回一个手工重置的事件对象句柄
  2. 再调用WSAEventSelect,来将监听的socket与该事件进行一个关联,其声明如下:
    int WSAEventSelect(
    SOCKET s, //套接字
    WSAEVENT hEventObject, //网络事件对象
    long lNetworkEvents //需要关注的事件
    );

    我们客户端只关心FD_READ和FD_CLOSE操作,所以第三个参数传FD_READ | FD_CLOSE。

  3. 启动一个线程调用WSAWaitForMultipleEvents等待1中的event事件,其声明如下:
    windows 异步通知I/O模型与重叠I/O模型-LMLPHP
    windows 异步通知I/O模型与重叠I/O模型-LMLPHP
    DWORD WSAWaitForMultipleEvents(
    DWORD cEvents, //指定了事件对象数组里边的个数,最大值为64
    const WSAEVENT FAR *lphEvents, //事件对象数组
    BOOL fWaitAll, //等待类型,TRUE表示要数组里全部有信号才返回,FALSE表示至少有一个就返回,这里必须为FALSE
    DWORD dwTimeout, //等待的超时时间
    BOOL fAlertable //当系统的执行队列有I/O例程要执行时,是否返回,TRUE执行例程返回,FALSE不返回不执行,这里为FALSE
    );
    windows 异步通知I/O模型与重叠I/O模型-LMLPHP
    windows 异步通知I/O模型与重叠I/O模型-LMLPHP

    由于我们是客户端,所以只等待一个事件。

  4. 当事件发生,我们需要调用WSAEnumNetworkEvents,来检测指定的socket上的网络事件。其声明如下:
    windows 异步通知I/O模型与重叠I/O模型-LMLPHP
    int WSAEnumNetworkEvents
    (
    SOCKET s, //指定的socket
    WSAEVENT hEventObject, //事件对象
    LPWSANETWORKEVENTS lpNetworkEvents //WSANETWORKEVENTS<span style="font-family:Arial, Helvetica, sans-serif;">结构地址</span>
    );
    windows 异步通知I/O模型与重叠I/O模型-LMLPHP

    当我们调用这个函数成功后,它会将我们指定的socket和事件对象所关联的网络事件的信息保存到WSANETWORKEVENTS这个结构体里边去,我们来看下这个结构体的声明:

    typedef struct _WSANETWORKEVENTS {
    long lNetworkEvents;<span style="white-space:pre"> </span>//指定了哪个已经发生的网络事件
    int iErrorCodes[FD_MAX_EVENTS];<span style="white-space:pre"> </span>//错误码
    } WSANETWORKEVENTS, *LPWSANETWORKEVENTS;

    根据这个结构体我们就可以判断是否是我们所关注的网络事件已经发生了。如果是我们的读的网络事件发生了,那么我们就调用recv函数进行操作。若是关闭的事件发生了,就调用closesocket将socket关掉,在数组里将其置零等操作。

  整个模型的流程图如下:

windows 异步通知I/O模型与重叠I/O模型-LMLPHP

(3)实现服务端代码:

windows 异步通知I/O模型与重叠I/O模型-LMLPHP
#include <WinSock2.h>
#include <process.h>
#include <stdio.h>
#pragma comment(lib,"ws2_32.lib") SOCKET g_sClient[WSA_MAXIMUM_WAIT_EVENTS] = {INVALID_SOCKET}; //client socket数组
WSAEVENT g_event[WSA_MAXIMUM_WAIT_EVENTS]; //网络事件对象数组
SOCKET g_sServer = INVALID_SOCKET; //server socket
WSAEVENT g_hServerEvent; //server 网络事件对象
int iTotal = 0; //client个数
/*
@function OpenTCPServer 打开TCP服务器
@param _In_ unsigned short Port 服务器端口
@param _Out_ DWORD* dwError 错误代码
@return 成功返回TRUE 失败返回FALSE
*/
BOOL OpenTCPServer( _In_ unsigned short Port, _Out_ DWORD* dwError)
{
BOOL bRet = FALSE;
WSADATA wsaData = { 0 };
SOCKADDR_IN ServerAddr = { 0 };
ServerAddr.sin_family = AF_INET;
ServerAddr.sin_port = htons(Port);
ServerAddr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
do
{
if (!WSAStartup(MAKEWORD(2, 2), &wsaData))
{
if (LOBYTE(wsaData.wVersion) == 2 || HIBYTE(wsaData.wVersion) == 2)
{
g_sServer = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
g_hServerEvent = WSACreateEvent(); //创建网络事件对象
WSAEventSelect(g_sServer, g_hServerEvent, FD_ACCEPT);//为server socket注册网络事件
if (g_sServer != INVALID_SOCKET)
{
if (SOCKET_ERROR != bind(g_sServer, (SOCKADDR*)&ServerAddr, sizeof(ServerAddr)))
{
if (SOCKET_ERROR != listen(g_sServer, SOMAXCONN))
{
bRet = TRUE;
break;
}
*dwError = WSAGetLastError();
closesocket(g_sServer);
}
*dwError = WSAGetLastError();
closesocket(g_sServer);
}
*dwError = WSAGetLastError();
}
*dwError = WSAGetLastError(); }
*dwError = WSAGetLastError();
} while (FALSE);
return bRet;
} //接受client请求线程
unsigned int __stdcall ThreadAccept(void* lparam)
{
WSANETWORKEVENTS networkEvents; //网络事件结构
while (iTotal < WSA_MAXIMUM_WAIT_EVENTS) //这个值是64
{
if (0 == WSAEnumNetworkEvents(g_sServer, g_hServerEvent, &networkEvents))
{
if (networkEvents.lNetworkEvents & FD_ACCEPT) //如果等于FD_ACCEPT,相与就为1
{
if (0 == networkEvents.iErrorCode[FD_ACCEPT_BIT]) //检查有无网络错误
{
//接受请求
SOCKADDR_IN addrServer = { 0 };
int iaddrLen = sizeof(addrServer);
g_sClient[iTotal] = accept(g_sServer, (SOCKADDR*)&addrServer, &iaddrLen);
if (g_sClient[iTotal] == INVALID_SOCKET)
{
printf("accept failed with error code: %d\n", WSAGetLastError());
return 1;
}
//为新的client注册网络事件
g_event[iTotal] = WSACreateEvent();
WSAEventSelect(g_sClient[iTotal], g_event[iTotal], FD_READ | FD_WRITE | FD_CLOSE);
iTotal++;
printf("accept a connection from IP: %s,Port: %d\n", inet_ntoa(addrServer.sin_addr), htons(addrServer.sin_port));
}
else //错误处理
{
int iError = networkEvents.iErrorCode[FD_ACCEPT_BIT];
printf("WSAEnumNetworkEvents failed with error code: %d\n", iError);
return 1;
}
}
}
Sleep(100);
}
return 0;
} //接收数据
unsigned int __stdcall ThreadRecv(void* lparam)
{
char* buf = (char*)malloc(sizeof(char) * 128);
while (1)
{
if (iTotal == 0)
{
Sleep(100);
continue;
}
//等待网络事件
DWORD dwIndex = WSAWaitForMultipleEvents(iTotal, g_event, FALSE, 1000, FALSE);
//当前的事件对象
WSAEVENT curEvent = g_event[dwIndex];
//当前的套接字
SOCKET sCur = g_sClient[dwIndex];
//网络事件结构
WSANETWORKEVENTS networkEvents;
if (0 == WSAEnumNetworkEvents(sCur, curEvent, &networkEvents))
{
if (networkEvents.lNetworkEvents & FD_READ) //有数据可读
{
if (0 == networkEvents.iErrorCode[FD_READ_BIT])
{
memset(buf, 0, sizeof(buf));
int iRet = recv(sCur, buf, sizeof(buf), 0); //接收数据
if (iRet != SOCKET_ERROR)
{
if (strlen(buf) != 0)
printf("Recv: %s\n", buf);
}
}
else //错误处理
{
int iError = networkEvents.iErrorCode[FD_ACCEPT_BIT];
printf("WSAEnumNetworkEvents failed with error code: %d\n", iError);
break;
}
}
else if (networkEvents.lNetworkEvents & FD_CLOSE) //client关闭
printf("%d downline\n", sCur);
}
Sleep(100);
}
if (buf)
free(buf);
return 0;
} int main()
{
DWORD dwError = 0;
if (OpenTCPServer(18000, &dwError))
{
_beginthreadex(NULL, 0, ThreadAccept, NULL, 0, NULL);
_beginthreadex(NULL, 0, ThreadRecv, NULL, 0, NULL);
}
Sleep(100000000);
closesocket(g_sServer);
WSACleanup();
return 0;
}
windows 异步通知I/O模型与重叠I/O模型-LMLPHP

二.重叠IO模型

1-重叠模型的优点

1可以运行在支持Winsock2的所有Windows平台,而不像完成端口只支持NT系统

2比起阻塞,select,WSAAsyncSelect以及WSAEventSelect等模型,重叠I/O(Overlapped I/O)模型使应用程序能达到更加系统性能

因为他和其他4种模型不同的是,使用重叠模型的应用程序通知缓冲区收发系统直接使用数据,也就是说,如果应用程序

投递了一个10kb大小的缓冲区来接收数据,而数据已经到达套接字,则将该数据直接拷贝到投递的缓冲区,

而4种模型中,数据达到并拷贝到单套接字接收缓冲区,此时应用程序会被告知可以读入的容量,当应用程序调用

接收函数之后,数据才从单套接字缓冲区拷贝应用程序到缓冲区,差别就体现了。

2-重叠模型的基本原理

重叠模型是让应用程序使用重叠数据结构(WSAOVERLAPPED),一次投递一个或多个Winsock I/O请求,针对这些提交的

请求,在他们完成之后,应用程序会收到通知,于是就可通过自己的代码来处理这些数据了。

使用事件通知的方法来实现重叠IO模型,基于事件的话,就要求将Win事件与WSAOVERLAPPED结构关联在一起,

使用重叠结构,常用的send,sendto,recv,recvform也被WSASend,WSARecv等替换掉,

OVERLAPPER SOCKET(重叠Socket)上进行重叠发送的操作,(简单的理解就是异步send,recv)

他们的参数中都有一个Overlapped参数,就是说所有的重叠Socket都要绑定到这个重叠结构体上,

提交一个请求,其他的事情就交给重叠结构去操心, 而其中重叠结构要与Windows事件绑定在一起,

在样,我们调用完WSARecv后.等重叠操作完成,就会有对应的事件来同意我们操作完成,

3-重叠模型的函数详解

(1)创建套接字

要使用重叠I/O模型,在创建套接字时,必须使用WSASocket函数,设置重叠标志。

The WSASocket function creates a socket that is bound to a specific transport-service provider.

由于要用到重叠模型来提交我们的操作,所以原来的recv、send、sendto、recvfrom等函数都要被替换为WSARecv、WSASend、WSASendto、WSARecvFrom函数来代替。

(2)传输数据

在重叠I/O模型中,传输数据的函数是WSASend\WSARecv(TCP)和WSASendTo、WSARecvFrom等,下面是WSASend的定义:

The WSASend function sends data on a connected socket.

参数

s:标识一个已连接套接口的描述字。
lpBuffers:一个指向WSABUF结构数组指针。每个WSABUF结构包含缓冲区的指针和缓冲区的大小。
dwBufferCount:lpBuffers数组中WSABUF结构的数目。
lpNumberOfBytesSent:如果发送操作立即完成,则为一个指向所发送数据字节数的指针。
dwFlags:标志位。
lpOverlapped:指向WSAOVERLAPPED结构的指针(对于非重叠套接口则忽略)。
lpCompletionRoutine:一个指向发送操作完成后调用的完成例程的指针。(对于非重叠套接口则忽略)
 

返回值

若无错误发生且发送操作立即完成,则WSASend()函数返回0。这时,完成例程(Completion Routine)应该已经被调度,一旦调用线程处于alertable状态时就会调用它。否则,返回SOCKET_ERROR 。通过WSAGetLastError获得详细的错误代码。WSA_IO_PENDING 这个错误码(其实表示没有错误)表示重叠操作已经提交成功(就是异步IO的意思了),稍后会提示完成(这个完成可不一定是发送成功,没准出问题也不一定)。其他的错误代码都代表重叠操作没有正确开始,也不会有完成标志出现。

可以异步接收连接请求的函数是AcceptEX。这是一个Mincrosoft扩展函数,它接受一个新的连接,返回本地和远程地址,取得客户程序发送的第一块数据,函数定义如下:

The AcceptEx function accepts a new connection, returns the local and remote address, and receives the first block of data sent by the client application.

Note  This function is a Microsoft-specific extension to the Windows Sockets specification.

参数

返回值
AcceptEX函数将几个套接字函数的功能集合在一起。如果它投递的请求成功完成,则执行了如下3个操作:

(1)接受了新的连接

(2)新连接的本地地址和远程地址都会返回

(3)接收到了远程主机发来的第一块数据

AcceptEX和大家熟悉的accept函数有很大的不同就是AcceptEX函数需要调用者提供两个套接字,一个指定了在哪个套接字上监听,另一个指定了在哪个套接字上接受连接,也就是说,AcceptEX不会像accept函数一样为新的连接创建套接字。

如果提供了新的缓冲区,AcceptEX投递的重叠操作直到接受到连接并且读到数据之后才会返回。以SO_CONNECT_TIME为参数调用getsockopt函数可以检查到是否接受了连接,如果接受了连接,这个调用还可以取得连接已经建立了多长时间。

  AcceptEX函数是从Mswsock.lib库中导出的,为了能够直接调用它,而不用链接到Mswsock.lib库,需要使用WSAIoctl函数将AcceptEX函数加载到内存,WSAIoctl函数是ioctlsocket函数的扩展,它可以使用重叠I/O。函数的第3个到第6个参数是输入和输出缓冲区,在这里传递AcceptEX函数的指针

(4)接收传输结果

当重叠I/O请求最终完成以后,以之关联的事件对象受信,等待函数返回,应用程序可以使用WSAGetOverlappedResult函数取得重叠操作的结果,函数用法如下:

The WSAGetOverlappedResult function retrieves the results of an overlapped operation on the specified socket.

参数:
返回值:
4-重叠模型的实例代码:
windows 异步通知I/O模型与重叠I/O模型-LMLPHP
//完成例程实现重叠io模型伪代码
SOCKET acceptSock;
WSABUF dataBuf; void main()
{
WSAOVERLAPPED overlapped;
//1.初始化
//... //2.接收连接请求
acceptSock=accept(listenSock,NULL,NULL); //3.初始化重叠结构
UINT flag=0;
ZeroMemory(&overlapped,sizeof(WSAOVERLAPPED));
dataBuf.len=DATA_BUFSIZE;
dataBuf.buf=buf; if (WSARecv(acceptSock,&dataBuf,1,&recvBytes,&flag,&overlapped,workroutine)==SOCKET_ERROR)//最后一个参数时回调函数地址
{
if(WSAGetLastError()!=WSA_IO_PENDING)
{
printf("WSARecv() failed with error %d\n",WSAGetLastError());
return;
}
} //创建事件
eventArray[0]=WSACreateEvent();
while (true)
{
int index=WSAWaitForMultipleEvents(1,eventArray,FALSE,WSA_INFINITE,TRUE);//最后一个参数最好为true
if (index==WAIT_IO_COMPLETION)//io请求完成
{
break;
}
else//io请求出错
{
return;
}
}
//调用回调函开始进行处理
} void CALLBACK WorkRoutine(DWORD error,DWORD bytesTransferred,LPWSAOVERLAPPED overlapped,DWORD inflag)
{
DWORD sendBytes,recvBytes;
DWORD flags; if(error!=0||bytesTransferred==0)
{
closesocket(acceptSock);
return;
} flags=0; ZeroMemory(&overlapped,sizeof(WSAOVERLAPPED));
dataBuf.len=DATA_BUFSIZE;
dataBuf.data=buf; if (WSARecv(acceptSock,&dataBuf,1,&recvBytes,&flag,&overlapped,workroutine)==SOCKET_ERROR)//最后一个参数时回调函数地址
{
if(WSAGetLastError()!=WSA_IO_PENDING)
{
printf("WSARecv() failed with error %d\n",WSAGetLastError());
return;
}
}
}
windows 异步通知I/O模型与重叠I/O模型-LMLPHP
05-11 17:26