使用Boost::Asio的async_send_to方法
发送多个缓冲区的最佳方式是什么?
而且整个发送过程可以随时重复。此外,我想确定每个发送过程的(正确)经过时间。
我以这种方式尝试过:
//MainWindow.h
/// Main window of the UDP throughput test client.
///
/// The user first resolves the receiver (connect button) and then triggers a
/// burst of asynchronous UDP sends (async-send button); the elapsed time of the
/// burst is used to compute and display the achieved throughput.
class MainWindow : public QMainWindow
{
    Q_OBJECT

public:
    /// @param parent Optional parent widget, forwarded to QMainWindow.
    explicit MainWindow(QWidget *parent = nullptr);
    ~MainWindow();

private slots:
    /// Resolves the receiver endpoint from the UI fields and opens the socket.
    void on_connectPushButton_clicked();
    /// Sends the configured number of buffers and measures the elapsed time.
    void on_asyncSendPushButton_clicked();

private:
    Ui::MainWindow *ui;                 ///< Generated UI; allocated in the constructor, deleted in the destructor.
    QTime m_Timer;                      ///< Started at the beginning of each send burst.
    int m_BufferSize;                   ///< Size in bytes of each buffer of the current burst.
    int m_NumBuffersToSend;             ///< Number of buffers in the current burst.
    int m_TransferredBuffers;           ///< Completion handlers seen so far in the current burst.

    // NOTE: m_IOService is declared before m_pSocket on purpose: members are
    // destroyed in reverse declaration order, so the socket is destroyed while
    // the io_service it was constructed with still exists.
    boost::asio::io_service m_IOService;
    std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;
    boost::asio::ip::udp::endpoint m_ReceiverEndpoint;  ///< Destination of every datagram.

    /// Completion handler for a single async_send_to call.
    void handle_send(const boost::system::error_code& error, std::size_t size);
    /// Reads the timer, logs the statistics and updates the throughput display.
    void stopTimerAndLog();
};
//MainWindow.cpp
#include "MainWindow.h"
#include "ui_MainWindow.h"
//Some Qt includes
#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>
using boost::asio::ip::udp;
/// Constructs the window, zeroes the burst bookkeeping and builds the UI.
///
/// @param parent Optional parent widget, forwarded to QMainWindow.
MainWindow::MainWindow(QWidget *parent) :
    // Listed in the order in which initialization actually happens: the base
    // class first, then the members in their declaration order. The previous
    // ordering suggested otherwise and triggers -Wreorder.
    QMainWindow(parent),
    ui(new Ui::MainWindow),
    m_BufferSize(0),
    m_NumBuffersToSend(0),
    m_TransferredBuffers(0)
{
    ui->setupUi(this);
}
// Frees the generated UI object that the constructor allocated with `new`.
MainWindow::~MainWindow()
{
delete ui;
}
void MainWindow::on_connectPushButton_clicked()
{
try
{
udp::resolver resolver(m_IOService);
udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
ui->serverPortLineEdit->text().toStdString());
m_ReceiverEndpoint = *resolver.resolve(query);
m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
m_pSocket->open(udp::v4());
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
/// Reads the burst timer, logs buffer size, buffer count, elapsed time and the
/// resulting throughput, and shows the throughput in the UI.
///
/// The throughput is computed from the configured burst (m_BufferSize *
/// m_NumBuffersToSend bytes) as MBit/s with 1 MBit = 1024 * 1024 bit.
void MainWindow::stopTimerAndLog()
{
    const int tmm = m_Timer.elapsed();  // milliseconds since m_Timer.start()

    // Multiply as double: the int product of size and count can overflow for
    // large bursts.
    const double totalBits = 8.0 * static_cast<double>(m_BufferSize)
                                 * static_cast<double>(m_NumBuffersToSend);

    // A burst that finishes within the timer resolution yields 0 ms; report 0
    // instead of feeding inf/NaN into the log and the spin box.
    const double mBitPerSecond = (tmm > 0)
        ? 1000.0 * totalBits / (1024.0 * 1024.0 * tmm)
        : 0.0;

    LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
    ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}
/// Completion handler of one async_send_to call.
///
/// Every completion - failed or not - is counted. Once the count reaches
/// m_NumBuffersToSend the timer is evaluated and the io_service is stopped.
///
/// @param error Result of the send operation; only logged.
/// @param size  Number of bytes transferred (not used).
void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
    ++m_TransferredBuffers;

    if (error)
    {
        // TODO: the error is only logged, it is not propagated to the main thread.
        LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
        LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
    }

    const bool burstFinished = !(m_TransferredBuffers < m_NumBuffersToSend);
    if (!burstFinished)
    {
        return;
    }

    stopTimerAndLog();
    m_IOService.stop();
}
/// Sends m_NumBuffersToSend datagrams of m_BufferSize bytes each to the
/// resolved receiver and blocks until all completion handlers have run.
///
/// Buffer size and count are read from the UI. Datagram number i is filled with
/// the byte value i (truncated to unsigned char). The timer is started right
/// before the sends are queued; handle_send() stops it after the last
/// completion. Any std::exception is reported on stderr.
void MainWindow::on_asyncSendPushButton_clicked()
{
    if (!m_pSocket)
    {
        // The connect button has not (successfully) been used yet.
        std::cerr << "Cannot send: no socket, connect first" << std::endl;
        return;
    }

    try
    {
        m_BufferSize = ui->sendBufferSpinBox->value();
        m_NumBuffersToSend = ui->numBufferSpinBox->value();
        m_TransferredBuffers = 0;

        const std::size_t bufferSize = m_BufferSize > 0 ? static_cast<std::size_t>(m_BufferSize) : 0;
        const std::size_t numBuffers = m_NumBuffersToSend > 0 ? static_cast<std::size_t>(m_NumBuffersToSend) : 0;

        // Asio requires every buffer handed to async_send_to to stay valid and
        // unmodified until its handler has been called. Give each datagram its
        // own slice of one allocation instead of overwriting a single buffer
        // that earlier, still pending sends refer to. unique_ptr also releases
        // the memory if something below throws.
        std::unique_ptr<char[]> data(new char[bufferSize * numBuffers]);

        // handle_send() stops the io_service at the end of every burst. Without
        // reset(), run() would return immediately on the next click and the
        // payload would be freed while sends are still pending.
        m_IOService.reset();

        m_Timer.start();
        for (int i = 0; i < m_NumBuffersToSend; ++i)
        {
            char* const chunk = data.get() + static_cast<std::size_t>(i) * bufferSize;
            memset(chunk, i, bufferSize);
            m_pSocket->async_send_to(boost::asio::buffer(chunk, bufferSize),
                                     m_ReceiverEndpoint,
                                     boost::bind(&MainWindow::handle_send, this,
                                                 boost::asio::placeholders::error,
                                                 boost::asio::placeholders::bytes_transferred));
        }

        // Runs the completion handlers; returns once handle_send() has stopped
        // the service or no work is left.
        m_IOService.run();
    }
    catch (const std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}
如您所见,用户可以单击connect按钮(on_connectPushButton_clicked)。然后,通过单击异步发送按钮(on_asyncSendPushButton_clicked)开始发送过程。在这里,我启动计时器并调用m_NumBuffersToSend次async_send_to方法,然后运行IOService。每个async_send_to完成时都会调用处理程序handle_send,并且m_TransferredBuffers变量会递增,直到达到m_NumBuffersToSend为止。一旦达到该值,我就停止计时器和IOService。但是,如果我将程序中计算出的时间与Wireshark中观察到的实际UDP发送时间进行比较,总会有很大的差异。如何才能获得更准确的计时?
是否可以将m_IOService.run();调用放在on_asyncSendPushButton_clicked之外?
最佳答案
嗯。
我不确定您观察到的具体是什么现象。下面回答“是否可以将m_IOService.run();调用放在on_asyncSendPushButton_clicked之外”这个问题:
可以,您应该使用io_service::work来保持IO服务持续运行。这是一个演示程序:
Live On Coliru
演示中的每个Run都是随机配置的:
struct Run {
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
int remainingToSend = rand()%10 + 1;
int transferredBuffers = 0;
Clock::time_point start = Clock::now();
void stopTimerAndLog() const;
};
我们不在stopTimerAndLog中进行(昂贵的)IO,而是将样本添加到累加器中:
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
}
// Excerpt from main(): drives the demo against the receiver given on the
// command line.
Demo demo;
// argv[1] is passed as the host and argv[2] as the port to resolve.
demo.on_connect(argv[1], argv[2]);
// Start 100 test runs, sleeping 10 ms before each one.
for (int i = 0; i<100; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
demo.on_async_testrun();
}
// Demo destructor joins IO thread, making sure all stats are final
这里的mutex是多余的,但属于良好实践(GoodPractice™),因为您可能想使用多个IO线程进行测试。
输出:
avg. Buffer size: 613.82, std.dev. 219.789
avg. b/w: 160.61 mbps, std.dev. 81.061
avg. time: 153.64 μs, std.dev. 39.0163
完整代码清单
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
using boost::asio::ip::udp;
typedef std::chrono::high_resolution_clock Clock;
namespace demo_results {
using namespace boost::accumulators;
static std::mutex mx;
accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
}
struct Run {
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
int remainingToSend = rand()%10 + 1;
int transferredBuffers = 0;
Clock::time_point start = Clock::now();
Clock::duration elapsed;
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
#if 0
std::cout << __FUNCTION__ << " -----------------------------------------------\n";
std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size() << "\n";
std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
std::cout << __FUNCTION__ << ": " << "Time: " << tmm << " μs\n";
std::cout << __FUNCTION__ << ": " << mBitPerSecond << " MBit/s\n";
#endif
}
typedef boost::shared_ptr<Run> Ptr;
};
/// UDP send demo: owns an io_service that is served by one background thread
/// and sends the buffers of each Run one after the other.
///
/// Usage: call on_connect() once, then on_async_testrun() as often as desired.
/// The destructor waits until all queued work has completed.
struct Demo {
    boost::asio::io_service m_IOService;
    std::unique_ptr<boost::asio::io_service::work> m_work;     ///< Keeps run() from returning while idle.
    std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;   ///< Null until on_connect() succeeded.
    boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
    std::thread m_io_thread;                                   ///< Declared last: started after all other members exist.

    Demo() :
        m_IOService(),
        m_work(new boost::asio::io_service::work(m_IOService)),
        m_io_thread([this] { m_IOService.run(); })
    {
    }

    /// Drops the work guard so run() returns once all pending operations and
    /// handlers are done, then joins the IO thread.
    ~Demo() {
        m_work.reset();
        m_io_thread.join();
    }

    /// Resolves host/port (IPv4) and opens the UDP socket. Failures are
    /// reported on stderr; m_pSocket may then be null or not open.
    /// Must be called before the first on_async_testrun().
    void on_connect(std::string const& host, std::string const& port)
    {
        try {
            udp::resolver resolver(m_IOService);
            m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
            m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
            m_pSocket->open(udp::v4());
        }
        catch (std::exception const& e)
        {
            std::cerr << e.what() << std::endl;
        }
    }

    /// Issues the next send of the run, or finalizes the run's statistics when
    /// nothing remains to be sent.
    void perform_run(Run::Ptr state) {
        if (!m_pSocket) {
            std::cerr << __FUNCTION__ << ": ERROR: no socket, call on_connect() first\n";
            return;
        }
        if (state->remainingToSend) {
            std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);
            // `state` is bound into the handler, which keeps the buffer alive
            // until the send has completed.
            m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                    m_ReceiverEndpoint,
                    boost::bind(&Demo::handle_sent, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred,
                        state));
        } else {
            state->stopTimerAndLog();
        }
    }

    /// Completion handler of one send: updates the run's counters and
    /// continues with the remaining buffers of the run.
    void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
    {
        (void)actually_transferred; // only read by the assert below
        state->remainingToSend -= 1;

        if (error) {
            // missing error propagation to main thread
            std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << ")\n";
            std::cerr << __FUNCTION__ << ": ERROR: Recovering...\n";
        } else {
            // Only a successful send must have transferred the whole buffer;
            // checking this before looking at `error` aborted on every failure.
            assert(actually_transferred == state->buffer.size());
            // Count successful sends only, so failures do not inflate the
            // measured throughput.
            state->transferredBuffers += 1;
        }

        perform_run(state); // remaining buffers for run
    }

    /// Schedules a new, randomly configured Run. May be called from any thread
    /// once on_connect() has returned.
    void on_async_testrun() {
        if (!m_pSocket) {
            std::cerr << __FUNCTION__ << ": ERROR: no socket, call on_connect() first\n";
            return;
        }
        // A socket object must not be used from two threads at once. Hand the
        // run to the IO thread instead of calling async_send_to here while
        // handle_sent() may be doing the same over there.
        m_IOService.post(boost::bind(&Demo::perform_run, this, boost::make_shared<Run>()));
    }
};
/// Entry point: `<program> <host> <port>`.
///
/// Schedules 100 test runs at 10 ms intervals against the given UDP receiver
/// and prints mean and standard deviation of buffer size, throughput and run
/// duration. Returns 1 when the arguments are missing.
int main(int argc, char const** argv)
{
    // assert() vanishes in NDEBUG builds and would let argv[1]/argv[2] be read
    // although they do not exist; validate explicitly instead.
    if (argc != 3) {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "demo") << " <host> <port>\n";
        return 1;
    }

    {
        Demo demo;
        demo.on_connect(argv[1], argv[2]);

        for (int i = 0; i<100; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            demo.on_async_testrun();
        }
    } // Demo destructor joins IO thread, making sure all stats are final

    using namespace boost::accumulators;
    std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. " << std::sqrt(variance(demo_results::bufsize)) << "\n";
    std::cout << "avg. b/w:         " << mean(demo_results::mbps) << " mbps, std.dev. " << std::sqrt(variance(demo_results::mbps)) << "\n";
    std::cout << "avg. time:        " << mean(demo_results::micros) << " μs, std.dev. " << std::sqrt(variance(demo_results::micros)) << "\n";
    return 0;
}
关于c++ - 用Boost::Asio方法async_send_to发送多个缓冲区的最佳方法是什么,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/28594736/