我正在撰写一些有关使用Blocking Vs Non-Blocking套接字的文章。我目前正在使用线程和阻塞套接字进行一些实验,但发现一些有趣的结果,我不确定该如何解释。
注意:我知道现代服务器使用具有非阻塞套接字的事件驱动模型来获得更好的性能,我正在努力解决这个问题,但是我想首先获取基线数据编号。
我想我应该问的问题如下。但是,对于正在发生的事情或我应该实际询问或需要的时间/度量/氨基葡萄糖的任何输入,我们将不胜感激。
设置
实验正在Amazon上运行:
Instance T vCPUs Memory (GiB) Storage (GB) Network
c3.2xlarge 8 15 2 x 80 SSD High
我正在使用siege对服务器进行负载测试:
> wc data.txt
0 1 32 data.txt
> siege --delay=0.001 --time=1m --concurrent=<concurrency> -H 'Content-Length: 32' -q '<host>/message POST < data.txt'
服务器:
我有四个版本的代码。这是HTTP服务器的最基本的基本类型。无论您要求什么,您都将得到相同的响应(这基本上是测试吞吐量)。
然后,每个已接受的请求都由
std::thread
处理,该文件已分离。 固定大小的
std::thread
线程池。每个接受的请求都会创建一个作业,该作业将添加到作业队列中,以供线程池处理。 std::async()
的多线程每个接受的请求都通过`std::async()执行,将来存储在队列中。辅助线程等待每个将来完成,然后再丢弃它。
期望
它应该以最大速率达到最高。
但是,当存在大量并发连接时,性能将大大下降。我的实验在8核系统上的255个 Activity 连接(因此有255个线程)达到最高。
因为我们创建的线程数只有硬件能够自然支持的数量,所以性能不会下降。
尽管我希望这比手写线程池更有效。
实际结果
尝试了实际的并发大小。
1, 2, 4, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 255
我对“多”线程版本的性能感到惊讶。因此,我将线程池版本的大小加倍,以查看发生了什么。
ThreadQueue jobs(std::thread::hardware_concurrency());
// Changed this line to:
ThreadQueue jobs(std::thread::hardware_concurrency() * 2);
这就是为什么您在图形中看到两行线程池的原因。
需要帮忙
标准库
std::async()
是最佳版本并不奇怪。但是,我对具有基本相同性能的多线程版本感到十分震惊。此版本(多线程)为每个接受的传入连接创建一个新线程,然后简单地分离该线程(允许其运行到完成)。当并发达到255时,我们将在进程中运行255个后台线程。
所以问题是:
鉴于
Socket::worker()
的运行时间很短,我无法相信与这项工作相比,创建线程的成本可以忽略不计。另外,由于它保持与std::async()
相似的性能,因此似乎表明在后台进行了一些重用。是否有人对线程重用的标准要求有任何了解,以及我应该期望的重用行为是什么?
阻塞模型将在什么时候崩溃?在255个并发请求时,我并不期望线程模型能够跟上。我显然需要在这里重新设定期望。
代码
套接字包装器代码是标准套接字的非常薄的一层(只是在出错时抛出异常)。如果需要,可以使用current code is here,但我认为这并不重要。
该代码的完整源代码是available here。
socks :: worker
这是所有服务器通用的代码共享位。基本上,它接收一个接受的套接字对象(通过移动),并且基本上将
data
对象写入该套接字。void worker(DataSocket&& accepted, ServerSocket& server, std::string const& data, int& finished)
{
DataSocket accept(std::move(accepted));
HTTPServer acceptHTTPServer(accept);
try
{
std::string message;
acceptHTTPServer.recvMessage(message);
// std::cout << message << "\n";
if (!finished && message == "Done")
{
finished = 1;
server.stop();
acceptHTTPServer.sendMessage("", "Stoped");
}
else
{
acceptHTTPServer.sendMessage("", data);
}
}
catch(DropDisconnectedPipe const& e)
{
std::cerr << "Pipe Disconnected: " << e.what() << "\n";
}
}
单线
int main(int argc, char* argv[])
{
// Builds a string that is sent back with each response.
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
while(!finished)
{
Sock::DataSocket accept = server.accept();
// Simply sends "data" back over http.
Sock::worker(std::move(accept), server, data, finished);
}
}
多线程
int main(int argc, char* argv[])
{
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
while(!finished)
{
Sock::DataSocket accept = server.accept();
std::thread work(Sock::worker, std::move(accept), std::ref(server), std::ref(data), std::ref(finished));
work.detach();
}
}
带队列的多线程
int main(int argc, char* argv[])
{
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
std::cerr << "Concurrency: " << std::thread::hardware_concurrency() << "\n";
ThreadQueue jobs(std::thread::hardware_concurrency());
while(!finished)
{
Sock::DataSocket accept = server.accept();
// Had some issues with storing a lambda that captured
// a move only object so I created WorkJob as a simple
// functor instead of the lambda.
jobs.startJob(WorkJob(std::move(accept), server, data, finished));
}
}
然后辅助代码控制池
class WorkJob
{
Sock::DataSocket accept;
Sock::ServerSocket& server;
std::string const& data;
int& finished;
public:
WorkJob(Sock::DataSocket&& accept, Sock::ServerSocket& server, std::string const& data, int& finished)
: accept(std::move(accept))
, server(server)
, data(data)
, finished(finished)
{}
WorkJob(WorkJob&& rhs)
: accept(std::move(rhs.accept))
, server(rhs.server)
, data(rhs.data)
, finished(rhs.finished)
{}
void operator()()
{
Sock::worker(std::move(accept), server, data, finished);
}
};
class ThreadQueue
{
using WorkList = std::deque<WorkJob>;
std::vector<std::thread> threads;
std::mutex safe;
std::condition_variable cond;
WorkList work;
int finished;
WorkJob getWorkJob()
{
std::unique_lock<std::mutex> lock(safe);
cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
auto result = std::move(work.front());
work.pop_front();
return result;
}
void doWork()
{
while(!finished)
{
WorkJob job = getWorkJob();
if (!finished)
{
job();
}
}
}
public:
void startJob(WorkJob&& item)
{
std::unique_lock<std::mutex> lock(safe);
work.push_back(std::move(item));
cond.notify_one();
}
ThreadQueue(int count)
: threads(count)
, finished(false)
{
for(int loop = 0;loop < count; ++loop)
{
threads[loop] = std::thread(&ThreadQueue::doWork, this);
}
}
~ThreadQueue()
{
{
std::unique_lock<std::mutex> lock(safe);
finished = true;
}
cond.notify_all();
}
};
异步的
int main(int argc, char* argv[])
{
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
FutureQueue future(finished);
while(!finished)
{
Sock::DataSocket accept = server.accept();
future.addFuture([accept = std::move(accept), &server, &data, &finished]() mutable {Sock::worker(std::move(accept), server, data, finished);});
}
}
辅助类来整理 future 。
class FutureQueue
{
using MyFuture = std::future<void>;
using FutureList = std::list<MyFuture>;
int& finished;
FutureList futures;
std::mutex mutex;
std::condition_variable cond;
std::thread cleaner;
void waiter()
{
while(finished)
{
std::future<void> next;
{
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
if (futures.empty() && !finished)
{
next = std::move(futures.front());
futures.pop_front();
}
}
if (!next.valid())
{
next.wait();
}
}
}
public:
FutureQueue(int& finished)
: finished(finished)
, cleaner(&FutureQueue::waiter, this)
{}
~FutureQueue()
{
cleaner.join();
}
template<typename T>
void addFuture(T&& lambda)
{
std::unique_lock<std::mutex> lock(mutex);
futures.push_back(std::async(std::launch::async, std::move(lambda)));
cond.notify_one();
}
};
最佳答案
该应用程序肯定会受I/O约束,而不是CPU约束,这意味着处理任何单个请求所花费的大部分时间都花在等待阻塞I/O操作上,而不是实际进行计算。
因此拥有更多线程(最多可以达到一个线程,但可能超过256个线程)将会更快,因为它允许不同套接字上的更多并发I/O,因为它们都可以交换CPU。
换句话说,不是8个核心是瓶颈,而是套接字通信。因此,您希望尽可能并行化(或使用非阻塞I/O)。