TCP/IP IOCP received data sometimes corrupt - Visual C++ on Windows

Question

I am writing a simple test IOCP client and server to ensure I am using the API correctly and that the data the client sends is received correctly by the server. I have included all the code for this question.

This is where I ran into some problems: the data within the receive buffers sometimes seems to be corrupted (corrupted in that chunks of data within a buffer can be out of order or missing). To be clear, this is data within individual receive buffers; I don't mean out of order between multiple buffers because of thread scheduling issues. I previously posted a question related to this here. However, I have done more work on getting a proper code example, so I am posting a new question and will link to this. I am hoping others are able to run this code and experience the same weird behaviour.

Test Code

The test app can run in two modes, client and server. Run the server and it starts listening; run the client and it connects to the server and, as soon as it has connected, starts throwing data at the server as fast as it will allow. The server then verifies the data within each buffer that is returned from GetQueuedCompletionStatus after calls to WSARecv. Each time a WSASend completes, I zero out the OVERLAPPED section of the structure and call WSASend again with the original buffer of data.

Each buffer of data the client sends is a sequence of bytes that increment one after the other up to a specified maximum. I don't send the full range 0..255, in case that size fits neatly in multiples into packets and somehow hides the issue, so in my example code the bytes range from 0..250. For each send buffer that is constructed, I repeat that pattern numberOfGroups times.
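The buffer layout described here can be sketched in portable C++. This is only a minimal illustration, not the code from the test program: `buildPattern` and its parameters are hypothetical names, and the values 250 and 4096 used in the usage note correspond to `maxByteExpected` and `numberOfGroups` in the full source.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not part of the original program): builds one send
// buffer holding the ramp 0..maxByte repeated `groups` times.
std::vector<std::uint8_t> buildPattern(std::uint8_t maxByte, std::size_t groups)
{
    assert(groups > 0);
    const std::size_t groupSize = static_cast<std::size_t>(maxByte) + 1;
    std::vector<std::uint8_t> buffer(groupSize * groups);
    for (std::size_t i = 0; i < buffer.size(); ++i)
    {
        // The position inside the current group is the byte value itself.
        buffer[i] = static_cast<std::uint8_t>(i % groupSize);
    }
    return buffer;
}
```

Calling `buildPattern(250, 4096)` would give a buffer of 251 * 4096 bytes. Indexing with `std::size_t` instead of a `byte` loop counter means the helper would also terminate if the maximum were raised to 255, where a `byte k <= 255` loop never ends.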

This format should mean that I can have multiple WSARecv calls outstanding and then verify the data within each returned buffer completely independently of any other buffer, meaning no synchronization or reconstruction of the order should be required. That is, I can start at the first byte and verify that the bytes increment one after another up to the maximum and then reset to 0. Once I have this test working with no issues, I can move on to something more sophisticated: ordering the received buffers and verifying more complex data.
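Because every buffer is checked in isolation, the check only needs the first byte as its starting point. A minimal sketch of that idea, assuming a hypothetical free function `firstBreak` (the original program does the equivalent inside `IOCPConnection::onRecv`):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: returns the index of the first byte that breaks the
// 0..maxByte ramp, or `length` if the whole buffer is consistent.
std::size_t firstBreak(const std::uint8_t *data, std::size_t length, std::uint8_t maxByte)
{
    assert(data != nullptr || length == 0);
    if (length == 0)
    {
        return 0; // nothing to check; 0 == length, so this counts as valid
    }
    if (data[0] > maxByte)
    {
        return 0; // the very first byte is already outside the pattern
    }
    std::uint8_t expected = data[0];
    for (std::size_t i = 1; i < length; ++i)
    {
        // Advance the ramp, wrapping back to 0 after maxByte.
        expected = (expected == maxByte) ? std::uint8_t(0)
                                         : static_cast<std::uint8_t>(expected + 1);
        if (data[i] != expected)
        {
            return i;
        }
    }
    return length;
}
```

Returning the offset of the break, rather than only a pass/fail flag, makes it possible to log where inside a buffer the sequence jumped.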

You can specify on the command line how many simultaneous outstanding WSASend and WSARecv calls there can be. This problem seems to happen far more often when there are 2 or more outstanding WSARecv calls. With 1, it can run for quite some time before it occasionally detects a problem.

I've been testing on Windows 7 using Visual Studio 2010 C++.

The number of simultaneous calls in both client and server seems to have an effect. Using 2 for both seems to produce corrupt data more often than some other combinations.

Sockets and IOCP seem to require quite a lot of boilerplate code just to get a very basic client and server app up and running. The actual code that receives buffers is only a few lines and involves calling WSARecv and handling the completed calls from GetQueuedCompletionStatus.

This code calls WSARecv:

void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
    DWORD numberOfBytesTransferred = 0;
    DWORD flags = 0;
    if (overlapped == nullptr)
    {
        overlapped = new TestOverlapped(receiveBufferSize);
        overlapped->connection = this;
    }
    else
    {
        overlapped->reset();
    }
    overlapped->operation = soRecv;
    auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}

When the WSARecv calls complete, they are handled by worker threads. I've removed lines not related to receiving data from this snippet:

void IOCPWorker::execute()
{
    bool quit = false;
    DWORD numberOfBytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    PTestOverlapped overlapped = nullptr;
    while (!quit)
    {
        auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
        if (queueResult)
        {
            switch (overlapped->operation)
            {
                case soRecv:
                {
                    IOCPConnection *connection = overlapped->connection;
                    connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data

                    connection->postRecv(overlapped);
                    overlapped = nullptr;
                    break;
                }
                default:;
            }
        }
    }
}

The call to connection->onRecv is where I validate the data. Does anything look obviously wrong here?

I've included the complete code for reference; it should compile if you're feeling adventurous.

Full Source for Reference

Server example listening on port 3000 with at most 2 outstanding WSARecv calls:

> IOCPTest.exe server 3000 2

Client example connecting to 127.0.0.1 on port 3000 with at most 2 outstanding WSASend calls:

> IOCPTest.exe client 127.0.0.1 3000 2
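The two command lines above could be parsed along the following lines. This is only a sketch: `Options`, `parseInt` and `parseOptions` are hypothetical names, and the upper bound of 1024 pending calls is an arbitrary assumption. The original program calls `atoi` on the arguments without validating them.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>
#include <vector>

// Hypothetical result type; the original stores these values on the manager.
struct Options
{
    bool valid = false;
    bool server = false;
    std::string host;
    int port = 0;
    int pending = 0; // outstanding WSARecv (server) or WSASend (client) calls
};

// Parses a decimal integer in [minValue, maxValue]; returns false on junk.
static bool parseInt(const std::string &text, int minValue, int maxValue, int &out)
{
    assert(minValue <= maxValue);
    if (text.empty())
    {
        return false;
    }
    char *end = nullptr;
    const long value = std::strtol(text.c_str(), &end, 10);
    if (*end != '\0' || value < minValue || value > maxValue)
    {
        return false;
    }
    out = static_cast<int>(value);
    return true;
}

// `args` holds the arguments after the program name.
Options parseOptions(const std::vector<std::string> &args)
{
    Options options;
    if (args.size() == 3 && args[0] == "server")
    {
        options.server = true;
        options.valid = parseInt(args[1], 1, 65535, options.port)
                     && parseInt(args[2], 1, 1024, options.pending);
    }
    else if (args.size() == 4 && args[0] == "client")
    {
        options.host = args[1];
        options.valid = parseInt(args[2], 1, 65535, options.port)
                     && parseInt(args[3], 1, 1024, options.pending);
    }
    return options;
}
```

In `main`, the arguments could be passed as `parseOptions(std::vector<std::string>(argv + 1, argv + argc))`, printing the usage text whenever `valid` is false.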

The program consists of a small number of classes:

IOCPConnectionManager

This class handles listening for connections and also starts up the worker threads.

IOCPConnection

Just keeps track of the SOCKET and provides a few methods for handling asynchronous calls. IOCPConnection::onRecv is called when a WSARecv completes and verifies the data within the buffer. It just prints a message and returns if the data is found to be out of sequence.

IOCPWorker

Worker thread. IOCPWorker::execute() is where GetQueuedCompletionStatus is called.

TestOverlapped

The required OVERLAPPED structure.
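Casting the pointer returned by GetQueuedCompletionStatus back to `PTestOverlapped` works because the OVERLAPPED member comes first in the structure. The idea can be illustrated without Winsock. In this sketch `FakeOverlapped`, `OperationContext` and both helper functions are hypothetical stand-ins, not Windows types:

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>

// Stand-in for the Win32 OVERLAPPED structure so the sketch compiles anywhere.
struct FakeOverlapped
{
    void *internal;
    void *internalHigh;
    void *event;
};

enum Operation { opUnknown, opSend, opRecv };

// Hypothetical per-operation context, shaped like TestOverlapped.
struct OperationContext
{
    FakeOverlapped overlapped; // must stay the first member
    Operation operation;
    int connectionId;
};

// The cast below is only well defined under these two conditions.
static_assert(std::is_standard_layout<OperationContext>::value,
              "context must be standard layout");
static_assert(offsetof(OperationContext, overlapped) == 0,
              "overlapped must be the first member");

// What the API is given when the operation is posted.
FakeOverlapped *toApiPointer(OperationContext *context)
{
    assert(context != nullptr);
    return &context->overlapped;
}

// What a worker does with the pointer handed back on completion.
OperationContext *fromApiPointer(FakeOverlapped *overlapped)
{
    return reinterpret_cast<OperationContext *>(overlapped);
}
```

The `static_assert` lines turn an accidental reordering of the members into a compile error instead of a silent misread of the context at run time.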

You'll also need to add Ws2_32.lib and Mswsock.lib to the linker inputs.

Main cpp file

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  Some allocated objects are not freed at the end, this is a test only *
*                                                                       *
************************************************************************/

#include "stdafx.h"
#include <iostream>
#include <string>
#include "IOCPTest.h"
#include <Windows.h>

void printUse()
{
    std::cout << "Invalid arguments" << std::endl;
    std::cout << "This test app has very limited error handling or memory management" << std::endl;
    std::cout << "Run as client or server (run the server first) e.g." << std::endl << std::endl;
    std::cout << "To run as server listening on port 3000 with 2 pending receives:" << std::endl;
    std::cout << "> IOCPTester.exe server 3000 2" << std::endl << std::endl;
    std::cout << "To run as client connected to 127.0.0.1 on port 3000 with 2 pending sends:" << std::endl;
    std::cout << "> IOCPTester.exe client 127.0.0.1 3000 2" << std::endl << std::endl;
    std::cout << "Hit enter to exit" << std::endl;
    std::cin.ignore();
}

int main(int argc, char *argv[])
{
    if (argc < 4)
    {
        printUse();
        return 0;
    }
    std::string mode(argv[1]);
    if ((mode.compare("client") != 0) && (mode.compare("server") != 0))
    {
        printUse();
        return 0;
    }

    IOCPTest::IOCPConnectionManager *manager = new IOCPTest::IOCPConnectionManager();

    bool server = mode.compare("server") == 0;
    if (server)
    {
        std::string listenPort(argv[2]);
        std::string postedReceiveCount(argv[3]);

        manager->listenPort = atoi(listenPort.c_str());
        manager->postedReceiveCount = atoi(postedReceiveCount.c_str());
        manager->postedSendCount = 1; // Not really used in this mode
        manager->startListening();
    }
    else
    {
        if (argc < 5)
        {
            printUse();
            return 0;
        }

        std::string host(argv[2]);
        std::string port(argv[3]);
        std::string postedSendCount(argv[4]);

        manager->postedReceiveCount = 1; // Not really used in this mode
        manager->postedSendCount = atoi(postedSendCount.c_str());

        IOCPTest::IOCPConnection *connection = manager->createConnection();

        connection->host = host;
        connection->port = atoi(port.c_str());
        connection->connect();
    }
    std::cout << "Hit enter to exit" << std::endl;
    std::cin.ignore();
}

IOCPTest.h

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  std::cout might not be the best approach in a multithreaded          *
*  environment but this is just a simple test app.                      *
*  Some allocated objects are not cleaned up at the end either, but     *
*  again this is just a test.                                           *
*                                                                       *
************************************************************************/

#pragma once // The original #ifndef/#define/#endif closed immediately and guarded nothing

#include <WinSock2.h> // Include before as otherwise Windows.h includes and causes issues
#include <Windows.h>
#include <string>

namespace IOCPTest
{

class IOCPConnection;

enum IOCPSocketOperation
{
    soUnknown,
    soAccept,
    soConnect,
    soDisconnect,
    soSend,
    soRecv,
    soQuit
};

struct TestOverlapped
{
    OVERLAPPED overlapped;
    WSABUF buffer;
    IOCPSocketOperation operation;
    IOCPConnection *connection;
    bool resend; // Set this to keep sending the same data over and over

    TestOverlapped(int bufferSize);
    ~TestOverlapped();
    void reset();
};

typedef TestOverlapped *PTestOverlapped;

class IOCPConnectionManager
{
public:
    static const int NUMACCEPTS = 5;

    WSADATA wsaData;
    HANDLE iocp;
    SOCKET listenSocket;
    USHORT listenPort;
    int postedReceiveCount;
    int postedSendCount;

    void startListening();
    void postAcceptEx();

    IOCPConnection *createConnection();

    IOCPConnectionManager();
};

class IOCPConnection
{
public:
    SOCKET socket;
    IOCPConnectionManager *manager;
    std::string host;
    USHORT port;

    void onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
    void postRecv(PTestOverlapped overlapped = nullptr);
    void onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
    void onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
    void send(PTestOverlapped overlapped);
    void onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);

    void connect();
};

class IOCPWorker
{
public:
    HANDLE threadHandle;
    DWORD threadId;
    IOCPConnectionManager *manager;

    IOCPWorker(bool suspended);

    void start();
    void execute();
};

}

IOCPTest.cpp

#include "stdafx.h"
#include "IOCPTest.h"
#include <iostream>
#include <Mswsock.h>
#include <WS2tcpip.h>
#include <sstream>

namespace IOCPTest
{

LPFN_ACCEPTEX fnAcceptEx = nullptr;
LPFN_CONNECTEX fnConnectEx = nullptr;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;
const byte maxByteExpected = 250;
const int numberOfGroups = 4096;
const int receiveBufferSize = 0x100000;

BOOL AcceptEx
(
    SOCKET sListenSocket,
    SOCKET sAcceptSocket,
    PVOID lpOutputBuffer,
    DWORD dwReceiveDataLength,
    DWORD dwLocalAddressLength,
    DWORD dwRemoteAddressLength,
    LPDWORD lpdwBytesReceived,
    LPOVERLAPPED lpOverlapped
)
{
    if (fnAcceptEx == nullptr)
    {
        DWORD dwBytes;
        int result = WSAIoctl(sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof (GuidAcceptEx), &fnAcceptEx, sizeof(fnAcceptEx), &dwBytes, NULL, NULL);
        if (result != 0)
        {
            std::cerr << "Error calling WSAIoctl for AcceptEx" << std::endl;
            return false;
        }
    }
    return fnAcceptEx(sListenSocket, sAcceptSocket, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped);
}

BOOL ConnectEx(
    SOCKET s,
    const struct sockaddr FAR *name,
    int namelen,
    PVOID lpSendBuffer,
    DWORD dwSendDataLength,
    LPDWORD lpdwBytesSent,
    LPOVERLAPPED lpOverlapped
)
{
    if (fnConnectEx == nullptr)
    {
        DWORD dwBytes;
        int result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof (GuidConnectEx), &fnConnectEx, sizeof(fnConnectEx), &dwBytes, NULL, NULL);
        if (result != 0)
        {
            std::cerr << "Error calling WSAIoctl for ConnectEx" << std::endl;
            return false;
        }
    }
    return fnConnectEx(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped);
}

// TestOverlapped

TestOverlapped::TestOverlapped(int bufferSize):
    overlapped(),
    operation(soUnknown),
    connection(nullptr),
    buffer(),
    resend(false)
{
    if (bufferSize > 0)
    {
        buffer.len = bufferSize;
        buffer.buf = (CHAR*) malloc(bufferSize);
    }
}

TestOverlapped::~TestOverlapped()
{
    if (buffer.buf != nullptr)
    {
        free(buffer.buf);
    }
}

void TestOverlapped::reset()
{
    overlapped = OVERLAPPED();
}

// IOCPConnectionManager

IOCPConnectionManager::IOCPConnectionManager():
    wsaData(),
    listenSocket(0),
    listenPort(0),
    postedReceiveCount(1)
{
    WSAStartup(WINSOCK_VERSION, &wsaData);
    iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

    SYSTEM_INFO systemInfo = SYSTEM_INFO();
    GetSystemInfo(&systemInfo);

    for (decltype(systemInfo.dwNumberOfProcessors) i = 0; i < systemInfo.dwNumberOfProcessors; i++)
    {
        IOCPWorker* worker = new IOCPWorker(true);
        worker->manager = this;
        worker->start();
    }
}

void IOCPConnectionManager::startListening()
{
    listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    CreateIoCompletionPort((HANDLE)listenSocket, iocp, ULONG_PTR(this), 0);

    sockaddr_in localAddress = sockaddr_in();
    localAddress.sin_family = AF_INET;
    localAddress.sin_addr.s_addr = INADDR_ANY; // Listen on all addresses
    localAddress.sin_port = htons(listenPort);

    if (bind(listenSocket, (SOCKADDR*) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
    {
        std::cerr << "Error in binding listening socket" << std::endl;
    }
    if (listen(listenSocket, SOMAXCONN) == 0)
    {
        std::cout << "Listening on port " << listenPort << std::endl;
    }
    for (int i = 0; i < NUMACCEPTS; i++)
    {
        postAcceptEx();
    }
}

void IOCPConnectionManager::postAcceptEx()
{
    SOCKET acceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    IOCPConnection *connection = new IOCPConnection();
    connection->manager = this;
    connection->socket = acceptSocket;

    CreateIoCompletionPort((HANDLE) acceptSocket, iocp, ULONG_PTR(connection), 0); // The thread count is ignored in this call when just associating the socket

    PTestOverlapped overlapped = new TestOverlapped(2 * (sizeof(sockaddr_in) + 16)); // As specified in documentation
    overlapped->operation = soAccept;
    overlapped->connection = connection;
    DWORD byesReceived = 0;
    int result = IOCPTest::AcceptEx
    (
        listenSocket,
        acceptSocket,
        overlapped->buffer.buf,
        0, // Size of initial receiving buffer, excluding the space at the end for the two addresses
        sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
        sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
        &byesReceived,
        (LPOVERLAPPED) overlapped
    );
    if (!result)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            std::cerr << "Error calling AcceptEx. Returned errorCode = " << errorCode << std::endl;
        }
    }
}

IOCPConnection *IOCPConnectionManager::createConnection()
{
    IOCPConnection *connection = new IOCPConnection();
    connection->manager = this;

    return connection;
}

// IOCPConnection

void IOCPConnection::onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    manager->postAcceptEx(); // Replace this accept
    auto returnCode = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char *)&manager->listenSocket, sizeof(manager->listenSocket));
    if (returnCode == SOCKET_ERROR)
    {
        std::cerr << "SetSockOpt in OnAcceptEx returned SOCKET_ERROR" << std::endl;
    }
    std::cout << "Connection Accepted" << std::endl;
    for (int i = 0; i < manager->postedReceiveCount; ++i)
    {
        postRecv();
    }
}

void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
    DWORD numberOfBytesTransferred = 0;
    DWORD flags = 0;
    if (overlapped == nullptr)
    {
        overlapped = new TestOverlapped(receiveBufferSize);
        overlapped->connection = this;
    }
    else
    {
        overlapped->reset();
    }
    overlapped->operation = soRecv;
    auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
}

void IOCPConnection::onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    if (numberOfBytesTransferred > 0)
    {
        byte *data = (byte *)overlapped->buffer.buf;
        if (data[0] > maxByteExpected)
        {
            // Cast to int so the values are printed as numbers rather than as raw characters
            std::cerr << "Byte greater than max expected found. Max Expected: " << (int)maxByteExpected << "; Found: " << (int)data[0] << std::endl;
            return;
        }
        byte next = (data[0] == maxByteExpected)?0:data[0] + 1;
        for (decltype(numberOfBytesTransferred) i = 1; i < numberOfBytesTransferred; ++i)
        {
            if (data[i] != next)
            {
                // Not really the best solution for writing data out from multiple threads. Test app only.
                std::cerr << "Invalid data. Expected: " << (int)next << "; Got: " << (int)data[i] << std::endl;
                return;
            }
            else if (next == maxByteExpected)
            {
                next = 0;
            }
            else
            {
                ++next;
            }
        }
        //std::cout << "Valid buffer processed" << std::endl;
    }
}

void IOCPConnection::onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    for (int i = 0; i < manager->postedSendCount; ++i)
    {
        // Construct a sequence of incremented byte values 0..maxByteExpected repeated numberOfGroups
        PTestOverlapped sendOverlapped = new TestOverlapped((maxByteExpected + 1) * numberOfGroups);
        sendOverlapped->connection = this;

        for (int j = 0; j < numberOfGroups; ++j)
        {
            for (byte k = 0; k <= maxByteExpected; ++k)
            {
                ((byte *)sendOverlapped->buffer.buf)[(j * (maxByteExpected + 1)) + (int)k] = k;
            }
        }
        sendOverlapped->resend = true; // Repeat sending this data
        send(sendOverlapped);
    }
}

void IOCPConnection::send(PTestOverlapped overlapped)
{
    overlapped->reset();
    overlapped->operation = soSend;

    DWORD bytesSent = 0;
    DWORD flags = 0;

    if (WSASend(socket, &overlapped->buffer, 1, &bytesSent, flags, (LPWSAOVERLAPPED) overlapped, nullptr) == SOCKET_ERROR)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            std::cerr << "Error calling WSASend. Returned errorCode = " << errorCode << std::endl;
        }
    }
}

void IOCPConnection::onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
}

void IOCPConnection::connect()
{
    socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket == INVALID_SOCKET)
    {
        std::cerr << "Error calling socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) in IOCPConnection::connect()" << std::endl;
        return;
    }
    CreateIoCompletionPort((HANDLE)socket, manager->iocp, ULONG_PTR(this), 0); // The thread count is ignored in this call when just associating the socket

    sockaddr_in localAddress = sockaddr_in();
    localAddress.sin_family = AF_INET;
    localAddress.sin_addr.s_addr = INADDR_ANY;
    localAddress.sin_port = 0;

    if (bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
    {
        std::cerr << "Error calling bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress) in IOCPConnection::connect()" << std::endl;
        return;
    }

    addrinfo hints = addrinfo();
    addrinfo *remoteAddress = nullptr;

    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    std::stringstream ss;
    ss << port;
    //std::cout << ss.str() << std::endl;
    if (getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) != 0)
    {
        std::cerr << "Error calling getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) in IOCPConnection::connect()" << std::endl;
        return;
    }

    TestOverlapped *overlapped = new TestOverlapped(0);
    overlapped->connection = this;
    overlapped->operation = soConnect;

    BOOL result = IOCPTest::ConnectEx
    (
        socket,
        remoteAddress->ai_addr,
        remoteAddress->ai_addrlen,
        nullptr,
        0,
        nullptr,
        LPOVERLAPPED(overlapped)
    );
    if (result == FALSE)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            //std::cerr << "Error calling ConnectEx. You'll need to add some more code if you want to know why :)" << std::endl;
            std::cerr << "Error calling ConnectEx. Returned errorCode = " << errorCode << std::endl;
        }
    }

    freeaddrinfo(remoteAddress);
}

// IOCPWorker

DWORD WINAPI IOCPWorkerThreadProc(LPVOID lpParam)
{
    ((IOCPWorker*)lpParam)->execute();
    return 0;
}

IOCPWorker::IOCPWorker(bool suspended)
{
    threadHandle = CreateThread(NULL, 0, IOCPWorkerThreadProc, this, (suspended)?CREATE_SUSPENDED:0, &threadId);
}

void IOCPWorker::start()
{
    ResumeThread(threadHandle);
}

void IOCPWorker::execute()
{
    //std::cout << "TMVIOCPWorker::execute()" << std::endl;
    bool quit = false;
    DWORD numberOfBytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    PTestOverlapped overlapped = nullptr;
    while (!quit)
    {
        auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
        if (queueResult)
        {
            switch (overlapped->operation)
            {
                case soAccept:
                {
                    IOCPConnection *connection = overlapped->connection;
                    connection->onAcceptEx(overlapped, numberOfBytesTransferred);

                    delete overlapped;
                    overlapped = nullptr;
                    break;
                }
                case soConnect:
                {
                    std::cout << "ConnectEx returned" << std::endl;
                    IOCPConnection *connection = overlapped->connection;
                    connection->onConnect(overlapped, numberOfBytesTransferred); // This method builds the send buffers and starts sending
                    delete overlapped;
                    overlapped = nullptr;
                    break;
                }
                case soRecv:
                {
                    //std::cout << "Received Data: " << numberOfBytesTransferred << std::endl;
                    IOCPConnection *connection = overlapped->connection;
                    connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data

                    overlapped->reset();
                    connection->postRecv(overlapped);
                    overlapped = nullptr;
                    break;
                }
                case soSend:
                {
                    IOCPConnection *connection = overlapped->connection;
                    connection->onSent(overlapped, numberOfBytesTransferred);

                    // Send the same data over and over
                    if (overlapped->resend)
                    {
                        std::cout << "Resending buffer" << std::endl;
                        connection->send(overlapped);
                    }
                    else
                    {
                        delete overlapped;
                    }
                    overlapped = nullptr;
                    break;
                }
                default:;
            }
        }
    }
}

}

Most buffers received are correct; however, I still get a lot of output like this scrolling past when running with 2 receive and 2 send buffers for the socket:

Invalid data. Expected: 169; Got: 123
Invalid data. Expected: 114; Got: 89
Invalid data. Expected: 89; Got: 156
Invalid data. Expected: 206; Got: 227
Invalid data. Expected: 125; Got: 54
Invalid data. Expected: 25; Got: 0
Invalid data. Expected: 58; Got: 146
Invalid data. Expected: 33; Got: 167
Invalid data. Expected: 212; Got: 233
Invalid data. Expected: 111; Got: 86
Invalid data. Expected: 86; Got: 153
Invalid data. Expected: 190; Got: 165
Invalid data. Expected: 175; Got: 150
Invalid data. Expected: 150; Got: 217
Invalid data. Expected: 91; Got: 112
Invalid data. Expected: 95; Got: 162
Invalid data. Expected: 207; Got: 182
Invalid data. Expected: 222; Got: 243
Invalid data. Expected: 126; Got: 101
Invalid data. Expected: 157; Got: 132
Invalid data. Expected: 160; Got: 89
Invalid data. Expected: 205; Got: 180
Invalid data. Expected: 113; Got: 134
Invalid data. Expected: 45; Got: 20
Invalid data. Expected: 113; Got: 201
Invalid data. Expected: 64; Got: 198
Invalid data. Expected: 115; Got: 182
Invalid data. Expected: 140; Got: 115

I hope it is just something simple I am doing wrong. I've run the same verification over the data buffer before sending as I do when receiving, to make sure I hadn't done something silly there, but it passes that check. I wrote a server in another language, not using IOCP, and that seems to receive the data correctly. I also wrote a client in another language, and the IOCP server seems to detect corruption in that case too. That said, there might be issues with both the client and the server. I appreciate any time that anyone is willing to spend on this.

Recommended Answer

Okay, I may have found your problem. If you take a look at the data you receive, all the bytes are in order, but the sequence suddenly jumps, as if it was interrupted by another call. Now, from the MSDN documentation on WSASend and WSARecv:

If you are using I/O completion ports, be aware that the order of calls made to WSASend is also the order in which the buffers are populated. WSASend should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.

If you are using I/O completion ports, be aware that the order of calls made to WSARecv is also the order in which the buffers are populated. WSARecv should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.

That's it. I don't really know a good way to do what you want, but what you are doing is probably not the way it is meant to be used.

Did you try this over a real network? The loopback interface is a special circuit and may behave differently, but it's still undefined behavior, so you shouldn't rely on this.
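One way to live within the constraint in the MSDN remark quoted above is to serialize the posting of receives per connection and to stamp each buffer with a sequence number at post time, so that completions handled by different worker threads can be put back in order. The following is a portable sketch of that bookkeeping only. `ReceiveSequencer` and its methods are hypothetical names, the `post` callback stands in for the WSARecv call, and the answer above does not propose this, so whether it removes the symptom reported in the question is not established here.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper: hands out tickets when receives are posted and
// releases completed buffers in ticket order.
class ReceiveSequencer
{
public:
    // The lock is held across `post` so that ticket order matches the order
    // in which the receives are actually issued on the socket.
    template <typename PostFn>
    std::uint64_t postInOrder(PostFn post)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        const std::uint64_t ticket = nextTicket_++;
        post(ticket); // in a real program this would wrap the WSARecv call
        return ticket;
    }

    // Called by any worker when a receive completes. Returns every buffer
    // that can now be delivered in order (possibly none).
    std::vector<std::vector<std::uint8_t>> complete(std::uint64_t ticket,
                                                    std::vector<std::uint8_t> bytes)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(ticket >= nextToDeliver_ && ticket < nextTicket_);
        pending_.emplace(ticket, std::move(bytes));

        std::vector<std::vector<std::uint8_t>> ready;
        auto it = pending_.find(nextToDeliver_);
        while (it != pending_.end())
        {
            ready.push_back(std::move(it->second));
            pending_.erase(it);
            it = pending_.find(++nextToDeliver_);
        }
        return ready;
    }

private:
    std::mutex mutex_;
    std::uint64_t nextTicket_ = 0;
    std::uint64_t nextToDeliver_ = 0;
    std::map<std::uint64_t, std::vector<std::uint8_t>> pending_;
};
```

With one sequencer per connection, a worker would call `complete` with the ticket stored in its overlapped context and validate the returned buffers as one continuous stream, instead of validating each buffer on its own.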

