Problem description
I'm having some trouble grasping how to correctly handle creating a child process from a multithreaded program that uses Boost Asio in a multithreaded fashion.
If I understand correctly, the way to launch a child process in the Unix world is to call fork() followed by an exec*(). Also, if I understand correctly, calling fork() will duplicate all file descriptors and so on, and these need to be closed in the child process unless marked as FD_CLOEXEC (and thereby atomically closed when calling exec*()).
Boost Asio needs to be notified when fork() is called, by calling notify_fork(), in order to operate correctly. However, in a multithreaded program this creates several issues:
1. Sockets are, if I understand correctly, inherited by child processes by default. They can be set to SOCK_CLOEXEC, but not directly at creation*, which leaves a timing window if a child process is being created from another thread.
2. notify_fork() requires that no other thread calls any other io_service function, nor any function on any other I/O object associated with the io_service. This does not really seem feasible; after all, the program is multithreaded for a reason.
3. If I understand correctly, any function call made between fork() and exec*() needs to be async-signal-safe (see the fork() documentation). There is no documentation stating that notify_fork() is async-signal-safe. In fact, if I look at the source code for Boost Asio (at least in version 1.54), there may be calls to pthread_mutex_lock, which is not async-signal-safe if I understand correctly (see Signal Concepts; other calls are made that are not on the white list either).
Issue #1 I can probably work around by separating the creation of child processes from the creation of sockets and files, so that no child process is created in the window between a socket being created and SOCK_CLOEXEC being set on it. Issue #2 is trickier: I would probably need to make sure that all asio handler threads are stopped, do the fork, and then recreate them, which is tedious at best and really bad at worst (what about my pending timers?). Issue #3 seems to make it entirely impossible to use this correctly.
How do I correctly use Boost Asio in a multithreaded program together with fork() + exec*()? ... or am I "forked"?
Please let me know if I have misunderstood any fundamental concepts (I was raised on Windows programming, not *nix...).
Edit: * Actually, on Linux it is possible to create sockets with SOCK_CLOEXEC set directly; this has been available since kernel 2.6.27 (see the socket() documentation). On Windows, the corresponding flag WSA_FLAG_NO_HANDLE_INHERIT is available since Windows 7 SP 1 / Windows Server 2008 R2 SP 1 (see the WSASocket() documentation). OS X does not seem to support this, though.
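The difference between setting close-on-exec at creation and setting it afterwards can be sketched with plain POSIX calls. This is only an illustration, assuming Linux and a UDP socket; the helper names set_cloexec(), is_cloexec() and make_cloexec_socket() are hypothetical and come from neither the question nor Boost.Asio.

```cpp
#include <cassert>
#include <fcntl.h>       // fcntl, F_GETFD, F_SETFD, FD_CLOEXEC
#include <sys/socket.h>  // socket, AF_INET, SOCK_DGRAM, SOCK_CLOEXEC
#include <unistd.h>      // close

// Hypothetical helper: mark an existing descriptor close-on-exec.
// This is the two-step approach: between socket() and this call, a fork()
// plus exec() in another thread would still leak the descriptor.
bool set_cloexec(int fd)
{
  assert(fd >= 0);
  int flags = ::fcntl(fd, F_GETFD);
  if (flags == -1) return false;
  return ::fcntl(fd, F_SETFD, flags | FD_CLOEXEC) != -1;
}

// Hypothetical helper: report whether FD_CLOEXEC is set on a descriptor.
bool is_cloexec(int fd)
{
  assert(fd >= 0);
  int flags = ::fcntl(fd, F_GETFD);
  return flags != -1 && (flags & FD_CLOEXEC) != 0;
}

// Hypothetical helper: create a UDP socket that is close-on-exec from the
// moment it exists. Returns -1 on failure.
int make_cloexec_socket()
{
#ifdef SOCK_CLOEXEC
  // Linux 2.6.27+: the flag is applied atomically, so there is no window.
  return ::socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0);
#else
  // Fallback for platforms without SOCK_CLOEXEC: the timing window remains.
  int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd != -1 && !set_cloexec(fd))
  {
    ::close(fd);
    fd = -1;
  }
  return fd;
#endif
}
```

A descriptor created this way could then be handed to a Boost.Asio socket through its assign() member, for example socket.assign(boost::asio::ip::udp::v4(), fd), instead of letting Boost.Asio open the socket itself.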
In a multi-threaded program, io_service::notify_fork() is not safe to invoke in the child. Yet Boost.Asio expects it to be called, per its fork() support, because that is when the child closes the parent's previous internal file descriptors and creates new ones. While Boost.Asio explicitly lists the pre-conditions for invoking io_service::notify_fork(), guaranteeing the state of its internal components during the fork(), a brief glance at the implementation indicates that std::vector::push_back() may allocate memory from the free store, and the allocation is not guaranteed to be async-signal-safe.
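To make the async-signal-safety constraint concrete, here is a minimal sketch of a child that calls nothing but async-signal-safe functions between fork() and exec. It uses plain POSIX rather than Boost.Asio, and the helper name run_and_wait() is hypothetical. The exit code 127 for a failed exec is a shell convention adopted here as an assumption.

```cpp
#include <cassert>
#include <cerrno>       // errno, EINTR
#include <sys/types.h>  // pid_t
#include <sys/wait.h>   // waitpid, WIFEXITED, WEXITSTATUS
#include <unistd.h>     // fork, execv, _exit

// Hypothetical helper: run the program at `path` with `argv`, wait for it,
// and return its exit code, or -1 if it could not be started or waited for.
int run_and_wait(const char* path, char* const argv[])
{
  assert(path != 0 && argv != 0);

  pid_t pid = ::fork();
  if (pid == -1) return -1;

  if (pid == 0)
  {
    // Child: only async-signal-safe calls from here on. No allocation,
    // no locking, no iostreams.
    ::execv(path, argv);
    ::_exit(127);  // reached only if execv failed
  }

  // Parent: reap the child, retrying if a signal interrupts the wait.
  int status = 0;
  while (::waitpid(pid, &status, 0) == -1)
  {
    if (errno != EINTR) return -1;
  }
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Anything that may lock a mutex or allocate memory, such as the notify_fork(fork_child) call discussed above, falls outside what this child is allowed to do when the parent is multi-threaded.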
With that said, one solution that may be worth considering is to fork() the process while it is still single-threaded. The child process will remain single-threaded and perform fork() and exec() when told to do so by the parent process via inter-process communication. This separation simplifies the problem by removing the need to manage the state of multiple threads while performing fork() and exec().
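Stripped of Boost.Asio, the separation might look like the minimal sketch below. It rests on several assumptions: Linux (SOCK_SEQPACKET on AF_UNIX), commands that are just a program path with no arguments, and one request in flight at a time. The names start_launcher() and launch() are hypothetical.

```cpp
#include <cassert>
#include <cstring>       // std::strlen
#include <sys/socket.h>  // socketpair, send, recv
#include <sys/types.h>   // pid_t, ssize_t
#include <sys/wait.h>    // waitpid, WIFEXITED, WEXITSTATUS
#include <unistd.h>      // fork, execl, _exit, close

// Hypothetical helper: fork a single-threaded launcher process and return the
// parent's end of the IPC channel, or -1 on failure. Call this before the
// parent creates any threads.
int start_launcher()
{
  int fds[2];
  if (::socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fds) == -1) return -1;

  pid_t pid = ::fork();
  if (pid == -1)
  {
    ::close(fds[0]);
    ::close(fds[1]);
    return -1;
  }
  if (pid != 0)  // parent: keep fds[0], free to go multi-threaded afterwards
  {
    ::close(fds[1]);
    return fds[0];
  }

  // Launcher: stays single-threaded, so fork() + exec() are unproblematic.
  ::close(fds[0]);
  char path[4096];
  for (;;)
  {
    ssize_t n = ::recv(fds[1], path, sizeof(path) - 1, 0);
    if (n <= 0) ::_exit(0);  // parent closed the channel, or an error occurred
    path[n] = '\0';

    pid_t child = ::fork();
    if (child == 0)
    {
      ::close(fds[1]);  // do not leak the channel into the new program
      ::execl(path, path, static_cast<char*>(0));
      ::_exit(127);     // reached only if execl failed
    }
    int status = 0;
    int code = -1;
    if (child != -1 && ::waitpid(child, &status, 0) == child &&
        WIFEXITED(status))
    {
      code = WEXITSTATUS(status);
    }
    ::send(fds[1], &code, sizeof(code), 0);  // report the result back
  }
}

// Hypothetical helper: ask the launcher to run `path` and return its exit
// code, or -1 on failure. Not safe to call concurrently on one channel.
int launch(int channel, const char* path)
{
  assert(channel >= 0 && path != 0);
  if (::send(channel, path, std::strlen(path), 0) == -1) return -1;
  int code = -1;
  if (::recv(channel, &code, sizeof(code), 0) !=
      static_cast<ssize_t>(sizeof(code)))
  {
    return -1;
  }
  return code;
}
```

In a multi-threaded parent, calls to launch() would need to be serialized, for example behind a mutex or on a single strand, since requests and replies share one channel.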
Here is a complete example demonstrating this approach, where the multi-threaded server receives filenames via UDP and a child process performs fork() and exec() to run /usr/bin/touch on the filename. In hopes of making the example slightly more readable, I have opted to use stackful coroutines.
#include <unistd.h> // execl, fork
#include <cstdlib>  // std::atoi
#include <iostream>
#include <string>
#include <vector>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

/// @brief launcher receives a command from inter-process communication,
///        and will then fork, allowing the child process to return to
///        the caller.
class launcher
{
public:
  launcher(boost::asio::io_service& io_service,
           boost::asio::local::datagram_protocol::socket& socket,
           std::string& command)
    : io_service_(io_service),
      socket_(socket),
      command_(command)
  {}

  void operator()(boost::asio::yield_context yield)
  {
    std::vector<char> buffer;
    while (command_.empty())
    {
      // Wait for server to write data.
      std::cout << "launcher is waiting for data" << std::endl;
      socket_.async_receive(boost::asio::null_buffers(), yield);

      // Resize buffer and read all data.
      buffer.resize(socket_.available());
      socket_.receive(boost::asio::buffer(buffer));

      io_service_.notify_fork(boost::asio::io_service::fork_prepare);
      if (fork() == 0) // child
      {
        io_service_.notify_fork(boost::asio::io_service::fork_child);
        // Setting the command ends the loop, letting the child return to
        // main(), where the exec is performed.
        command_.assign(buffer.begin(), buffer.end());
      }
      else // parent
      {
        io_service_.notify_fork(boost::asio::io_service::fork_parent);
      }
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& socket_;
  std::string& command_;
};

using boost::asio::ip::udp;

/// @brief server reads filenames from UDP and then uses
///        inter-process communication to delegate forking and exec
///        to the child launcher process.
class server
{
public:
  server(boost::asio::io_service& io_service,
         boost::asio::local::datagram_protocol::socket& socket,
         short port)
    : io_service_(io_service),
      launcher_socket_(socket),
      socket_(boost::make_shared<udp::socket>(
        boost::ref(io_service), udp::endpoint(udp::v4(), port)))
  {}

  void operator()(boost::asio::yield_context yield)
  {
    udp::endpoint sender_endpoint;
    std::vector<char> buffer;
    for (;;)
    {
      std::cout << "server is waiting for data" << std::endl;
      // Wait for data to become available.
      socket_->async_receive_from(boost::asio::null_buffers(),
          sender_endpoint, yield);

      // Resize buffer and read all data.
      buffer.resize(socket_->available());
      socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint);
      std::cout << "server got data: ";
      std::cout.write(&buffer[0], buffer.size());
      std::cout << std::endl;

      // Write filename to launcher.
      launcher_socket_.async_send(boost::asio::buffer(buffer), yield);
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& launcher_socket_;

  // To be used as a coroutine, server must be copyable, so make socket_
  // copyable.
  boost::shared_ptr<udp::socket> socket_;
};

int main(int argc, char* argv[])
{
  std::string filename;

  // Try/catch provides exception handling, but also allows for the lifetime
  // of the io_service and its IO objects to be controlled.
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: <port>\n";
      return 1;
    }

    boost::thread_group threads;
    boost::asio::io_service io_service;

    // Create two connected sockets for inter-process communication.
    boost::asio::local::datagram_protocol::socket parent_socket(io_service);
    boost::asio::local::datagram_protocol::socket child_socket(io_service);
    boost::asio::local::connect_pair(parent_socket, child_socket);

    // Fork while the process is still single-threaded.
    io_service.notify_fork(boost::asio::io_service::fork_prepare);
    if (fork() == 0) // child
    {
      io_service.notify_fork(boost::asio::io_service::fork_child);
      parent_socket.close();
      boost::asio::spawn(io_service,
          launcher(io_service, child_socket, filename));
    }
    else // parent
    {
      io_service.notify_fork(boost::asio::io_service::fork_parent);
      child_socket.close();
      boost::asio::spawn(io_service,
          server(io_service, parent_socket, std::atoi(argv[1])));

      // Spawn additional threads.
      for (std::size_t i = 0; i < 3; ++i)
      {
        threads.create_thread(
          boost::bind(&boost::asio::io_service::run, &io_service));
      }
    }

    io_service.run();
    threads.join_all();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  // Now that the io_service and IO objects have been destroyed, all internal
  // Boost.Asio file descriptors have been closed, so the execl should be
  // in a clean state. If the filename has been set, then exec touch.
  if (!filename.empty())
  {
    std::cout << "creating file: " << filename << std::endl;
    execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0));
  }
}
Terminal 1:
$ ls
a.out  example.cpp
$ ./a.out 12345
server is waiting for data
launcher is waiting for data
server got data: a
server is waiting for data
launcher is waiting for data
creating file: a
server got data: b
server is waiting for data
launcher is waiting for data
creating file: b
server got data: c
server is waiting for data
launcher is waiting for data
creating file: c
$ ls
a  a.out  b  c  example.cpp
Terminal 2:
$ nc -u 127.0.0.1 12345
a
b
c