这是我的服务器的样子:

-WorkerThread(s):

  • 调用epoll_wait,接受连接,设置fd nonblocking(EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP)
  • 调用recv直到EPOLLIN事件上的EAGAIN为止,并将所有数据推送到全局RecvBuffer(已同步pthread_mutex)
  • EPOLLOUT事件上的
  • :访问全局SendBuffer,如果要为当前就绪的fd发送数据,则执行此操作(在while循环中,直到EAGAIN或所有数据发送完毕;发送完整个数据包后,从SendBuffer弹出它)

    -IOThread(s)
  • 从全局RecvBuffer中获取数据,对其进行处理
  • 通过首先尝试立即调用send来发送响应。如果不是所有数据都已发送,则将其余数据推送到全局SendBuffer上,以从WorkerThread发送该数据。

  • 问题是,服务器不会发送所有排队的数据(它们保留在SendBuffer中),“未发送”数据的数量会随着客户端数量的增加而增加。
    为了测试仅使用1个workerthread和1个iothread的im,但是如果我使用更多,似乎没有什么区别。
    访问全局缓冲区受pthread_mutex保护。
    另外,我的响应数据大小为130k字节(至少需要3个发送调用才能发送此数据量)。另一方面是使用阻塞套接字的Windows客户端。

    非常感谢你!
    MJ

    编辑:

    是的,默认情况下,即使我什么也没有发送,我仍在等待EPOLLOUT事件。为了简化实现和手册页指南,我这样做是这样的。另外,我对此并不了解:

    即使我当时不想“发送” EPOLLOUT事件,也没问题,因为当我要发送数据时,我会调用send直到将来EAGAIN和EPOLLOUT被触发(这是大多数时间)

    现在,我修改了代码以在IN / OUT事件之间切换:

    接受时:
    event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
    epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_ADD, infd, &event);
    

    发送完所有数据后:
    event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
    epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);
    

    当我通过调用IOThread中的send到达EAGAIN时:
    event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
    epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);
    

    ..我也有同样的行为。此外,我尝试删除EPOLLET标志,但没有任何更改

    一个侧面的问题:具有EPOLL_CTL_MOD标志的epoll_ctl是否替换事件成员,或者仅将其与给定参数进行或运算?

    EDIT3:更新了IOThread函数以连续发送,直到发送完所有数据或直到EAGAIN。
    即使发送了所有数据,我也尝试发送,但是大多数时候我在非套接字上获得errno 88 Socket操作

    EDIT4:我修复了“发送代码”中的一些错误,因此我没有收到任何现在未发送的排队数据。.但是,我没有收到应有的尽可能多的数据:))最大数量的“丢失”(未收到)数据我发送完成后,当客户端立即调用recv时获取,并且随着客户端数量的增加而增加。当我在客户端的发送和recv调用(阻塞调用)之间放置2秒的延迟时,我不会丢失服务器上的少量数据,具体取决于正在运行的客户端数量(客户端测试代码包括简单的for循环,其中包含1个send和1个recv调用)
    再次尝试使用和不使用ET模式。下面是更新的WorkerThread函数,该函数负责接收数据。
    @ Admins / Mods也许我现在应该打开新主题,因为问题有点不同?
    void CNetServer::WorkerThread(void* param)
    {
        CNetServer* pNetServer =(CNetServer*)param;
        struct epoll_event event;
        struct epoll_event *events;
        int s = 0;
    
    //  events = (epoll_event*)calloc (MAXEVENTS, sizeof event);
    
        while (1)
        {
            int n, i;
    
    //      printf ("BLOCKING NOW! epoll_wait thread %d\n",pthread_self());
            n = pNetServer->m_epollCtrl.Wait(-1);
    //      printf ("epoll_wait thread %d\n",pthread_self());
            pthread_mutex_lock(&g_mtx_WorkerThd);
            for (i = 0; i < n; i++)
            {
                if ((pNetServer->m_epollCtrl.Event(i)->events & EPOLLERR))
                {
                    // An error has occured on this fd, or the socket is not ready for reading (why were we notified then?)
    
                //  g_SendBufferArray.RemoveAll( 0 );
    
                    char szFileName[30] = {0};
                    sprintf( (char*)szFileName,"fd_%d.txt",pNetServer->m_epollCtrl.Event(i)->data.fd );
                    remove(szFileName);
    
                /*  printf( "\n\n\n");
                    printf( "\tDATA LEFT COUNT:%d\n",g_SendBufferArray.size());
                    for (int k=0;k<g_SendBufferArray.size();k++)
                        printf( "\tSD: %d DATA LEFT:%d\n",g_SendBufferArray[i]->sd,g_SendBufferArray[i]->nBytesSent );
    */
    
                //  fprintf (stderr, "epoll error\n");
                //  fflush(stdout);
                    close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                    continue;
                }
                else if (pNetServer->m_ListenSocket == pNetServer->m_epollCtrl.Event(i)->data.fd)
                {
                    // We have a notification on the listening socket, which   means one or more incoming connections.
                    while (1)
                    {
                        struct sockaddr in_addr;
                        socklen_t in_len;
                        int infd;
                        char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    
                        in_len = sizeof in_addr;
                        infd = accept (pNetServer->m_ListenSocket, &in_addr, &in_len);
                        if (infd == -1)
                        {
                            if ((errno == EAGAIN) ||
                                (errno == EWOULDBLOCK))
                            {
                                // We have processed all incoming connections.
                                break;
                            }
                            else
                            {
                                perror ("accept");
                                break;
                            }
                        }
    
                        s = getnameinfo (&in_addr, in_len,
                            hbuf, sizeof hbuf,
                            sbuf, sizeof sbuf,
                            NI_NUMERICHOST | NI_NUMERICSERV);
                        if (s == 0)
                        {
                            printf("Accepted connection on descriptor %d "
                                "(host=%s, port=%s) thread %d\n", infd, hbuf, sbuf,pthread_self());
                        }
    
                        // Make the incoming socket non-blocking and add it to the list of fds to monitor.
                        CEpollCtrl::SetNonBlock(infd,true);
                        if ( !pNetServer->m_epollCtrl.Add( infd, EPOLLIN, NULL ))
                        {
                            perror ("epoll_ctl");
                            abort ();
                        }
    
                    }
                    continue;
                }
                if( (pNetServer->m_epollCtrl.Event(i)->events & EPOLLOUT) )
                {
    
                    pNetServer->DoSend( pNetServer->m_epollCtrl.Event(i)->data.fd );
                }
                if( pNetServer->m_epollCtrl.Event(i)->events & EPOLLIN )
                {
                    printf("EPOLLIN TRIGGERED FOR SD: %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                    // We have data on the fd waiting to be read.
                    int done = 0;
                    ssize_t count = 0;
                    char buf[512];
                    while (1)
                    {
                        count = read (pNetServer->m_epollCtrl.Event(i)->data.fd, buf, sizeof buf);
                        printf("recv sd %d size %d thread %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd,count,pthread_self());
                        if (count == -1)
                        {
                            // If errno == EAGAIN, that means we have read all data. So go back to the main loop.
                            if ( errno != EAGAIN )
                            {
                                perror ("read");
                                done = 1;
                            }
                            break;
                        }
                        else if (count == 0)
                        {
                            //connection is closed by peer.. do a cleanup and close
                            done = 1;
                            break;
                        }
                        else if (count > 0)
                        {
                            static int nDataCounter = 0;
                            nDataCounter+=count;
                            printf("RECVDDDDD %d\n",nDataCounter);
                            CNetServer::s_pRecvContainer->OnData( pNetServer->m_epollCtrl.Event(i)->data.fd, buf, count );
                        }
                    }
    
                    if (done)
                    {
                        printf ("Closed connection on descriptor %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                        // Closing the descriptor will make epoll remove it from the set of descriptors which are monitored.
                        close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                    }
                }
    
            }
    //
    
            pNetServer->IOThread( (void*)pNetServer );
    
            pthread_mutex_unlock(&g_mtx_WorkerThd);
        }
    
    }
    
    void CNetServer::IOThread(void* param)
    {
    
        BYTEARRAY* pbPacket = new BYTEARRAY;
        int fd;
        struct epoll_event event;
        CNetServer* pNetServer =(CNetServer*)param;
    
        printf("IOThread startin' !\n");
    
        for (;;)
        {
            bool bGotIt = CNetServer::s_pRecvContainer->GetPacket( pbPacket, &fd );
    
            if( bGotIt )
            {
    
                //process packet here
                printf("Got 'em packet yo !\n");
                BYTE* re = new BYTE[128000];
                memset((void*)re,0xCC,128000);
                buffer_t* responsebuff = new buffer_t( fd, re, 128000 ) ;
    
                pthread_mutex_lock(&g_mtx_WorkerThd);
    
                while( 1 )
                {
                        int s;
                        int nSent = send( responsebuff->sd, ( responsebuff->pbBuffer + responsebuff->nBytesSent ),responsebuff->nSize - responsebuff->nBytesSent,0 );
                        printf ("IOT: Trying to send nSent: %d buffsize: %d \n",nSent,responsebuff->nSize - responsebuff->nBytesSent);
    
                        if (nSent == -1)
                        {
    
                            if (errno == EAGAIN || errno == EWOULDBLOCK )
                            {
                                    g_vSendBufferArray.push_back( *responsebuff );
                                    printf ("IOT: now waiting for EPOLLOUT\n");
                                    event.data.fd = fd;
                                    event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
                                    s = epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, fd, &event);
                                    break;
                                    if (s == -1)
                                    {
                                        perror ("epoll_ctl");
                                        abort ();
                                    }
    
                            }
                            else
                            {
                                printf( "%d\n",errno );
                                perror ("send");
                                break;
                            }
                            printf ("IOT: WOOOOT\n");
                            break;
                        }
                        else if (nSent == responsebuff->nSize - responsebuff->nBytesSent)
                        {
                            printf ("IOT:all is sent! wOOhOO\n");
                            responsebuff->sd = 0;
                            responsebuff->nBytesSent += nSent;
                            delete responsebuff;
                            break;
                        }
                        else if (nSent < responsebuff->nSize - responsebuff->nBytesSent)
                        {
                            printf ("IOT: partial send!\n");
                            responsebuff->nBytesSent += nSent;
    
                        }
    
                }
                delete [] re;
    
                pthread_mutex_unlock(&g_mtx_WorkerThd);
    
            }
        }
    
    }
    

    最佳答案

  • 停止使用EPOLLET。正确几乎是不可能的。
  • 如果您没有要发送的内容,请不要询问EPOLLOUT事件。
  • 当您有要在连接上发送的数据时,请遵循以下逻辑:
    A)如果该连接的发送队列中已经有数据,则只需添加新数据。您完成了。B)尝试立即发送数据。如果全部发送完毕,请完成。C)将剩余的数据保存在此连接的发送队列中。现在,为该连接请求EPOLLOUT。
  • 07-28 02:56