我正在移植一个构建在 ACE Proactor 框架之上的应用程序。该应用程序在 VxWorks 和 Windows 上都能完美运行,但在内核为 2.6.X.X 的 Linux(CentOS 5.5、WindRiver Linux 1.4 和 3.0)上使用 librt 时无法运行。
我已经将问题缩小到一个非常基本的问题:
应用程序在套接字上发起一个异步读取操作(通过 aio_read),随后在同一个套接字上发起一个异步写入操作(通过 aio_write)。由于协议(protocol)由应用程序端发起,对端在收到应用程序写入的数据之前不会发送任何内容,因此读取操作在写入完成之前无法完成。
- 当套接字处于阻塞模式时,写入操作永远不会被执行,协议(protocol)因此“挂起”。
- 使用 O_NONBLOCK 套接字时,写入成功,但读取操作每次都立即以“EWOULDBLOCK/EAGAIN”错误返回,并且永远无法恢复(即使重新发起 AIO 操作也是如此)。
我浏览了多个论坛,但找不到明确的答案:Linux AIO 是本应支持这种用法(是我做错了什么),还是根本做不到?如果我放弃 AIO,改用其他实现方式(通过 epoll/poll/select 等),是否可行?
附上一个示例代码,用于在非阻塞套接字上快速重现问题:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
// Number of bytes requested by every AIO operation (stored in aio_nbytes);
// main() allocates its I/O buffer with one extra byte beyond this size.
#define BUFSIZE (100)
// Global variables
// List of control blocks handed to aio_suspend(); main() fills in both slots.
struct aiocb *cblist[2];
// Socket used by every AIO request: assigned in main() and read by
// InitializeAiocbData().
int theSocket;
// Prepare a control block for a BUFSIZE-byte transfer on the global socket.
//   pAiocb  - control block to (re)initialise; all previous contents are wiped.
//   pBuffer - memory the transfer will read from or write into.
void InitializeAiocbData(struct aiocb* pAiocb, char* pBuffer)
{
    // Start from an all-zero structure so members not assigned below are 0.
    memset(pAiocb, 0, sizeof(struct aiocb));

    pAiocb->aio_buf    = pBuffer;
    pAiocb->aio_nbytes = BUFSIZE;
    pAiocb->aio_offset = 0;
    pAiocb->aio_fildes = theSocket;
}
// Queue an asynchronous read into pBuffer using the control block pAiocb.
// The control block is re-initialised first; a negative result from
// aio_read() trips the assertion.
void IssueReadOperation(struct aiocb* pAiocb, char* pBuffer)
{
    InitializeAiocbData(pAiocb, pBuffer);

    const int submitStatus = aio_read(pAiocb);
    assert(submitStatus >= 0);
}
// Queue an asynchronous write of pBuffer using the control block pAiocb.
// The control block is re-initialised first; a negative result from
// aio_write() trips the assertion.
void IssueWriteOperation(struct aiocb* pAiocb, char* pBuffer)
{
    InitializeAiocbData(pAiocb, pBuffer);

    const int submitStatus = aio_write(pAiocb);
    assert(submitStatus >= 0);
}
int main()
{
int ret;
int nPort = 11111;
char* szServer = "10.10.9.123";
// Connect to the remote server
theSocket = socket(AF_INET, SOCK_STREAM, 0);
assert (theSocket >= 0);
struct hostent *pServer;
struct sockaddr_in serv_addr;
pServer = gethostbyname(szServer);
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(nPort);
bcopy((char *)pServer->h_addr, (char *)&serv_addr.sin_addr.s_addr, pServer->h_length);
assert (connect(theSocket, (const sockaddr*)(&serv_addr), sizeof(serv_addr)) >= 0);
// Set the socket to be non-blocking
int oldFlags = fcntl(theSocket, F_GETFL) ;
int newFlags = oldFlags | O_NONBLOCK;
fcntl(theSocket, F_SETFL, newFlags);
printf("Socket flags: before=%o, after=%o\n", oldFlags, newFlags);
// Construct the AIO callbacks array
struct aiocb my_aiocb1, my_aiocb2;
char* pBuffer = new char[BUFSIZE+1];
bzero( (char *)cblist, sizeof(cblist) );
cblist[0] = &my_aiocb1;
cblist[1] = &my_aiocb2;
// Start the read and write operations on the same socket
IssueReadOperation(&my_aiocb1, pBuffer);
IssueWriteOperation(&my_aiocb2, pBuffer);
// Wait for I/O completion on both operations
int nRound = 1;
printf("\naio_suspend round #%d:\n", nRound++);
ret = aio_suspend( cblist, 2, NULL );
assert (ret == 0);
// Check the error status for the read and write operations
ret = aio_error(&my_aiocb1);
assert (ret == EWOULDBLOCK);
// Get the return code for the read
{
ssize_t retcode = aio_return(&my_aiocb1);
printf("First read operation results: aio_error=%d, aio_return=%d - That's the first EWOULDBLOCK\n", ret, retcode);
}
ret = aio_error(&my_aiocb2);
assert (ret == EINPROGRESS);
printf("Write operation is still \"in progress\"\n");
// Re-issue the read operation
IssueReadOperation(&my_aiocb1, pBuffer);
// Wait for I/O completion on both operations
printf("\naio_suspend round #%d:\n", nRound++);
ret = aio_suspend( cblist, 2, NULL );
assert (ret == 0);
// Check the error status for the read and write operations for the second time
ret = aio_error(&my_aiocb1);
assert (ret == EINPROGRESS);
printf("Second read operation request is suddenly marked as \"in progress\"\n");
ret = aio_error(&my_aiocb2);
assert (ret == 0);
// Get the return code for the write
{
ssize_t retcode = aio_return(&my_aiocb2);
printf("Write operation has completed with results: aio_error=%d, aio_return=%d\n", ret, retcode);
}
// Now try waiting for the read operation to complete - it'll just busy-wait, receiving "EWOULDBLOCK" indefinitely
do
{
printf("\naio_suspend round #%d:\n", nRound++);
ret = aio_suspend( cblist, 1, NULL );
assert (ret == 0);
// Check the error of the read operation and re-issue if needed
ret = aio_error(&my_aiocb1);
if (ret == EWOULDBLOCK)
{
IssueReadOperation(&my_aiocb1, pBuffer);
printf("EWOULDBLOCK again on the read operation!\n");
}
}
while (ret == EWOULDBLOCK);
}
提前致谢,
约塔姆。
最佳答案
首先,O_NONBLOCK 和 AIO 不能混用。当相应的 read 或 write 不会阻塞时,AIO 就会报告异步操作已完成 - 而设置了 O_NONBLOCK 之后,它们永远不会阻塞,因此 aio 请求总是立即完成(此时 aio_error() 返回 EWOULDBLOCK,aio_return() 返回 -1)。
其次,不要让两个同时未完成的 aio 请求使用同一个缓冲区。从发出 aio 请求到 aio_error() 告诉您该请求已完成的这段时间内,应当完全不要碰这个缓冲区。
第三,针对同一文件描述符的 AIO 请求会排队依次执行,以便得到合理的结果。这意味着在读取完成之前,您的写入不会发生 - 如果需要先写入数据,就要按相反的顺序发出 AIO 请求。下面的代码无需设置 O_NONBLOCK 即可正常工作:
// Replacement for the AIO part of main(): the write is queued before the
// read, each request has its own buffer, and O_NONBLOCK is not set.
// Relies on `ret` and the helper functions from the surrounding program.
struct aiocb my_aiocb1, my_aiocb2;
char pBuffer1[BUFSIZE+1] = "";
char pBuffer2[BUFSIZE+1] = "Some test message";
// Entries are set to NULL once their request has finished; aio_suspend()
// ignores NULL entries.
const struct aiocb *cblist[2] = { &my_aiocb1, &my_aiocb2 };
// Start the write and read operations on the same socket (write first, so
// that it is not queued behind a read that cannot complete yet)
IssueWriteOperation(&my_aiocb2, pBuffer2);
IssueReadOperation(&my_aiocb1, pBuffer1);
// Wait for I/O completion on both operations
int nRound = 1;
int aio_status1, aio_status2;
do {
    printf("\naio_suspend round #%d:\n", nRound++);
    ret = aio_suspend( cblist, 2, NULL );
    assert (ret == 0);
    // Check the error status for the read and write operations
    aio_status1 = aio_error(&my_aiocb1);
    if (aio_status1 == EINPROGRESS) {
        puts("aio1 still in progress.");
    } else {
        puts("aio1 completed.");
        // Drop the finished request from the wait list; otherwise
        // aio_suspend() returns immediately and this loop busy-spins
        // until the other request completes.
        cblist[0] = NULL;
    }
    aio_status2 = aio_error(&my_aiocb2);
    if (aio_status2 == EINPROGRESS) {
        puts("aio2 still in progress.");
    } else {
        puts("aio2 completed.");
        cblist[1] = NULL;
    }
} while (aio_status1 == EINPROGRESS || aio_status2 == EINPROGRESS);
// Get the return code of each operation. aio_return() may be called only
// once per completed request, so each control block is queried exactly once.
ssize_t retcode;
retcode = aio_return(&my_aiocb1);
printf("First operation results: aio_error=%d, aio_return=%zd\n", aio_status1, retcode);
retcode = aio_return(&my_aiocb2);
printf("Second operation results: aio_error=%d, aio_return=%zd\n", aio_status2, retcode);
或者,如果您不关心读取和写入之间的相对顺序,可以使用 dup() 为该套接字创建两个文件描述符,一个用于读取,另一个用于写入 - 二者的 AIO 操作会各自独立排队。
关于 linux - Linux 中的并发套接字读/写(“full-duplex”,特别是 aio),我们在 Stack Overflow 上找到一个类似的问题:https://stackoverflow.com/questions/4616708/