下面的代码旨在执行以下操作:我有一个包装boost asio的解析器对象。解析器对象包含io服务和工作程序,因此io服务运行功能永远不会返回。只要解析器对象处于 Activity 状态,就可以发出异步请求。当解析器对象超出范围并且队列中仍然有请求时,我要完成所有操作,然后解析器对象将被销毁。
在这种情况下,根本没有调用任何处理程序,我也不知道为什么。我认为共享指针和某些依赖周期可能存在问题。使用valgrind
运行会报告“可能丢失了内存”。
有什么想法可以使此工作正常进行,直到所有工作完成后,解析程序对象才能保持 Activity 状态?
#include <boost/asio.hpp>
#include <memory>
#include <thread>
#include <functional>
#include <string>
#include <iostream>
struct Resolver : public std::enable_shared_from_this<Resolver> {
boost::asio::io_service io_service;
std::unique_ptr<boost::asio::io_service::work> work;
std::unique_ptr<std::thread> iothread;
struct Query : public std::enable_shared_from_this<Query>{
std::shared_ptr<Resolver> service;
boost::asio::ip::tcp::resolver resolver;
boost::asio::ip::tcp::resolver::query query;
std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler;
Query(std::shared_ptr<Resolver> res, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler, const std::string &name) : resolver(res->io_service), query(name, ""), handler(handler) {
service = res;
}
void start() {
auto self = shared_from_this();
resolver.async_resolve(query, [self](const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator iterator){
self->handler(iterator);
});
}
};
Resolver() {
work.reset(new boost::asio::io_service::work(io_service));
iothread.reset(new std::thread(std::bind(&Resolver::io, this)));
}
~Resolver() {
std::cout << "Resolver destroyed" << std::endl;
work.reset();
iothread->join();
}
void io() {
io_service.run();
}
void asyncResolve(const std::string &name, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> fn) {
auto query = std::make_shared<Query>(shared_from_this(), fn, name);
query->start();
}
};
void test(boost::asio::ip::tcp::resolver::iterator it) {
std::cout << "Test" << std::endl;
std::cout << it->endpoint().address().to_string() << std::endl;
}
int main(int argc, const char **argv) {
auto res = std::make_shared<Resolver>();
res->asyncResolve("stackoverflow.com", &test);
res->asyncResolve("stackoverflow.com", &test);
res->asyncResolve("stackoverflow.com", &test);
res->asyncResolve("stackoverflow.com", &test);
res->asyncResolve("stackoverflow.com", &test);
}
最佳答案
仅运行服务(io_service::run()
)已经可以确保所有异步操作都已完成(请参阅the documentation)。
您已经在工作线程上执行了此操作,然后加入了该线程,所以应该没问题!
唯一的异常(exception)是如果处理程序抛出,因此要更加精确,您应该处理run()
中的异常:Should the exception thrown by boost::asio::io_service::run() be caught?
void io() {
// http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
for (;;) {
try {
io_service.run();
break; // exited normally
} catch (std::exception const &e) {
std::cerr << "[Resolver] An unexpected error occurred: " << e.what();
} catch (...) {
std::cerr << "[Resolver] An unexpected error occurred";
}
}
}
所以...问题出在哪里?
这个问题很挑剔,并且隐藏在线程和shared_ptr之间。
共享指针使
~Resolver
在辅助线程上运行。这意味着您无法对工作线程进行 join()
(因为线程永远无法加入自身)。一个好的实现将引发异常,从而导致进程终止。还有更多:如果在工作线程正在处理异步任务时仅退出
main()
,则在将像std::cout
这样的全局变量删除后,完成处理程序可能会运行。因此,要真正**看到* Resolver
完成工作并销毁,您需要确保main
不会过快退出。简化:
现在,以下是一个简化的示例,它确实表明异步操作确实完成了:(仍然存在问题):
#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <thread>
#include <iostream>
class Resolver : public std::enable_shared_from_this<Resolver> {
using tcp = boost::asio::ip::tcp;
using io_service = boost::asio::io_service;
io_service _svc;
tcp::resolver resolver { _svc };
boost::optional<io_service::work> work { _svc };
std::thread _worker { [this] { event_loop(); } };
void event_loop() {
// http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
for (;;) {
std::cout << __PRETTY_FUNCTION__ << "\n";
try {
_svc.run();
break; // exited normally
} catch (std::exception const &e) {
std::cerr << "[Resolver] An unexpected error occurred: " << e.what() << "\n";
} catch (...) {
std::cerr << "[Resolver] An unexpected error occurred\n";
}
}
std::cout << "EXIT " << __PRETTY_FUNCTION__ << "\n";
}
public:
~Resolver() {
std::cout << __PRETTY_FUNCTION__ << "\n";
work.reset();
}
using Endpoint = tcp::endpoint;
using Callback = std::function<void(Endpoint)>;
void asyncResolve(std::string const& name, Callback fn) {
auto self = shared_from_this();
resolver.async_resolve({name, ""}, [self,fn](boost::system::error_code ec, tcp::resolver::iterator it) {
if (!ec) fn(it->endpoint());
});
}
};
void test_handler(Resolver::Endpoint ep) {
std::cout << "Test: " << ep << "\n";
}
int main() {
{
auto res = std::make_shared<Resolver>();
for (auto fqdn : {"stackoverflow.com", "google.com", "localhost"})
res->asyncResolve(fqdn, test_handler);
}
std::cout << "Released shared resolver\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Main exit\n";
}
打印品:
void Resolver::event_loop()
Released shared resolver
Test: 151.101.65.69:0
Test: 172.217.17.46:0
Test: 127.0.0.1:0
Resolver::~Resolver()
terminate called without an active exception
处理程序跟踪:
剩下的问题
最终的问题是,现在我们不加入线程。这是从
std::thread::~thread
析构函数抛出的。这是一个棘手的问题:join()
,因为我们可能在上是,而该工作线程detach()
,因为这将创建一个数据争用,在析构函数完成后,工作线程仍在运行。 选项包括:
_svc::run()
,而不是join()
-线程。这是可行的,但是如果服务用于更多异步任务,则不合适,因为这样做的副作用是排队的操作可能在导致析构函数运行的线程上运行。 join()
;如果不是,则调用run()
。这始终是安全的,因为可以将run()
称为嵌套,并且操作仍按预期的方式从工作线程运行join
并使用error_condition system_error
resource_deadlock_would_occur
异常我想说第二个是最干净的。但是在您的简单示例中,第一个选项没有问题,因为(a)如果存在现成的解析操作,则析构函数将始终从工作线程运行;(b)如果没有,则服务队列必须为空,因此
run()
实际上不会执行任何操作。因此,这里有一个解决方法:
~Resolver() {
std::cout << __PRETTY_FUNCTION__ << "\n";
work.reset();
event_loop();
if (_worker.joinable()) {
if (_worker.get_id() == std::this_thread::get_id())
_worker.detach();
else
_worker.join();
}
}
现在的输出是
void Resolver::event_loop()
Released shared resolver
Test: 151.101.193.69:0
Test: 216.58.212.238:0
Test: 127.0.0.1:0
Resolver::~Resolver()
void Resolver::event_loop()
Main exit