Boris 的文章向我们展示了如何创建 boost::asio 的扩展。我尝试在此基础上加入 signal_set,并对已注册的信号调用 async_wait。结果程序会一直挂起,直到触发第二个 SIGINT;而我希望只用一个信号就能让程序结束。

这是我的代码。我在Ubuntu上使用gcc-4.6.3和boost-1.52.0对其进行了测试。

编译 -
gcc -I/boost_inc -L/boot_lib main.cpp -lpthread -lboost_system -lboost_thread

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <cstddef>

// I/O object for a seconds-based timer. Every operation is forwarded to the
// Service type this template is instantiated with.
//
// Service is expected to provide:
//   wait(implementation, seconds)
//   async_wait(implementation, seconds, handler)
template <typename Service>
class basic_timer
  : public boost::asio::basic_io_object<Service>
{
    typedef boost::asio::basic_io_object<Service> base_type;

  public:
    // Binds the timer to `io_service` by passing it to the base I/O object.
    explicit basic_timer(boost::asio::io_service &io_service)
      : base_type(io_service)
    {
    }

    // Synchronous wait: delegates to Service::wait for `seconds` seconds.
    void wait(std::size_t seconds)
    {
      this->service.wait(this->implementation, seconds);
    }

    // Asynchronous wait: hands `seconds` and `handler` to Service::async_wait.
    template <typename Handler>
    void async_wait(std::size_t seconds, Handler handler)
    {
      this->service.async_wait(this->implementation, seconds, handler);
    }
};

// Forward declaration so timer_impl can be named as the default template
// argument below; the class itself is defined after the service.
class timer_impl;

// io_service service that backs basic_timer.
//
// Blocking waits requested through async_wait() are executed on a private
// io_service (async_io_service_) that is run by a dedicated thread
// (async_thread_); the user's handler is then posted back to the io_service
// that owns this service.
//
// TimerImplementation must provide wait(seconds, ec) and destroy().
template <typename TimerImplementation = timer_impl>
class basic_timer_service
  : public boost::asio::io_service::service
{
  public:
    // Service id object; defined out of class further down in the file.
    static boost::asio::io_service::id id;

    // Creates a work object on the private io_service and starts the thread
    // that calls async_io_service_.run().
    //
    // Members are initialized in declaration order, so async_io_service_
    // (declared first) already exists when async_work_ and async_thread_
    // refer to it.
    explicit basic_timer_service(boost::asio::io_service &io_service)
      : boost::asio::io_service::service(io_service),
      async_work_(new boost::asio::io_service::work(async_io_service_)),
      async_thread_(
        boost::bind(&boost::asio::io_service::run, &async_io_service_))
    {}

    // Tears the worker down: drops the work object, stops the private
    // io_service, then joins the worker thread.
    ~basic_timer_service()
    {
      async_work_.reset();
      async_io_service_.stop();
      async_thread_.join();  // author's observation: the program blocks here
                             // until a second signal is triggered
      async_io_service_.reset();
    }

    // Per-timer state handed to basic_io_object as its implementation.
    typedef boost::shared_ptr<TimerImplementation> implementation_type;

    // Allocates a fresh TimerImplementation for a new timer object.
    void construct(implementation_type &impl)
    {
      impl.reset(new TimerImplementation());
    }

    // Lets the implementation clean up via destroy(), then releases this
    // owner's reference to it.
    void destroy(implementation_type &impl)
    {
      impl->destroy();
      impl.reset();
    }

    // Synchronous wait on the calling thread. The error code filled in by the
    // implementation is passed to boost::asio::detail::throw_error
    // (presumably throws when ec is set -- confirm against the asio headers).
    void wait(implementation_type &impl, std::size_t seconds)
    {
      boost::system::error_code ec;
      impl->wait(seconds, ec);
      boost::asio::detail::throw_error(ec);
    }

    // Function object posted to the private io_service for one async wait.
    // Holds only a weak reference to the timer implementation, plus a work
    // object on the owner's io_service (presumably so the owner's run() does
    // not return while the wait is pending -- confirm).
    template <typename Handler>
    class wait_operation
    {
      public:
        // impl:       timer implementation to wait on (stored as weak_ptr)
        // io_service: io_service the completion handler is posted to
        // seconds:    duration passed to the implementation's wait()
        // handler:    user completion handler, stored by value
        wait_operation(
          implementation_type &impl,
          boost::asio::io_service &io_service,
          std::size_t seconds, Handler handler)
          : impl_(impl),
          io_service_(io_service),
          work_(io_service),
          seconds_(seconds),
          handler_(handler)
        {}

        // Performs the blocking wait if the owner's io_service has not been
        // stopped and the implementation is still alive, then posts the
        // handler with the resulting error code. Otherwise posts the handler
        // with operation_aborted.
        void operator()() const
        {
          // Empty if the timer implementation has already been released.
          implementation_type impl = impl_.lock();
          if (!io_service_.stopped() && impl)
          {
              boost::system::error_code ec;
              impl->wait(seconds_, ec);
              this->io_service_.post(
                boost::asio::detail::bind_handler(handler_, ec));
          }
          else
          {
              this->io_service_.post(
                boost::asio::detail::bind_handler(
                  handler_, boost::asio::error::operation_aborted));
          }
      }

      private:
        boost::weak_ptr<TimerImplementation> impl_;
        boost::asio::io_service &io_service_;
        boost::asio::io_service::work work_;
        std::size_t seconds_;
        Handler handler_;
    };

    // Queues a wait_operation on the private io_service; the operation posts
    // `handler` to this service's owning io_service when it finishes.
    template <typename Handler>
    void async_wait(
      implementation_type &impl,
      std::size_t seconds, Handler handler)
    {
      this->async_io_service_.post(
        wait_operation<Handler>(
          impl, this->get_io_service(), seconds, handler));
    }

  private:
    // Intentionally empty: all teardown happens in the destructor.
    void shutdown_service()
    {}

    // Declaration order matters: the io_service must be constructed before
    // the work object and the thread that use it.
    boost::asio::io_service async_io_service_;
    boost::scoped_ptr<boost::asio::io_service::work> async_work_;
    boost::thread async_thread_;
};

// Timer implementation whose wait is a plain blocking sleep().
class timer_impl
{
  public:
    // Cleanup hook required by the timer service; this implementation holds
    // nothing that needs releasing.
    void destroy()
    {
    }

    // Blocks the calling thread with sleep(seconds), then reports success by
    // clearing `ec`.
    void wait(std::size_t seconds, boost::system::error_code &ec)
    {
      sleep(seconds);
      ec.clear();
    }
};


// Convenience alias: basic_timer driven by basic_timer_service with its
// default TimerImplementation.
typedef basic_timer<basic_timer_service<> > timer;

// Out-of-class definition of the static service id declared inside
// basic_timer_service.
template <typename TimerImplementation>
boost::asio::io_service::id basic_timer_service<TimerImplementation>::id;
// Completion handler passed to timer::async_wait.
//
// ec: result of the wait. When it is set (for example operation_aborted),
//     the error text is written to stderr and the success message is
//     skipped; previously the success message was printed unconditionally.
//
// NOTE(review): the success text says "5 s." although main() requests a
// 2-second wait -- confirm which value is intended.
void wait_handler(const boost::system::error_code &ec)
{
  if (ec)
  {
    std::cerr << "Error: " << ec.message() << std::endl;
    return;
  }
  std::cout << "5 s." << std::endl;
}

int main()
{
  {
    boost::asio::io_service io_service;
    boost::asio::signal_set signals(io_service);
    timer t(io_service);

    signals.add(SIGINT);
    signals.async_wait(
      boost::bind(&boost::asio::io_service::stop, &io_service));

    t.async_wait(2, wait_handler);

    std:: cout << "async called\n" ;
    io_service.run();
  }

  { // this block will not be executed
    boost::asio::io_service io_service;
    timer t(io_service);

    t.async_wait(2, wait_handler);
    std:: cout << "async called\n" ;
    io_service.run();
  }
  return 0;
}

最佳答案

在尝试了 asio 作者提供的示例之后,我遇到了相同的行为。因此,我深入研究了该库的源代码,发现库内部使用的是 io_service_impl 的接口,而不是 io_service 的接口。此外,投递到 io_service_impl 的操作函子与 io_service 所调用的函数并不相同。最终,我决定基于 asio 的内部接口重写这个计时器示例。

我在此介绍重写的计时器示例。

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <cstddef>

// Looks up the bad::io_service_impl service object of io_service X through
// ba::use_service. The expansion names the `ba` / `bad` aliases declared just
// below, so the macro may only be used after those aliases are visible.
#define get_service_impl(X) \
  ba::use_service<bad::io_service_impl>(X)

// Short aliases for the public asio namespace and its internal detail
// namespace.
namespace ba = boost::asio;
namespace bad = boost::asio::detail;

// Nothing changed
// I/O object for a seconds-based timer. Every operation is forwarded to the
// Service type this template is instantiated with.
//
// Service is expected to provide:
//   wait(implementation, seconds)
//   async_wait(implementation, seconds, handler)
template <typename Service>
class basic_timer
  : public boost::asio::basic_io_object<Service>
{
  typedef boost::asio::basic_io_object<Service> base_type;

public:
  // Binds the timer to `io_service` by passing it to the base I/O object.
  explicit basic_timer(boost::asio::io_service &io_service)
    : base_type(io_service)
  {
  }

  // Synchronous wait: delegates to Service::wait for `seconds` seconds.
  void wait(std::size_t seconds)
  {
    this->service.wait(this->implementation, seconds);
  }

  // Asynchronous wait: hands `seconds` and `handler` to Service::async_wait.
  template <typename Handler>
  void async_wait(std::size_t seconds, Handler handler)
  {
    this->service.async_wait(this->implementation, seconds, handler);
  }
};

// Nothing changed
// Timer implementation whose wait is a plain blocking sleep().
class timer_impl
{
public:
  // Blocks the calling thread with sleep(seconds), then reports success by
  // clearing `ec`.
  void wait(std::size_t seconds, boost::system::error_code &ec)
  {
    sleep(seconds);
    ec.clear();
  }
};

// ----- Change a lot! --------
// io_service service backing basic_timer, written against asio's internal
// (boost::asio::detail) interfaces.
//
// Blocking waits run on a private io_service (work_io_service_) served by a
// lazily started background thread; once a wait finishes, the same operation
// object is handed to the owning ("main") io_service, which invokes the
// user's handler.
class basic_timer_service
: public boost::asio::io_service::service
{
public:
  // Per-timer state. Reuses asio's socket cancel-token type to hold a
  // timer_impl.
  // NOTE(review): this looks like a shared_ptr<void>-style type, given the
  // static_cast from impl.get() in wait() -- confirm against the asio headers.
  typedef boost::asio::detail::socket_ops::shared_cancel_token_type
    implementation_type;

  // Service id object; defined out of class after this class.
  static boost::asio::io_service::id id;

  // Caches the internal implementation of the owning io_service, creates the
  // private io_service together with a work object on it, and leaves the
  // worker thread unstarted (start_work_thread() creates it on demand).
  explicit basic_timer_service(boost::asio::io_service &io_service)
    : boost::asio::io_service::service(io_service),
      io_service_impl_(get_service_impl(io_service)),
      work_io_service_( new boost::asio::io_service ),
      work_io_service_impl_(get_service_impl(*work_io_service_)),
      work_(new ba::io_service::work(*work_io_service_)),
      work_thread_() // do not create thread here
  {}

  // Runs the same teardown as shutdown_service(); that function checks its
  // pointers, so running it a second time does nothing further.
  ~basic_timer_service()
  {  shutdown_service(); }

  // Allocates a fresh timer_impl for a new timer object.
  void construct(implementation_type &impl)
  { impl.reset(new timer_impl()); }

  // Replaces the token with a null pointer guarded by a no-op deleter,
  // dropping this owner's reference to the previous timer_impl.
  void cancel(implementation_type &impl)
  {
        impl.reset((void*)0, boost::asio::detail::socket_ops::noop_deleter());
  }

  // Releases the timer's implementation.
  void destroy(implementation_type &impl)
  { impl.reset(); }

  // Drops the work object, stops the private io_service, joins the worker
  // thread if one was started, and finally destroys the private io_service.
  void shutdown_service()
  {
    work_.reset();
    if(work_io_service_.get()){
      work_io_service_->stop();
      if (work_thread_.get()){
        work_thread_->join();
        work_thread_.reset();
      }
    }
    work_io_service_.reset();
  }

  // Synchronous wait on the calling thread. The error code filled in by the
  // implementation is passed to boost::asio::detail::throw_error
  // (presumably throws when ec is set -- confirm against the asio headers).
  void wait(implementation_type &impl, std::size_t seconds)
  {
    boost::system::error_code ec;
    // XXX (author) not sure this cast is safe
    timer_impl *impl_ptr = static_cast<timer_impl*>(impl.get());
    impl_ptr->wait(seconds, ec);
    boost::asio::detail::throw_error(ec);
  }

  // One asynchronous wait, expressed as an asio internal operation.
  // do_complete() is entered twice for the same object: first on the private
  // io_service (performs the blocking wait), then on the main io_service
  // (invokes the user's handler).
  template <typename Handler>
  class wait_operation
  : public boost::asio::detail::operation
  {
  public:
    // Declares the nested `ptr` helper used below to manage the operation's
    // handler-allocated storage.
    BOOST_ASIO_DEFINE_HANDLER_PTR(wait_operation);

    // namespace ba = boost::asio
    // namespace bad = boost::asio::detail
    //
    // cancel_token: weak reference to the timer's implementation
    // seconds:      duration passed to timer_impl::wait
    // ios:          internal implementation of the main io_service
    // handler:      user completion handler, stored by value
    wait_operation(
      bad::socket_ops::weak_cancel_token_type cancel_token,
      std::size_t seconds,
      bad::io_service_impl& ios,
      Handler handler)
      : bad::operation(&wait_operation::do_complete),
      cancel_token_(cancel_token),
      seconds_(seconds),
      io_service_impl_(ios),
      handler_(handler)
    {}

    // Completion entry point registered with bad::operation.
    //
    // owner: the io_service implementation running the operation. A value
    //        different from the stored main implementation means the call is
    //        on the private io_service. A null owner skips the handler
    //        invocation (presumably asio discarding the operation -- confirm).
    // base:  the operation itself, as its base-class pointer.
    static void do_complete(
      bad::io_service_impl *owner,
      bad::operation *base,
      boost::system::error_code const & /* ec */ ,
      std::size_t /* byte_transferred */ )
    {
      wait_operation *o(static_cast<wait_operation*>(base));
      ptr p = { boost::addressof(o->handler_), o, o};

      // Distinguish between main io_service and private io_service
      if(owner && owner != &o->io_service_impl_)
      { // private io_service

        // Start blocking call
        bad::socket_ops::shared_cancel_token_type lock =
          o->cancel_token_.lock();

        // No live implementation behind the token: report the wait as
        // aborted instead of sleeping.
        if(!lock){
          o->ec_ = boost::system::error_code(
            ba::error::operation_aborted,
                            boost::system::system_category());
        }else{
          timer_impl *impl = static_cast<timer_impl*>(lock.get());
          impl->wait(o->seconds_, o->ec_);
        }
        // End of blocking call

        // Hand the same operation object to the main io_service, and clear p
        // so its destructor does not free the object that was just posted.
        o->io_service_impl_.post_deferred_completion(o);
        p.v = p.p = 0;
      }else{ // main io_service
        // Copy the handler and result out of the operation so the operation's
        // storage can be released (p.reset()) before the handler runs.
        bad::binder1<Handler, boost::system::error_code>
          handler(o->handler_, o->ec_);
        p.h = boost::addressof(handler.handler_);
        p.reset();
        if(owner){
          bad::fenced_block b(bad::fenced_block::half);
          boost_asio_handler_invoke_helpers::invoke(
                        handler, handler.handler_);
        }
      }
    }

  private:
    bad::socket_ops::weak_cancel_token_type cancel_token_;
    std::size_t seconds_;
    bad::io_service_impl &io_service_impl_;  // main io_service implementation
    Handler handler_;
    boost::system::error_code ec_;  // result of the blocking wait
  };

  // Starts an asynchronous wait: allocates a wait_operation through the
  // handler allocation helpers, constructs it in place, and submits it with
  // start_op(). Clearing p afterwards leaves the operation alive.
  template <typename Handler>
  void async_wait(
    implementation_type &impl,
    std::size_t seconds, Handler handler)
  {
    typedef wait_operation<Handler> op;
    typename op::ptr p = {
      boost::addressof(handler),
      boost_asio_handler_alloc_helpers::allocate(
        sizeof(op), handler), 0};
    p.p = new (p.v) op(impl, seconds, io_service_impl_, handler);
    start_op(p.p);
    p.v = p.p = 0;
  }

protected:
  // Functor for running the background thread: calls run() on the io_service
  // it was given.
  class work_io_service_runner
  {
  public:
    work_io_service_runner(ba::io_service &io_service)
      : io_service_(io_service) {}

    void operator()(){ io_service_.run(); }
  private:
    ba::io_service &io_service_;
  };

  // Makes sure the worker thread exists, calls work_started() on the main
  // io_service implementation (presumably so its run() keeps waiting for this
  // operation -- confirm), then queues the operation on the private
  // io_service.
  void start_op(bad::operation* op)
  {
    start_work_thread();
    io_service_impl_.work_started();
    work_io_service_impl_.post_immediate_completion(op);
  }

  // Creates the worker thread on first use; mutex_ is held while checking and
  // creating it.
  void start_work_thread()
  {
    bad::mutex::scoped_lock lock(mutex_);
    if (!work_thread_.get())
    {
      work_thread_.reset(new bad::thread(
          work_io_service_runner(*work_io_service_)));
    }
  }

  // Internal implementation of the owning (main) io_service.
  bad::io_service_impl& io_service_impl_;

private:
  // Declaration order matters: it is the initialization order used by the
  // constructor (mutex, private io_service, its implementation, work object,
  // thread holder).
  bad::mutex mutex_;
  boost::scoped_ptr<ba::io_service> work_io_service_;
  bad::io_service_impl &work_io_service_impl_;
  boost::scoped_ptr<ba::io_service::work> work_;
  boost::scoped_ptr<bad::thread> work_thread_;
};

// Out-of-class definition of the static service id declared inside
// basic_timer_service.
boost::asio::io_service::id basic_timer_service::id;

// Convenience alias: basic_timer driven by basic_timer_service.
typedef basic_timer<basic_timer_service> timer;

// Completion handler passed to timer::async_wait.
//
// ec: result of the wait. A set error code is reported on stderr; otherwise
//     a confirmation line is written to stdout.
void wait_handler(const boost::system::error_code &ec)
{
  if (ec)
  {
    std::cerr << "Error: " << ec.message() << "\n";
    return;
  }

  std::cout << "wait_handler is called\n";
}

int main()
{
  {
    boost::asio::io_service io_service;
    boost::asio::signal_set signals(io_service);
    timer t(io_service);

    signals.add(SIGINT);
    signals.async_wait(
      boost::bind(&boost::asio::io_service::stop, &io_service));

    t.async_wait(2, wait_handler);

    std:: cout << "async called\n" ;
    io_service.run();
    std:: cout << "exit loop\n";
  }

  {
    boost::asio::io_service io_service;
    timer t(io_service);

    t.async_wait(2, wait_handler);
    std:: cout << "async called\n" ;
    io_service.run();
  }
  return 0;
}

编译
gcc -I/boost_inc -L/boot_lib main.cpp -lpthread -lboost_system -lboost_thread
新计时器工作正常。我仍然想知道如何编写asio的非侵入式扩展。

09-06 12:02