如果我想将boost::asio集成到基于文件描述符(select/poll)的事件循环中,该如何实现?其他提供异步功能的库通常会给出一个可供分派的文件描述符,该描述符在工作完成后立即变为可读,这样就可以把它加入事件循环的select/poll中,并在它可读时调用该库的处理回调(例如一次性的事件处理函数)。

一个很好的例子是在线程池中运行的异步名称解析器,正如this question中所讨论的那样。

最佳答案

基于this answer中的示例,我想到了一个使用通用处理程序的解决方案:解析线程先把处理程序的调用投递(post)到另一个io_service中,然后向唤醒管道写入一个字节。管道的读取端可以加入基于文件描述符的事件循环;当它变为可读时,事件循环调用回调run_handler(),该回调会清空管道并在主线程中运行挂起的处理程序。

#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <sstream>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <boost/system/system_error.hpp>
#include <boost/thread.hpp>

/// @brief Type used to emulate asynchronous host resolution with a
///        dedicated thread pool.
class resolver {
public:
  resolver(const std::size_t pool_size)
    : work_(boost::ref(resolver_service_)) {
    // Create wake-up pipe
    pipe(pipe_);
    fcntl(pipe_[0], F_SETFL, O_NONBLOCK);
    // Create pool.
    for (std::size_t i = 0; i < pool_size; ++i)
      threads_.create_thread(boost::bind(&boost::asio::io_service::run,
        &resolver_service_));
  }

  ~resolver() {
    work_ = boost::none;
    threads_.join_all();
  }

  template <typename QueryOrEndpoint, typename Handler>
  void async_resolve(QueryOrEndpoint query, Handler handler) {
    resolver_service_.post(boost::bind(
        &resolver::do_async_resolve<QueryOrEndpoint, Handler>, this,
        query, handler));
  }

  // callback for eventloop in main thread
  void run_handler() {
    char c;
    // clear wake-up pipe
    while (read(pipe_[0], &c, 1) > 0);
    // run handler posted from resolver threads
    handler_service_.poll();
    handler_service_.reset();
  }

  // get read end of wake up pipe for polling in eventloop
  int fd() {
    return pipe_[0];
  }

private:
  /// @brief Resolve address and invoke continuation handler.
  template <typename QueryOrEndpoint, typename Handler>
  void do_async_resolve(const QueryOrEndpoint& query, Handler handler) {
    typedef typename QueryOrEndpoint::protocol_type protocol_type;
    typedef typename protocol_type::resolver        resolver_type;

    // Resolve synchronously, as synchronous resolution will perform work
    // in the calling thread.  Thus, it will not use Boost.Asio's internal
    // thread that is used for asynchronous resolution.
    boost::system::error_code error;
    resolver_type resolver(resolver_service_);
    typename resolver_type::iterator result = resolver.resolve(query, error);

    // post handler callback to service running in main thread
    handler_service_.post(boost::bind(handler, error, result));
    // wake up eventloop in main thread
    write(pipe_[1], "*", 1);
  }

private:
  boost::asio::io_service resolver_service_;
  boost::asio::io_service handler_service_;
  boost::optional<boost::asio::io_service::work> work_;
  boost::thread_group threads_;
  int pipe_[2];
};

/// @brief Completion handler that prints the outcome of one resolution.
///
/// Prints the error message and, when resolution succeeded, the first
/// resolved endpoint.
///
/// @tparam ProtocolType Protocol whose resolver produced the iterator.
/// @param error    Result of the resolution.
/// @param iterator Resolver iterator; only dereferenced when error is unset.
template <typename ProtocolType>
void handle_resolve(
    const boost::system::error_code& error,
    typename ProtocolType::resolver::iterator iterator) {
  // Assemble the whole report first so it reaches std::cout as a single
  // insertion.
  std::ostringstream report;
  report << "handle_resolve:\n";
  report << "  " << error.message() << "\n";
  if (!error) {
    report << "  " << iterator->endpoint() << "\n";
  }

  std::cout << report.str() << std::flush;
}

/// @brief Demo: queue four resolutions and dispatch their handlers from a
///        poll()-based event loop.
/// @return 0 once the loop has been idle for 2 seconds, 1 if poll() fails
///         or reports an unexpected event on the wake-up descriptor.
int main() {
  // Resolver will emulate asynchronous host resolution with a pool of 5
  // threads.
  resolver resolver(5);

  namespace ip = boost::asio::ip;
  resolver.async_resolve(
      ip::udp::resolver::query("localhost", "12345"),
      &handle_resolve<ip::udp>);
  resolver.async_resolve(
      ip::tcp::resolver::query("www.google.com", "80"),
      &handle_resolve<ip::tcp>);
  resolver.async_resolve(
      ip::udp::resolver::query("www.stackoverflow.com", "80"),
      &handle_resolve<ip::udp>);
  resolver.async_resolve(
      ip::icmp::resolver::query("some.other.address", "54321"),
      &handle_resolve<ip::icmp>);

  pollfd wakeup = pollfd();
  wakeup.fd = resolver.fd();
  wakeup.events = POLLIN;

  // simple eventloop
  while (true) {
    const int ready = poll(&wakeup, 1, 2000); // waiting for wakeup call
    if (ready < 0) {
      // An interrupted call is not a failure; wait again.
      if (errno == EINTR)
        continue;
      std::cerr << "poll: " << std::strerror(errno) << "\n";
      return 1;
    }
    if (ready == 0)
      break; // nothing happened for 2 seconds: treat the demo as finished
    if (!(wakeup.revents & POLLIN)) {
      // Only POLLERR/POLLHUP/POLLNVAL remain; polling again would spin.
      std::cerr << "poll: unexpected event on wake-up descriptor\n";
      return 1;
    }
    resolver.run_handler(); // call resolve handler
  }
  return 0;
}

关于c++ - 将boost::asio集成到基于文件描述符的事件循环中(select/poll),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/24449936/

10-11 18:35