问题描述
我正在尝试为由 C++ 代码控制的硬件设备实现同步操作.
假设有两种类型的设备,可以对它们执行 Open/Close 操作.
我需要实现的是:让某一种类型的设备打开指定的时长(Specified Duration),第二种类型的设备也是如此.
我已经用 boost::deadline_timer 编写了代码:
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
// Controls two kinds of devices. Each kind has its own deadline timer, and
// both timers are serviced by the single io_service owned by this object.
class Test : public std::enable_shared_from_this<Test>
{
public:
    // io_ is declared first, so it is already constructed when the timers
    // are bound to it.
    Test()
        : timerOne_(io_)
        , timerTwo_(io_)
    {
    }

    // Prints the "Open" message for device number `num`.
    void Open(int num);

    // Prints the "Close" message for device number `num`.
    void Close(int num);

    // Opens device `num` immediately and schedules Close(num) after `dur` seconds.
    void TimedOpen(int num, int dur);

    // Spawns a thread that runs io_.
    void Run();

private:
    boost::asio::io_service io_;
    boost::asio::deadline_timer timerOne_;  // timer used for device type 1
    boost::asio::deadline_timer timerTwo_;  // timer used for device type 2
};
// Reports on stdout that device number `type` is being opened.
void Test::Open(int type)
{
    // '\n' followed by an explicit flush is what std::endl does.
    std::cout << "Open for Number : " << type << '\n' << std::flush;
}
// Reports on stdout that device number `type` is being closed.
void Test::Close(int type)
{
    // '\n' followed by an explicit flush is what std::endl does.
    std::cout << "Close for Number : " << type << '\n' << std::flush;
}
void Test::TimedOpen(int type, int dur)
{
switch (type)
{
case 1:
{
io_.reset();
auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
fn(type);
timerOne_.expires_from_now(boost::posix_time::seconds(dur));
timerOne_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
Run();
std::cout << "Function Exiting" << std::endl;
std::cout << "-----------------------------------------------" << std::endl;
return;
}
case 2:
{
io_.reset();
auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
fn(type);
timerTwo_.expires_from_now(boost::posix_time::seconds(dur));
timerTwo_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
Run();
std::cout << "Function Exiting" << std::endl;
std::cout << "-----------------------------------------------" << std::endl;
return;
}
}
}
void Test::Run()
{
boost::thread th(boost::bind(&boost::asio::io_service::run, &io_));
}
// Demo driver: issues four timed opens back to back, then blocks on stdin.
int main()
{
    const auto device = std::make_shared<Test>();
    device->TimedOpen(1, 60);
    device->TimedOpen(2, 30);
    device->TimedOpen(1, 5);
    device->TimedOpen(2, 2);

    // Keep the process alive until input arrives. getline() strips the line
    // terminator, so the buffer never equals "\n" and the first line that is
    // read successfully ends the loop.
    char buffer[128];
    while (std::cin.getline(buffer, sizeof(buffer)))
    {
        if (strcmp(buffer, "\n") != 0)
            break;
    }
    return 0;
}
输出为:
Open for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Function Exiting
-----------------------------------------------
Open for Number : 1
Close for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Close for Number : 2
Function Exiting
-----------------------------------------------
Close for Number : 2
Close for Number : 1
对于 timerOne_,它不会等待先前的 wait 到期:一旦执行 t->TimedOpen(1, 5),先前的动作 t->TimedOpen(1, 60) 就被取消了.
所以 Close for Number : 1 没有等 t->TimedOpen(1, 60) 到期就出现在了输出中.
我想要实现的是:如果任何类型的 timer 遇到多次等待(multiple waits),则所有操作都应排队,也就是说,
如果我输入:
t->TimedOpen(1, 60);
t->TimedOpen(1, 10);
t->TimedOpen(1, 5);
它应该总共执行 60+10+5 秒的 TimedOpen 操作,而目前只执行了 5 秒.另外,它必须是非阻塞的,也就是说我不能用 wait() 代替 async_wait().
我该如何实现?
摘要:我的需求是对 boost::deadline_timer() 上的操作进行调度,即在先前的等待到期之前,对它的多个操作都应排队.
就像评论中提到的那样,您需要为每个"类型"各维护一个队列.
我们把每个类型对应的队列称为一个"会话"(session).
通过把同一个队列中的所有异步等待都链接到同一个 strand¹ 上,您就得到了有效的串行化(同时也免去了对队列/会话的同步).
唯一棘手的地方是:当没有任何等待正在进行(in flight)时,需要启动一次异步等待.这里的不变式是:当且仅当 !queue_.empty() 时,才有异步操作正在进行:
// One queue of timed "open" requests for a single device type.
//
// queue_ and timer_ are only touched from handlers that run through strand_
// (the posted lambda in Enqueue and the wrapped Close handler), so the
// session needs no mutex of its own. Invariant: an asynchronous wait is
// pending on timer_ exactly while queue_ is non-empty.
struct Session : std::enable_shared_from_this<Session> {
    Session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {}

    // Queues an open lasting `duration` milliseconds. This function itself
    // only posts a handler to the strand; the queue is modified inside it.
    void Enqueue(int duration) {
        // The handler owns a shared_ptr, which keeps the session alive until
        // it has run. Capturing `this` as well would be redundant.
        auto self = shared_from_this();
        strand_.post([self, duration] {
            std::cout << "t0 + " << std::setw(4) << mark() << "ms Enqueue for Number: " << self->type << " (dur:" << duration << ")\n";
            self->queue_.push(duration);
            // A size of 1 means the queue was empty before this push, so no
            // wait is in flight and the chain has to be started here.
            if (self->queue_.size() == 1)
                self->Wait();
        });
    }

private:
    // io_service::strand is spelled out because boost::asio::strand is a
    // class template in Boost >= 1.66 and cannot be used without arguments.
    boost::asio::io_service::strand strand_;
    boost::asio::deadline_timer timer_;
    int type;
    std::queue<int> queue_; // pending durations in ms; front() is the one currently being timed

    // Timer completion handler: logs and removes the finished entry, then
    // starts the wait for the next one (if any).
    void Close() {
        assert(!queue_.empty());
        std::cout << "t0 + " << std::setw(4) << mark() << "ms Close for Number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n";
        queue_.pop();
        Wait();
    }

    // Arms timer_ for queue_.front() milliseconds; does nothing when the
    // queue is empty.
    void Wait() {
        if (!queue_.empty()) {
            std::cout << "t0 + " << std::setw(4) << mark() << "ms Open for Number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n";
            timer_.expires_from_now(boost::posix_time::milliseconds(queue_.front()));
            // The bound call has no placeholders, so the timer's error code
            // is dropped; wrapping in the strand serialises Close() with the
            // Enqueue handlers.
            timer_.async_wait(strand_.wrap(std::bind(&Session::Close, shared_from_this())));
        }
    }
};
现在 Test 类变得简单多了(事实上,它根本不需要被"共享",不过这个细节我就留给读者作为练习了):
// Owns the io_service, the thread that runs it, and one Session per device
// type. mx_ guards work_ and timers_ex_.
class Test : public std::enable_shared_from_this<Test> {
    using guard = boost::lock_guard<boost::mutex>;

public:
    // Starts the io thread right away. The work object keeps io_.run() from
    // returning while there is nothing to do.
    Test() : io_(), work_(boost::asio::io_service::work(io_)) {
        io_thread = boost::thread([this] { io_.run(); });
    }

    // Enqueues `duration` on the session for device type `num`.
    void TimedOpen(int num, int duration);

    // Releases the work object so that io_.run() can return, then waits for
    // the io thread to finish. Safe to call repeatedly: the destructor calls
    // it again after an explicit Stop(), so the join is guarded by joinable().
    void Stop() {
        {
            guard lk(mx_);
            if (work_) work_.reset();
        }
        if (io_thread.joinable())
            io_thread.join();
    }

    // Stops the service first, then drops all sessions under the lock.
    ~Test() {
        Stop();
        guard lk(mx_);
        timers_ex_.clear();
    }

private:
    mutable boost::mutex mx_;
    boost::asio::io_service io_;
    boost::optional<boost::asio::io_service::work> work_;
    std::map<int, std::shared_ptr<Session> > timers_ex_; // keyed by device type
    boost::thread io_thread;
};
void Test::TimedOpen(int type, int duration) {
guard lk(mx_);
auto &session = timers_ex_[type];
if (!session) session = std::make_shared<Session>(io_, type);
session->Enqueue(duration);
}
如您所见,我已经
- 推广到任意数量的类型
- 使各操作具有线程安全性
- 添加了自 t0 以来的相对时间戳(以毫秒为单位)
- 修复了完全错误的 io_service 生命周期.现在,构造对象时就会启动服务,work_ 变量使其在空闲时保持存活.
- Stop() 将其关闭(会先把各会话队列处理完).
- 析构时会隐式调用 Stop()
这是一个测试运行:
int main() {
auto t = std::make_shared<Test>();
t->TimedOpen(1, 300);
t->TimedOpen(2, 150);
t->TimedOpen(1, 50);
t->TimedOpen(2, 20);
boost::this_thread::sleep_for(boost::chrono::milliseconds(400));
std::cout << "================\n";
t->TimedOpen(1, 50);
t->TimedOpen(2, 20);
t->TimedOpen(1, 300);
t->TimedOpen(2, 150);
t->Stop();
}
打印
t0 + 0ms Enqueue for Number: 1 (dur:300)
t0 + 0ms Open for Number : 1 (dur:300) (depth 1)
t0 + 0ms Enqueue for Number: 2 (dur:150)
t0 + 0ms Open for Number : 2 (dur:150) (depth 1)
t0 + 0ms Enqueue for Number: 1 (dur:50)
t0 + 0ms Enqueue for Number: 2 (dur:20)
t0 + 150ms Close for Number : 2 (dur:150) (depth 2)
t0 + 150ms Open for Number : 2 (dur:20) (depth 1)
t0 + 170ms Close for Number : 2 (dur:20) (depth 1)
t0 + 300ms Close for Number : 1 (dur:300) (depth 2)
t0 + 300ms Open for Number : 1 (dur:50) (depth 1)
t0 + 350ms Close for Number : 1 (dur:50) (depth 1)
================
t0 + 400ms Enqueue for Number: 1 (dur:50)
t0 + 400ms Open for Number : 1 (dur:50) (depth 1)
t0 + 400ms Enqueue for Number: 2 (dur:20)
t0 + 400ms Open for Number : 2 (dur:20) (depth 1)
t0 + 400ms Enqueue for Number: 1 (dur:300)
t0 + 400ms Enqueue for Number: 2 (dur:150)
t0 + 420ms Close for Number : 2 (dur:20) (depth 2)
t0 + 420ms Open for Number : 2 (dur:150) (depth 1)
t0 + 450ms Close for Number : 1 (dur:50) (depth 2)
t0 + 450ms Open for Number : 1 (dur:300) (depth 1)
t0 + 570ms Close for Number : 2 (dur:150) (depth 1)
t0 + 750ms Close for Number : 1 (dur:300) (depth 1)
¹为什么我需要使用boost :: asio吗? I am trying to achieve synchronization operation for hardware devices controlled by my C++ code. Suppose Two types of devices are there on which I can perform I have written code with The Output is: For So What I want to achieve is that if If I type: It should do How do I achieve it? Summary:My requirement is to schedule operations on a Like was mentioned in a comment, you will want to have queues per "type". Let's name the per-type queue a "session". By chaining all async waits from a single queue on a single The only tricky bit is to start async wait when none is in flight. The invariant is that async operations are in flight iff Now the As you can see I've Here's a test run: Prints ¹ Why do I need strand per connection when using boost::asio? 这篇关于确保boost :: deadline_timer不接受新的等待,除非先前的等待已过期的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!Open/Close
. What I need to achieve is to open one type of device for a Specified Duration. The same is true for the second type of device.
I have written code with boost::deadline_timer
:#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
// Controls two kinds of devices. Each kind has its own deadline timer, and
// both timers are serviced by the single io_service owned by this object.
class Test : public std::enable_shared_from_this<Test>
{
public:
    // io_ is declared first, so it is already constructed when the timers
    // are bound to it.
    Test()
        : timerOne_(io_)
        , timerTwo_(io_)
    {
    }

    // Prints the "Open" message for device number `num`.
    void Open(int num);

    // Prints the "Close" message for device number `num`.
    void Close(int num);

    // Opens device `num` immediately and schedules Close(num) after `dur` seconds.
    void TimedOpen(int num, int dur);

    // Spawns a thread that runs io_.
    void Run();

private:
    boost::asio::io_service io_;
    boost::asio::deadline_timer timerOne_;  // timer used for device type 1
    boost::asio::deadline_timer timerTwo_;  // timer used for device type 2
};
// Reports on stdout that device number `type` is being opened.
void Test::Open(int type)
{
    // '\n' followed by an explicit flush is what std::endl does.
    std::cout << "Open for Number : " << type << '\n' << std::flush;
}
// Reports on stdout that device number `type` is being closed.
void Test::Close(int type)
{
    // '\n' followed by an explicit flush is what std::endl does.
    std::cout << "Close for Number : " << type << '\n' << std::flush;
}
void Test::TimedOpen(int type, int dur)
{
switch (type)
{
case 1:
{
io_.reset();
auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
fn(type);
timerOne_.expires_from_now(boost::posix_time::seconds(dur));
timerOne_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
Run();
std::cout << "Function Exiting" << std::endl;
std::cout << "-----------------------------------------------" << std::endl;
return;
}
case 2:
{
io_.reset();
auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1);
fn(type);
timerTwo_.expires_from_now(boost::posix_time::seconds(dur));
timerTwo_.async_wait(std::bind(&Test::Close, shared_from_this(), type));
Run();
std::cout << "Function Exiting" << std::endl;
std::cout << "-----------------------------------------------" << std::endl;
return;
}
}
}
void Test::Run()
{
boost::thread th(boost::bind(&boost::asio::io_service::run, &io_));
}
// Demo driver: issues four timed opens back to back, then blocks on stdin.
int main()
{
    const auto device = std::make_shared<Test>();
    device->TimedOpen(1, 60);
    device->TimedOpen(2, 30);
    device->TimedOpen(1, 5);
    device->TimedOpen(2, 2);

    // Keep the process alive until input arrives. getline() strips the line
    // terminator, so the buffer never equals "\n" and the first line that is
    // read successfully ends the loop.
    char buffer[128];
    while (std::cin.getline(buffer, sizeof(buffer)))
    {
        if (strcmp(buffer, "\n") != 0)
            break;
    }
    return 0;
}
Open for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Function Exiting
-----------------------------------------------
Open for Number : 1
Close for Number : 1
Function Exiting
-----------------------------------------------
Open for Number : 2
Close for Number : 2
Function Exiting
-----------------------------------------------
Close for Number : 2
Close for Number : 1
For timerOne_, it does not wait for the previous wait to expire, i.e. as soon as t->TimedOpen(1, 5) is executed the previous action t->TimedOpen(1, 60) is cancelled. So Close for Number : 1 appears in the output without waiting for t->TimedOpen(1, 60).
What I want to achieve is that if multiple waits are encountered for any type of timer, all the operations should be queued, i.e. if I type:
t->TimedOpen(1, 60);
t->TimedOpen(1, 10);
t->TimedOpen(1, 5);
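It should do the TimedOpen Operation for 60+10+5 seconds. Currently it does so only for 5 secs. Also, it should be non-blocking, i.e. I cannot use wait() instead of async_wait().
How do I achieve it?
Summary: My requirement is to schedule operations on a boost::deadline_timer(), i.e. multiple operations on it will be queued unless the previous wait has expired.
Like was mentioned in a comment, you will want to have queues per "type". Let's name the per-type queue a "session".
By chaining all async waits from a single queue on a single strand¹ you get effective serialization (also avoids synchronization on the queue/session).
The only tricky bit is to start an async wait when none is in flight. The invariant is that async operations are in flight iff !queue_.empty()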
:struct Session : std::enable_shared_from_this<Session> {
// Holds a queue of durations for one `type` and works through them one at a
// time using timer_. All handlers are dispatched through strand_.
Session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {}
// Posts a handler to strand_ that logs the request, appends `duration` to the
// queue and, if it is now the only entry, calls Wait() to start the timer.
void Enqueue(int duration) {
auto This = shared_from_this(); // copied into the handler below
strand_.post([This,duration,this] {
std::cout << "t0 + " << std::setw(4) << mark() << "ms Enqueue for Number: " << type << " (dur:" << duration << ")\n";
This->queue_.push(duration);
if (This->queue_.size() == 1) // the queue was empty before this push
This->Wait();
});
}
private:
boost::asio::strand strand_;
boost::asio::deadline_timer timer_;
int type;
std::queue<int> queue_; // pending durations; front() is the entry the timer is armed for
// Timer completion handler: logs and removes the front entry, then calls
// Wait() to continue with the next one.
void Close() {
assert(!queue_.empty());
std::cout << "t0 + " << std::setw(4) << mark() << "ms Close for Number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n";
queue_.pop();
Wait();
}
// Arms timer_ for queue_.front() milliseconds and registers Close() as the
// completion handler; does nothing when the queue is empty.
void Wait() {
if (!queue_.empty()) {
std::cout << "t0 + " << std::setw(4) << mark() << "ms Open for Number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n";
timer_.expires_from_now(boost::posix_time::milliseconds(queue_.front()));
// The bind has no placeholders, so the handler's error code argument is dropped.
timer_.async_wait(strand_.wrap(std::bind(&Session::Close, shared_from_this())));
}
}
};
Now the Test
class becomes much simpler (in fact it doesn't need to be "shared" at all, but I've left that detail as the proverbial exercise for the reader):class Test : public std::enable_shared_from_this<Test> {
using guard = boost::lock_guard<boost::mutex>;
public:
// Creates a work object on io_ and starts a thread that calls io_.run().
Test() : io_(), work_(boost::asio::io_service::work(io_)) {
io_thread = boost::thread([this] { io_.run(); });
}
// Defined out of line: finds or creates the Session for `num` and enqueues `duration`.
void TimedOpen(int num, int duration);
// Releases the work object while holding mx_, then joins the io thread.
void Stop() {
{
guard lk(mx_);
if (work_) work_.reset();
}
// NOTE(review): reached a second time from ~Test() after an explicit Stop();
// consider guarding with io_thread.joinable().
io_thread.join();
}
// Calls Stop(), then clears the session map while holding mx_.
~Test() {
Stop();
guard lk(mx_);
timers_ex_.clear();
}
private:
mutable boost::mutex mx_; // locked around accesses to work_ and timers_ex_
boost::asio::io_service io_;
boost::optional<boost::asio::io_service::work> work_;
std::map<int, std::shared_ptr<Session> > timers_ex_; // one Session per type key
boost::thread io_thread;
};
void Test::TimedOpen(int type, int duration) {
guard lk(mx_);
auto &session = timers_ex_[type];
if (!session) session = std::make_shared<Session>(io_, type);
session->Enqueue(duration);
}
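As you can see I've
- extrapolated to any number of types
- made the operations thread-safe
- added relative timestamps in milliseconds since t0
- fixed the completely broken io_service lifetime. Now, construction starts the service. The work_ variable keeps it alive when idle.
- Stop() shuts it down (draining the session queues first).
- Destruction calls Stop()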
implicitlyint main() {
// Test run: two batches of requests for types 1 and 2, 400 ms apart.
auto t = std::make_shared<Test>();
// First batch.
t->TimedOpen(1, 300);
t->TimedOpen(2, 150);
t->TimedOpen(1, 50);
t->TimedOpen(2, 20);
boost::this_thread::sleep_for(boost::chrono::milliseconds(400)); // pause before the second batch
std::cout << "================\n";
// Second batch: the same durations, issued in a different order.
t->TimedOpen(1, 50);
t->TimedOpen(2, 20);
t->TimedOpen(1, 300);
t->TimedOpen(2, 150);
t->Stop(); // explicit stop before t goes out of scope
}
t0 + 0ms Enqueue for Number: 1 (dur:300)
t0 + 0ms Open for Number : 1 (dur:300) (depth 1)
t0 + 0ms Enqueue for Number: 2 (dur:150)
t0 + 0ms Open for Number : 2 (dur:150) (depth 1)
t0 + 0ms Enqueue for Number: 1 (dur:50)
t0 + 0ms Enqueue for Number: 2 (dur:20)
t0 + 150ms Close for Number : 2 (dur:150) (depth 2)
t0 + 150ms Open for Number : 2 (dur:20) (depth 1)
t0 + 170ms Close for Number : 2 (dur:20) (depth 1)
t0 + 300ms Close for Number : 1 (dur:300) (depth 2)
t0 + 300ms Open for Number : 1 (dur:50) (depth 1)
t0 + 350ms Close for Number : 1 (dur:50) (depth 1)
================
t0 + 400ms Enqueue for Number: 1 (dur:50)
t0 + 400ms Open for Number : 1 (dur:50) (depth 1)
t0 + 400ms Enqueue for Number: 2 (dur:20)
t0 + 400ms Open for Number : 2 (dur:20) (depth 1)
t0 + 400ms Enqueue for Number: 1 (dur:300)
t0 + 400ms Enqueue for Number: 2 (dur:150)
t0 + 420ms Close for Number : 2 (dur:20) (depth 2)
t0 + 420ms Open for Number : 2 (dur:150) (depth 1)
t0 + 450ms Close for Number : 1 (dur:50) (depth 2)
t0 + 450ms Open for Number : 1 (dur:300) (depth 1)
t0 + 570ms Close for Number : 2 (dur:150) (depth 1)
t0 + 750ms Close for Number : 1 (dur:300) (depth 1)