我正在撰写一些有关使用Blocking Vs Non-Blocking套接字的文章。我目前正在使用线程和阻塞套接字进行一些实验,但发现一些有趣的结果,我不确定该如何解释。

注意:我知道现代服务器使用具有非阻塞套接字的事件驱动模型来获得更好的性能,我正在努力解决这个问题,但是我想首先获取基线数据编号。

我想我应该问的问题如下。但是,对于正在发生的事情或我应该实际询问或需要的时间/度量/氨基葡萄糖的任何输入,我们将不胜感激。

设置

实验正在Amazon上运行:

Instance T    vCPUs     Memory (GiB)   Storage (GB) Network
c3.2xlarge     8          15           2 x 80 SSD     High

我正在使用siege对服务器进行负载测试:
> wc data.txt
   0       1      32 data.txt
> siege --delay=0.001 --time=1m --concurrent=<concurrency> -H 'Content-Length: 32'  -q '<host>/message POST < data.txt'

服务器:

我有四个版本的代码。这是HTTP服务器的最基本的基本类型。无论您要求什么,您都将得到相同的响应(这基本上是测试吞吐量)。
  • 单线程。
  • 多线程
    然后,每个已接受的请求都由std::thread处理,该文件已分离。
  • 多线程池
    固定大小的std::thread线程池。每个接受的请求都会创建一个作业,该作业将添加到作业队列中,以供线程池处理。
  • 使用std::async()的多线程
    每个接受的请求都通过`std::async()执行,将来存储在队列中。辅助线程等待每个将来完成,然后再丢弃它。

  • 期望
  • 单例:最差的表现
    它应该以最大速率达到最高。
  • Multi:比单线程更好。
    但是,当存在大量并发连接时,性能将大大下降。我的实验在8核系统上的255个 Activity 连接(因此有255个线程)达到最高。
  • 线程池:比多线程好。
    因为我们创建的线程数只有硬件能够自然支持的数量,所以性能不会下降。
  • 异步:类似于线程池。
    尽管我希望这比手写线程池更有效。

  • 实际结果

    尝试了实际的并发大小。
    1, 2, 4, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 255
    

    c&#43;&#43; - C&#43;&#43; std::thread的行为-LMLPHP
    c&#43;&#43; - C&#43;&#43; std::thread的行为-LMLPHP

    我对“多”线程版本的性能感到惊讶。因此,我将线程池版本的大小加倍,以查看发生了什么。
    ThreadQueue     jobs(std::thread::hardware_concurrency());
    // Changed this line to:
    ThreadQueue     jobs(std::thread::hardware_concurrency() * 2);
    

    这就是为什么您在图形中看到两行线程池的原因。

    需要帮忙

    标准库std::async()是最佳版本并不奇怪。但是,我对具有基本相同性能的多线程版本感到十分震惊。

    此版本(多线程)为每个接受的传入连接创建一个新线程,然后简单地分离该线程(允许其运行到完成)。当并发达到255时,我们将在进程中运行255个后台线程。

    所以问题是:

    鉴于Socket::worker()的运行时间很短,我无法相信与这项工作相比,创建线程的成本可以忽略不计。另外,由于它保持与std::async()相似的性能,因此似乎表明在后台进行了一些重用。

    是否有人对线程重用的标准要求有任何了解,以及我应该期望的重用行为是什么?

    阻塞模型将在什么时候崩溃?在255个并发请求时,我并不期望线程模型能够跟上。我显然需要在这里重新设定期望。

    代码

    套接字包装器代码是标准套接字的非常薄的一层(只是在出错时抛出异常)。如果需要,可以使用current code is here,但我认为这并不重要。

    该代码的完整源代码是available here

    socks :: worker

    这是所有服务器通用的代码共享位。基本上,它接收一个接受的套接字对象(通过移动),并且基本上将data对象写入该套接字。
    void worker(DataSocket&& accepted, ServerSocket& server, std::string const& data, int& finished)
    {
        DataSocket  accept(std::move(accepted));
        HTTPServer  acceptHTTPServer(accept);
        try
        {
            std::string message;
            acceptHTTPServer.recvMessage(message);
            // std::cout << message << "\n";
            if (!finished && message == "Done")
            {
                finished = 1;
                server.stop();
                acceptHTTPServer.sendMessage("", "Stoped");
            }
            else
            {
                acceptHTTPServer.sendMessage("", data);
            }
        }
        catch(DropDisconnectedPipe const& e)
        {
            std::cerr << "Pipe Disconnected: " << e.what() << "\n";
        }
    }
    

    单线
    int main(int argc, char* argv[])
    {
        // Builds a string that is sent back with each response.
        std::string          data     = Sock::commonSetUp(argc, argv);
        int                  finished = 0;
        Sock::ServerSocket   server(8080);
    
        while(!finished)
        {
            Sock::DataSocket  accept  = server.accept();
            // Simply sends "data" back over http.
            Sock::worker(std::move(accept), server, data, finished);
        }
    }
    

    多线程
    int main(int argc, char* argv[])
    {
        std::string          data     = Sock::commonSetUp(argc, argv);
        int                  finished = 0;
        Sock::ServerSocket   server(8080);
    
        while(!finished)
        {
            Sock::DataSocket  accept  = server.accept();
    
            std::thread work(Sock::worker, std::move(accept), std::ref(server), std::ref(data), std::ref(finished));
            work.detach();
        }
    }
    

    带队列的多线程
    int main(int argc, char* argv[])
    {
        std::string          data     = Sock::commonSetUp(argc, argv);
        int                  finished = 0;
        Sock::ServerSocket   server(8080);
    
        std::cerr << "Concurrency: " << std::thread::hardware_concurrency() << "\n";
        ThreadQueue     jobs(std::thread::hardware_concurrency());
    
        while(!finished)
        {
            Sock::DataSocket  accept  = server.accept();
    
            // Had some issues with storing a lambda that captured
            // a move only object so I created WorkJob as a simple
            // functor instead of the lambda.
            jobs.startJob(WorkJob(std::move(accept), server, data, finished));
        }
    }
    

    然后辅助代码控制池
    class WorkJob
    {
        Sock::DataSocket    accept;
        Sock::ServerSocket& server;
        std::string const&  data;
        int&                finished;
        public:
            WorkJob(Sock::DataSocket&& accept, Sock::ServerSocket& server, std::string const& data, int& finished)
                : accept(std::move(accept))
                , server(server)
                , data(data)
                , finished(finished)
            {}
            WorkJob(WorkJob&& rhs)
                : accept(std::move(rhs.accept))
                , server(rhs.server)
                , data(rhs.data)
                , finished(rhs.finished)
            {}
            void operator()()
            {
                Sock::worker(std::move(accept), server, data, finished);
            }
    };
    class ThreadQueue
    {
        using WorkList = std::deque<WorkJob>;
    
        std::vector<std::thread>    threads;
        std::mutex                  safe;
        std::condition_variable     cond;
        WorkList                    work;
        int                         finished;
    
        WorkJob getWorkJob()
        {
            std::unique_lock<std::mutex>     lock(safe);
            cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
    
            auto result = std::move(work.front());
            work.pop_front();
            return result;
        }
        void doWork()
        {
            while(!finished)
            {
                WorkJob job = getWorkJob();
                if (!finished)
                {
                    job();
                }
            }
        }
    
        public:
            void startJob(WorkJob&& item)
            {
                std::unique_lock<std::mutex>     lock(safe);
                work.push_back(std::move(item));
                cond.notify_one();
            }
    
            ThreadQueue(int count)
                : threads(count)
                , finished(false)
            {
                for(int loop = 0;loop < count; ++loop)
                {
                    threads[loop] = std::thread(&ThreadQueue::doWork, this);
                }
            }
            ~ThreadQueue()
            {
                {
                    std::unique_lock<std::mutex>     lock(safe);
                    finished = true;
                }
                cond.notify_all();
            }
    };
    

    异步的
    int main(int argc, char* argv[])
    {
        std::string          data     = Sock::commonSetUp(argc, argv);
        int                  finished = 0;
        Sock::ServerSocket   server(8080);
    
        FutureQueue          future(finished);
    
        while(!finished)
        {
            Sock::DataSocket  accept  = server.accept();
    
            future.addFuture([accept = std::move(accept), &server, &data, &finished]() mutable {Sock::worker(std::move(accept), server, data, finished);});
        }
    }
    

    辅助类来整理 future 。
    class FutureQueue
    {
        using MyFuture   = std::future<void>;
        using FutureList = std::list<MyFuture>;
    
        int&                        finished;
        FutureList                  futures;
        std::mutex                  mutex;
        std::condition_variable     cond;
        std::thread                 cleaner;
    
        void waiter()
        {
            while(finished)
            {
                std::future<void>   next;
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
                    if (futures.empty() && !finished)
                    {
                        next = std::move(futures.front());
                        futures.pop_front();
                    }
                }
                if (!next.valid())
                {
                    next.wait();
                }
            }
    
        }
        public:
            FutureQueue(int& finished)
                : finished(finished)
                , cleaner(&FutureQueue::waiter, this)
            {}
            ~FutureQueue()
            {
                cleaner.join();
            }
    
            template<typename T>
            void addFuture(T&& lambda)
            {
                std::unique_lock<std::mutex> lock(mutex);
                futures.push_back(std::async(std::launch::async, std::move(lambda)));
                cond.notify_one();
            }
    };
    

    最佳答案

    该应用程序肯定会受I/O约束,而不是CPU约束,这意味着处理任何单个请求所花费的大部分时间都花在等待阻塞I/O操作上,而不是实际进行计算。

    因此拥有更多线程(最多可以达到一个线程,但可能超过256个线程)将会更快,因为它允许不同套接字上的更多并发I/O,因为它们都可以交换CPU。

    换句话说,不是8个核心是瓶颈,而是套接字通信。因此,您希望尽可能并行化(或使用非阻塞I/O)。

    10-08 01:15
    查看更多