他用Boost::Asio方法async_send_to发送多个缓冲区的最佳方法是什么?
而且整个发送过程可以随时重复。此外,我想确定每个发送过程的(正确)经过时间。

我以这种方式尝试过:

//MainWindow.h

class MainWindow : public QMainWindow
{
    Q_OBJECT
public:
    explicit MainWindow(QWidget *parent = 0);
    ~MainWindow();
private slots:
    void on_connectPushButton_clicked();
    void on_asyncSendPushButton_clicked();
private:
    Ui::MainWindow *ui;
    QTime m_Timer;
    int m_BufferSize;
    int m_NumBuffersToSend;
    int m_TransferredBuffers;
    boost::asio::io_service m_IOService;
    std::unique_ptr<boost::asio::ip::udp::socket>     m_pSocket;
    boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
    void handle_send(const boost::system::error_code& error, std::size_t size);
    void stopTimerAndLog();
};

//MainWindow.cpp

#include "MainWindow.h"
#include "ui_MainWindow.h"

//Some Qt includes

#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>

using boost::asio::ip::udp;

MainWindow::MainWindow(QWidget *parent) :
    m_BufferSize(0),
    m_NumBuffersToSend(0),
    m_TransferredBuffers(0),
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);
}

MainWindow::~MainWindow()
{
    delete ui;
}


void MainWindow::on_connectPushButton_clicked()
{
    try
    {
        udp::resolver resolver(m_IOService);
        udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
                ui->serverPortLineEdit->text().toStdString());
        m_ReceiverEndpoint = *resolver.resolve(query);
        m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
        m_pSocket->open(udp::v4());
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

void MainWindow::stopTimerAndLog()
{
    int tmm = m_Timer.elapsed();
    double mBitPerSecond = 1000.0 * static_cast<double>(m_BufferSize * m_NumBuffersToSend)
            / ( 1024.0 * 1024.0 * tmm) * 8.0;
    LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
    ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}

void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
    m_TransferredBuffers++;

    if (error)
    {
        //missing error propagation to main thread
        LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
        LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
    }

    if ( m_TransferredBuffers >= m_NumBuffersToSend )
    {
        stopTimerAndLog();
        m_IOService.stop();
    }
}

void MainWindow::on_asyncSendPushButton_clicked()
{
    try
    {
        m_BufferSize = ui->sendBufferSpinBox->value();
        char* data = new char[m_BufferSize];
        memset(data, 0, m_BufferSize);
        m_NumBuffersToSend = ui->numBufferSpinBox->value();

        m_Timer.start();

        for (int i=0; i < m_NumBuffersToSend; i++)
        {
            memset(data, i, m_BufferSize);

            m_pSocket->async_send_to(boost::asio::buffer(data, m_BufferSize),
                    m_ReceiverEndpoint,
                    boost::bind(&MainWindow::handle_send, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
        }
        m_TransferredBuffers = 0;
        m_IOService.run();
        delete[] data;
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

如您所见,用户可以单击connect按钮(on_connectPushButton_clicked)。然后,通过单击异步发送按钮(on_asyncSendPushButton_clicked)开始发送过程。在这里,我启动计时器并调用m_NumBuffersToSend和async_send_to方法。然后我运行IOService。对于每个async_send_to,将调用处理程序handle_send,并且m_TransferredBuffers变量将递增,直到达到m_NumBuffersToSend为止。如果是这种情况,我将停止计时器和IOService

但是,如果我将程序中计算的时间与Wireshark实际发送的udp进行比较,则总会有很大的差异。如何获得更准确的时间计算?

是否可以将m_IOService.run();调用放在on_asyncSendPushButton_clicked之外?

最佳答案

好。

我不确定你在观察什么。这是答案



是的,您应该使用io_service::work保持IO服务运行。这是一个演示程序:

Live On Coliru

  • 我创建了一个IO线程来服务异步操作/完成处理程序
  • 我剥离了Qt依赖项; demo Run是随机配置的:
    struct Run {
        std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
        int remainingToSend      = rand()%10 + 1;
        int transferredBuffers   = 0;
        Clock::time_point start  = Clock::now();
    
        void stopTimerAndLog() const;
    };
    
  • 作为奖励,我使用Boost Accumulators
  • 添加了适当的统计信息
  • 而不是在stopTimerAndLog中进行(昂贵的)IO,我们将样本添加到累加器中:
    void stopTimerAndLog()
    {
        using namespace std::chrono;
    
        Clock::duration const elapsed = Clock::now() - start;
        int tmm = duration_cast<microseconds>(elapsed).count();
    
        double mBitPerSecond = tmm
            ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
            : std::numeric_limits<double>::infinity();
    
        std::lock_guard<std::mutex> lk(demo_results::mx);
    
        demo_results::bufsize(buffer.size());
        demo_results::micros(tmm);
        if (tmm)
            demo_results::mbps(mBitPerSecond);
    }
    
  • 您可以重叠运行多个演示运行:
    Demo demo;
    demo.on_connect(argv[1], argv[2]);
    
    for (int i = 0; i<100; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        demo.on_async_testrun();
    }
    // Demo destructor joins IO thread, making sure all stats are final
    
  • 保护统计信息的mutex是多余的,但是GoodPractive(TM)是因为您可能要使用多个IO线程进行测试

  • 输出:
    avg. Buffer size: 613.82, std.dev. 219.789
    avg. b/w:         160.61 mbps, std.dev. 81.061
    avg. time:        153.64 μs, std.dev. 39.0163
    

    完整 list
    #include <boost/asio.hpp>
    #include <boost/array.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/bind.hpp>
    #include <thread>
    #include <mutex>
    #include <chrono>
    #include <memory>
    #include <iostream>
    #include <boost/accumulators/accumulators.hpp>
    #include <boost/accumulators/statistics.hpp>
    
    using boost::asio::ip::udp;
    typedef std::chrono::high_resolution_clock Clock;
    
    namespace demo_results {
        using namespace boost::accumulators;
    
        static std::mutex mx;
        accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
    }
    
    struct Run {
        std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
        int remainingToSend      = rand()%10 + 1;
        int transferredBuffers   = 0;
        Clock::time_point start  = Clock::now();
        Clock::duration elapsed;
    
        void stopTimerAndLog()
        {
            using namespace std::chrono;
    
            Clock::duration const elapsed = Clock::now() - start;
            int tmm = duration_cast<microseconds>(elapsed).count();
    
            double mBitPerSecond = tmm
                ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
                : std::numeric_limits<double>::infinity();
    
            std::lock_guard<std::mutex> lk(demo_results::mx);
    
            demo_results::bufsize(buffer.size());
            demo_results::micros(tmm);
            if (tmm)
                demo_results::mbps(mBitPerSecond);
    
    #if 0
            std::cout << __FUNCTION__ << "  -----------------------------------------------\n";
            std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size()      << "\n";
            std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
            std::cout << __FUNCTION__ << ": " << "Time: "        << tmm                << " μs\n";
            std::cout << __FUNCTION__ << ": " << mBitPerSecond   << " MBit/s\n";
    #endif
        }
    
        typedef boost::shared_ptr<Run> Ptr;
    };
    
    struct Demo {
        boost::asio::io_service                        m_IOService;
        std::unique_ptr<boost::asio::io_service::work> m_work;
        std::unique_ptr<boost::asio::ip::udp::socket>  m_pSocket;
        boost::asio::ip::udp::endpoint                 m_ReceiverEndpoint;
        std::thread                                    m_io_thread;
    
        Demo() :
            m_IOService(),
            m_work(new boost::asio::io_service::work(m_IOService)),
            m_io_thread([this] { m_IOService.run(); })
        {
        }
    
        ~Demo() {
            m_work.reset();
            m_io_thread.join();
        }
    
        void on_connect(std::string const& host, std::string const& port)
        {
            try {
                udp::resolver resolver(m_IOService);
                m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
                m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
                m_pSocket->open(udp::v4());
            }
            catch (std::exception& e)
            {
                std::cerr << e.what() << std::endl;
            }
        }
    
        void perform_run(Run::Ptr state) {
            if (state->remainingToSend) {
                std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);
    
                m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                        m_ReceiverEndpoint,
                        boost::bind(&Demo::handle_sent, this,
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred,
                            state));
            } else {
                state->stopTimerAndLog();
            }
        }
    
        void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
        {
            assert(actually_transferred == state->buffer.size());
            state->transferredBuffers += 1;
            state->remainingToSend    -= 1;
    
            if (error) {
                // missing error propagation to main thread
                std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
                std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
            }
    
            perform_run(state); // remaining buffers for run
        }
    
        void on_async_testrun() {
            perform_run(boost::make_shared<Run>());
        }
    };
    
    int main(int argc, char const** argv)
    {
        assert(argc==3);
    
        {
            Demo demo;
            demo.on_connect(argv[1], argv[2]);
    
            for (int i = 0; i<100; ++i) {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                demo.on_async_testrun();
            }
        } // Demo destructor joins IO thread, making sure all stats are final
    
        using namespace boost::accumulators;
        std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. "      << sqrt(variance(demo_results::bufsize)) << "\n";
        std::cout << "avg. b/w:         " << mean(demo_results::mbps)    << " mbps, std.dev. " << sqrt(variance(demo_results::mbps))    << "\n";
        std::cout << "avg. time:        " << mean(demo_results::micros)  << " μs, std.dev. "   << sqrt(variance(demo_results::micros))  << "\n";
    }
    

    关于c++ - 用Boost::Asio方法async_send_to发送多个缓冲区的最佳方法是什么,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/28594736/

    10-11 22:47
    查看更多