如果我想将boost::asio的功能集成到基于文件描述符(select/poll)的事件循环中,该如何实现?其他提供异步功能的库通常会给出一个可供监听的文件描述符,该描述符在工作完成后立即变为可读,这样就可以把它加入事件循环的select/poll中,并在它可读时调用该库的处理回调(类似一次性的事件处理)。
一个很好的例子是线程池中的异步名称解析器,就像this question中讨论的那样。
最佳答案
基于this answer中的示例,我想到了一个解决方案:使用一个通用处理程序,它先把原处理程序的调用发布(post)到另一个io_service中,再向唤醒管道写入一个字节。管道的读取端可以注册到基于文件描述符的事件循环中;当它变为可读时,事件循环调用回调run_handler(),该回调会清空管道,并在主线程中运行挂起的处理程序。
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <cerrno>
#include <cstddef>
#include <iostream>
#include <sstream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
/// @brief Type used to emulate asynchronous host resolution with a
///        dedicated thread pool, and to expose completion through a
///        pollable file descriptor.
///
/// Worker threads run resolver_service_ and perform blocking resolution.
/// When a resolution finishes, the user's handler is queued on
/// handler_service_ and one byte is written to a wake-up pipe. The read
/// end of that pipe (see fd()) can be registered with a select/poll based
/// event loop; when it becomes readable the loop calls run_handler(),
/// which drains the pipe and runs the queued handlers in the calling
/// thread.
class resolver {
public:
  /// @brief Create the wake-up pipe and start the worker pool.
  /// @param pool_size Number of threads that run resolver_service_.
  /// @throws boost::system::system_error if the pipe cannot be created or
  ///         switched to non-blocking mode.
  resolver(const std::size_t pool_size)
    : work_(boost::ref(resolver_service_)) {
    // Create wake-up pipe; without it there is no way to notify the
    // event loop, so failure is fatal for this object.
    if (::pipe(pipe_) != 0) {
      const int saved_errno = errno;
      pipe_[0] = pipe_[1] = -1;
      throw_system_error(saved_errno, "pipe");
    }

    // The read end must not block so run_handler() can drain it. The
    // write end must not block either: if the pipe is full, a wake-up is
    // already pending and a worker thread must not stall on it.
    if (!set_nonblocking(pipe_[0]) || !set_nonblocking(pipe_[1])) {
      const int saved_errno = errno;
      close_pipe();
      throw_system_error(saved_errno, "fcntl");
    }

    // Create pool. If a thread cannot be started, stop the ones that
    // were and release the pipe before propagating the error.
    try {
      for (std::size_t i = 0; i < pool_size; ++i)
        threads_.create_thread(boost::bind(&boost::asio::io_service::run,
                                           &resolver_service_));
    } catch (...) {
      shutdown();
      throw;
    }
  }

  /// @brief Let the workers finish outstanding work, join them, and close
  ///        both ends of the wake-up pipe.
  ~resolver() {
    shutdown();
  }

  /// @brief Queue a resolution to be performed on a pool thread.
  /// @param query   Query (or endpoint) to resolve; copied into the task.
  /// @param handler Callable invoked as handler(error_code, iterator)
  ///                from run_handler().
  template <typename QueryOrEndpoint, typename Handler>
  void async_resolve(QueryOrEndpoint query, Handler handler) {
    resolver_service_.post(boost::bind(
        &resolver::do_async_resolve<QueryOrEndpoint, Handler>, this,
        query, handler));
  }

  /// @brief Callback for the event loop: call when fd() is readable.
  ///
  /// Drains the wake-up pipe, then runs every handler currently queued on
  /// handler_service_ in the calling thread.
  void run_handler() {
    drain_wakeup_pipe();
    // Run handlers posted from resolver threads.
    handler_service_.poll();
    // poll() leaves the service stopped once it runs out of work; reset
    // so the next call can run newly posted handlers.
    handler_service_.reset();
  }

  /// @brief Read end of the wake-up pipe, for polling in the event loop.
  int fd() const {
    return pipe_[0];
  }

private:
  /// @brief Resolve address and invoke continuation handler.
  template <typename QueryOrEndpoint, typename Handler>
  void do_async_resolve(const QueryOrEndpoint& query, Handler handler) {
    typedef typename QueryOrEndpoint::protocol_type protocol_type;
    typedef typename protocol_type::resolver resolver_type;

    // Resolve synchronously, as synchronous resolution will perform work
    // in the calling thread. Thus, it will not use Boost.Asio's internal
    // thread that is used for asynchronous resolution.
    boost::system::error_code error;
    resolver_type resolver(resolver_service_);
    typename resolver_type::iterator result = resolver.resolve(query, error);

    // Queue the handler first, then wake the event loop, so the handler
    // is already pending when run_handler() is triggered by this byte.
    handler_service_.post(boost::bind(handler, error, result));
    notify_event_loop();
  }

  /// @brief Write one byte to the wake-up pipe, retrying on EINTR.
  ///
  /// Any other failure is ignored on purpose: EAGAIN means the pipe is
  /// full, in which case the read end is already readable.
  void notify_event_loop() {
    ssize_t written;
    do {
      written = ::write(pipe_[1], "*", 1);
    } while (written < 0 && errno == EINTR);
  }

  /// @brief Discard every byte currently buffered in the wake-up pipe.
  void drain_wakeup_pipe() {
    char scratch[64];
    for (;;) {
      const ssize_t got = ::read(pipe_[0], scratch, sizeof(scratch));
      if (got > 0)
        continue;                      // more may be buffered
      if (got < 0 && errno == EINTR)
        continue;                      // interrupted, try again
      break;                           // empty (EAGAIN), EOF, or hard error
    }
  }

  /// @brief Add O_NONBLOCK to a descriptor's status flags, keeping the
  ///        flags that are already set.
  /// @return true on success; on failure errno holds the cause.
  static bool set_nonblocking(const int descriptor) {
    const int flags = ::fcntl(descriptor, F_GETFL, 0);
    if (flags == -1)
      return false;
    return ::fcntl(descriptor, F_SETFL, flags | O_NONBLOCK) != -1;
  }

  /// @brief Throw a boost::system::system_error for an errno value.
  static void throw_system_error(const int errno_value, const char* what) {
    throw boost::system::system_error(
        boost::system::error_code(errno_value,
                                  boost::system::system_category()),
        what);
  }

  /// @brief Close whichever pipe ends are still open.
  void close_pipe() {
    for (int i = 0; i < 2; ++i) {
      if (pipe_[i] != -1) {
        ::close(pipe_[i]);
        pipe_[i] = -1;
      }
    }
  }

  /// @brief Stop the pool and release the pipe.
  ///
  /// Threads are joined before the pipe is closed because workers write
  /// to it when a resolution completes.
  void shutdown() {
    work_ = boost::none;
    threads_.join_all();
    close_pipe();
  }

private:
  boost::asio::io_service resolver_service_;  ///< Run by the pool threads.
  boost::asio::io_service handler_service_;   ///< Polled in run_handler().
  boost::optional<boost::asio::io_service::work> work_;
  boost::thread_group threads_;
  int pipe_[2];  ///< [0] read end exposed via fd(), [1] write end.
};
/// @brief Completion handler that prints the outcome of a resolution.
///
/// The report is assembled in a string buffer first and then written to
/// std::cout in a single insertion, followed by a flush.
///
/// @tparam ProtocolType Protocol whose resolver produced @p iterator.
/// @param error    Result of the resolution; its message is always printed.
/// @param iterator Resolution results; only the first entry's endpoint is
///                 printed, and only when @p error is not set.
template <typename ProtocolType>
void handle_resolve(
    const boost::system::error_code& error,
    typename ProtocolType::resolver::iterator iterator) {
  std::stringstream report;
  report << "handle_resolve:\n";
  report << " " << error.message() << "\n";
  if (!error) {
    report << " " << iterator->endpoint() << "\n";
  }
  std::cout << report.str() << std::flush;
}
/// @brief Demo: start four resolutions and dispatch their handlers from a
///        minimal poll()-based event loop.
///
/// The loop ends once poll() reports no activity for 2000 ms.
/// @return 0 after an idle timeout, 1 if polling the wake-up descriptor
///         fails.
int main() {
  // Resolver will emulate asynchronous host resolution with a pool of 5
  // threads.
  resolver resolver(5);

  namespace ip = boost::asio::ip;
  resolver.async_resolve(
      ip::udp::resolver::query("localhost", "12345"),
      &handle_resolve<ip::udp>);
  resolver.async_resolve(
      ip::tcp::resolver::query("www.google.com", "80"),
      &handle_resolve<ip::tcp>);
  resolver.async_resolve(
      ip::udp::resolver::query("www.stackoverflow.com", "80"),
      &handle_resolve<ip::udp>);
  resolver.async_resolve(
      ip::icmp::resolver::query("some.other.address", "54321"),
      &handle_resolve<ip::icmp>);

  pollfd fds;
  fds.fd = resolver.fd();
  fds.events = POLLIN;
  fds.revents = 0;

  // Simple event loop.
  for (;;) {
    const int ready = poll(&fds, 1, 2000);  // waiting for wakeup call

    if (ready == 0)
      break;  // idle for the whole timeout: nothing left to do

    if (ready < 0) {
      if (errno == EINTR)
        continue;  // interrupted by a signal, not a real failure
      // poll() returns -1 on error; do not mistake it for "readable".
      std::cerr << "poll failed, errno " << errno << "\n";
      return 1;
    }

    if (!(fds.revents & POLLIN)) {
      // Only an error/hang-up condition was reported; polling again
      // would return immediately and spin.
      std::cerr << "wake-up descriptor reported revents "
                << fds.revents << "\n";
      return 1;
    }

    resolver.run_handler();  // call resolve handler
  }
  return 0;
}
关于c++ - 将boost::asio集成到基于文件描述符的事件循环中(选择/轮询),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/24449936/