我想要的是当一个消息队列收到一个整数N时,在N秒后将调用处理程序函数。下面是我的代码。

如果两个附近消息队列的持续时间秒数大于int N,它将运行正常,但是当两个接收到的消息队列之间的持续时间秒数小于N时,该处理程序将在一个处理程序中打印“操作取消”。想。

非常感谢您的帮助。

#include <boost/asio.hpp>
#include <zmq.h>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;

void* context = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);


void handler(const boost::system::error_code &ec) {
    std::cout << "hello, world" << "\t" << ec.message() << std::endl;
}

void run() {
    io_service.run();
}

void thread_listener() {

     int nRecv;
     boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(0));
     while( true ) {
         zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
         std::cout << nRecv << std::endl;
         timer.expires_from_now(boost::posix_time::seconds(nRecv));
         timer.async_wait(handler);
     }

 }

 int main(int argc, char* argv[]) {

     boost::asio::io_service::work work(io_service);

     zmq_bind(sock_pull, "tcp://*:60000");
     boost::thread tThread(thread_listener);
     boost::thread tThreadRun(run);
     tThread.join();
     tThreadRun.join();
     return 0;

 }

最佳答案

你打电话时

timer.expires_from_now(boost::posix_time::seconds(nRecv));

as the documentation states取消任何未决的异步计时器。

如果您希望在给定的时间内有重叠的请求在飞行中,那么一个计时器显然是不够的。幸运的是,在Asio中有一个众所周知的绑定(bind)共享指针模式,您可以使用它来模仿每个响应的“ session ”。

假设您定义了一个 session 以包含其自己的私有(private)计时器:
struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N))
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

现在,替换使用硬编码计时器实例的代码很简单:
 timer.expires_from_now(boost::posix_time::seconds(nRecv));
 timer.async_wait(handler);

session 开始:
 boost::make_shared<session>(io_service, nRecv)->start();

一个完整的示例(带有适当存根的ZMQ内容): Live On Coliru
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <iostream>

boost::asio::io_service io_service;

/////////////////////////////////////////////////////////////////////////
// I love stubbing out stuff I don't want to install just to help others
enum { ZMQ_PULL };
static void* zmq_ctx_new()         { return nullptr; }
static void* zmq_socket(void*,int) { return nullptr; }
static void  zmq_bind(void*,char const*) {}
static void  zmq_recv(void*,int*data,size_t,int)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds(rand()%1000));
    *data = 2;
}
// End of stubs :)
/////////////////////////////////////////////////////////////////////////

void* context  = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N))
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

    ~session() {
        std::cout << "bye (session end)\n";
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

void run() {
    io_service.run();
}

void thread_listener() {
    int nRecv = 0;
    for(int n=0; n<4; ++n) {
        zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
        std::cout << nRecv << std::endl;

        boost::make_shared<session>(io_service, nRecv)->start();
    }
}

int main() {
    auto work = boost::make_shared<boost::asio::io_service::work>(io_service);

    zmq_bind(sock_pull, "tcp://*:60000");
    boost::thread tThread(thread_listener);
    boost::thread tThreadRun(run);

    tThread.join();
    work.reset();

    tThreadRun.join();
}

关于c++ - 在N秒内两次提升asio最后期限time_timer async_wait(N秒)导致操作被取消,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/25960268/

10-11 18:57