我有一个问题,我似乎无法解决自己。我有一个在while循环中计算数据的Process1。此过程必须尽快执行。我需要在Process1中计算出的数据用于以后的分析,并且写入文件的速度很慢。

我从未使用过IPC,但是我认为这是一种将Process1中的数据存储在内存中并从另一个非时间紧迫的Process2(独立程序)访问数据并将日期写入文件的好方法。

我创建了我的小测试程序(以了解IPC),因此:

  • 即使无法访问Process2,Process1也会运行-然后它将跳过IPC并仅执行
  • 运行Process2时,它将等待Process1-如果Process1启动,则获取数据,然后再写入磁盘。
  • Process2将仅在以下10个样本中获取x量的数据(maxRunTime)。

  • 我创建的当前程序速度太慢了,通过IPC发送消息时速度慢了6倍。目前,我在每个“TimeStep”处仅传递三个浮点数,但是可能是100。运行时可能是10.000。

    要做的事情:
    如果有人可以引导我朝正确的方向前进,我将感到高兴。下面的代码正在工作,因为它不漂亮,可能会很幸运。

    我需要找到一个尽可能快的解决方案,但不必是实时的。由于我不是专业程序员,因此我还需要妥协其复杂性,因为我需要了解自己在做什么。

    希望有人能帮忙。

    代码:
  • 使用Boost.1.59和MSVC 11.0_x86
  • 两个单独的程序-ConsoleApps

  • 流程1:
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <boost/date_time.hpp>
    #include <iostream>
    #include <vector>
    #include <windows.h>
    #include <string>
    #include <ctime>
    #include <iostream>
    #include <fstream>
    #include <map>
    #include <stdio.h>
    #include <conio.h>
    #include <tchar.h>
    #include <time.h>
    
    
    #pragma comment(lib, "user32.lib")
    
    using namespace std;
    using namespace boost::interprocess;
    using namespace boost::posix_time;
    using boost::posix_time::microsec_clock;
    
    
    bool InitCreateMsgQ()
    {
        bool initOK = false;
        //Create a msgQ for parsing data
        try
        {
            message_queue::remove("msgQData");
            //Create a message_queue.
            message_queue mqData
            (open_or_create     //create q
            ,"msgQData"         //name
            ,1000000                //max message number
            ,sizeof(float)      //max message size
            );
            initOK = true;
        }
        catch(interprocess_exception &ex)
        {
            return false;
        }
    //Create State
        try
        {
            message_queue::remove("msgState");
            //Create a message_queue.
            message_queue mqState
            (open_or_create     //create q
            ,"msgState"     //name
            ,1                  //max message number
            ,sizeof(int)        //max message size
            );
            initOK = true;
        }
        catch(interprocess_exception &ex)
        {
            return false;
        }
        return initOK;
    }
    bool SetState(int state)
    {
        bool timeout = true;
        try
        {
            //Open a message queue.
            message_queue mqState
            (open_only       //only oepn q
            ,"msgState"  //name
            );
    
            timeout = !mqState.timed_send(&state, sizeof(int), 0,
                                            ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
        }
        catch(interprocess_exception &ex)
        {
            message_queue::remove("msgState");
            timeout = true;
        }
        return timeout;
    }
    bool SetData(float data)
    {
        bool timeout = true;
        try
        {
            //Open a message queue.
            message_queue mqData
            (open_only       //only oepn q
            ,"msgQData"  //name
            );
    
            timeout = !mqData.timed_send(&data, sizeof(float), 0,
                                            ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
            //mqData.send(&data, sizeof(float), 0);
        }
        catch(interprocess_exception &ex)
        {
            message_queue::remove("msgQData");
            timeout = true;
        }
        return timeout;
    }
    
    int main ()
    {
        time_t start,end;
    
        int runTime = 0; //just for testing
        int dummyState = 2;
        float x;
        int state = 0;
        if (InitCreateMsgQ()){state = 1;} //If all msQ ok set state 1
        if (SetState(state)){state = 0;}// If timeout to set state go to state 0
        //Do twice to get error if observer is not started
        if (SetState(dummyState)){state = 0;}// Set Dummy state for obersver
                                             // If timeout to set state go to state 0
    
        time (&start);
        //Runtime!
        while(runTime<1000)
        {
            switch (state)
            {
                case 0:
                    state = 0;//force next state 0 - should not be needed
                    //Do nothing and break loop if monitor tool is not ready
                    break;
                case 1:
                    state = 1;
                    cout << "Try SEND DATA" << endl;
                    for (int i = 0; i < 3; i++)
                    {
                        x = rand() % 100;
                        if (SetData(x)){state = 0;}
                    }
                    break;
                default:
                    break;
            }
            runTime++;
            cout << "runTime: " << runTime <<" state: " << state << endl;
        }
    
        message_queue::remove("msgQData");
        message_queue::remove("msgState");
        cout << "done - state: " << state << endl;
    
        time (&end);
        double dif = difftime (end,start);
        printf ("Elasped time is %.2lf seconds.", dif );
    
        getchar();
    }
    

    Process2:
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <boost/date_time.hpp>
    #include <iostream>
    #include <vector>
    #include <windows.h>
    #include <string>
    #include <ctime>
    #include <iostream>
    #include <fstream>
    #include <map>
    #include <stdio.h>
    #include <conio.h>
    #include <tchar.h>
    #include <time.h>
    
    
    #pragma comment(lib, "user32.lib")
    
    using namespace std;
    using namespace boost::interprocess;
    using namespace boost::posix_time;
    using boost::posix_time::microsec_clock;
    
    ofstream debugOut;      // Output file for debug    (DEBUG)
    
    int getState()
    {
        int state = 0;
        bool timeout = true;
        try
        {
            //Open a message queue.
            message_queue mqState
            (open_only       //only oepn q
            ,"msgState"  //name
            );
    
            unsigned int priority;
            message_queue::size_type recvd_size;
    
            timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);
        }
        catch(interprocess_exception &ex)
        {
            timeout = true;
        }
    
        if(timeout){state = 0;}
    
        return state;
    }
    float getData()
    {
        float Data = -123456;
        bool timeout = true;
        try
        {
            //Open a message queue.
            message_queue mqData
            (open_only       //only oepn q
            ,"msgQData"  //name
            );
    
            unsigned int priority;
            message_queue::size_type recvd_size;
    
            //Receive the data
            //mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
            timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
                                            ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
        }
        catch(interprocess_exception &ex)
        {
            timeout = true;
        }
    
        if(timeout){Data = -123456;}
    
        return Data;
    }
    
    int main ()
    {
        int state = 0;
        int maxRunTime = 10;
        float Data;
        float DataArray[100000];
    
        debugOut.open("IPCWriteTest.txt", std::ios::trunc);
        debugOut.close();
    
        while(true)
        {
            switch (state)
            {
                case 0:
                    //Do nothing - data not ready state
                    if(getState() == 1)
                    {
                        state = 1;
                        cout << "State: 1" <<endl;
                    } //If all msQ ok set state 1
                    else{state = 0;}
                    break;
                case 1:
                    for (int runTime = 0; runTime < maxRunTime; runTime++)
                    {
                        cout << "runTime: " << runTime << " Data: ";
                        for (int i = 0; i < 3; i++)
                        {
                            Data = getData();
                            cout << Data << "   ";
                            DataArray[runTime]=Data;
                        }
                        cout << endl;
                    }
    
                    debugOut.open("IPCWriteTest.txt", std::ios::app);
                    for (int runTime = 0; runTime < maxRunTime; runTime++)
                    {
                        debugOut << "runTime: " << runTime << " Data: ";
                        for (int i = 0; i < 3; i++)
                        {
                            debugOut << DataArray[runTime] << " ";
    
                        }
                        debugOut << endl;
                    }
                    debugOut.close();
                    state = 0;
                    break;
                default:
                    break;
            }
        }
    
        std::cout << "done" << endl;
        getchar();
    }
    

    最佳答案

    您正在为每个操作打开队列。

    您应该尝试打开一次并传递对所有相关代码的引用(通常您会将其存储为类中的成员)。

    同样,有单独的队列也是减慢速度的秘诀。在我看来,您是在将mqState“滥用”为interprocess::condition_variable或信号量:

  • http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.conditions
  • http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.semaphores

  • 像这样将异常转换为乏味的错误代码无论如何不是很有效。您正在手动执行异常处理应执行的操作。

    另外,您将调试消息跟踪到标准输出这一事实将使大大降低的速度,从而降低程序的速度,尤其是,尤其是Windows上的

    观察者须知

    同样的事情,对于debugOutput文件也可能不应该连续重新打开。

    被三重“硬循环”是很奇怪的。如果是队列,则一次只弹出1条消息。如果消息“逻辑上”由三个 float 组成,请发送包含三个 float 的消息。现在,我什至认为这是一个错误:
                for (int i = 0; i < 3; i++) {
                    data = getData();
                    std::cout << data << "   ";
                    DataArray[runTime] = data;
                }
    

    它将三个不同的值分配给同一索引(runTime)...

    简化代码

    我“查看”(清理后)的生产者代码:

    On Coliru
    #include <boost/date_time.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <fstream>
    #include <algorithm>
    #include <iterator>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>
    
    namespace bip = boost::interprocess;
    namespace pt  = boost::posix_time;
    
    struct QueueLogic {
    
        bool forced_remove = bip::message_queue::remove("msgQData");
        bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };
    
        bool SetData(float data) {
            return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
        }
    };
    
    #include <boost/chrono.hpp>
    #include <boost/chrono/chrono_io.hpp>
    using Clock = boost::chrono::high_resolution_clock;
    
    int main() {
        std::vector<float> pre_calculated;
        std::generate_n(back_inserter(pre_calculated), 10000*100, [] { return rand()%100; });
    
        auto start = Clock::now();
    
        try {
            QueueLogic instance;
    
            for (auto v : pre_calculated)
                instance.SetData(v);
    
        } catch(std::exception const& e) {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            throw;
        }
    
        auto end = Clock::now();
        std::cout << boost::chrono::duration_cast<boost::chrono::milliseconds>(end-start) << "\n";
    }
    

    消费者代码:

    On Coliru
    #include <iostream>
    #include <fstream>
    #include <vector>
    
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <boost/date_time.hpp>
    
    using namespace std;
    namespace bip = boost::interprocess;
    namespace pt  = boost::posix_time;
    
    #include <boost/chrono.hpp>
    #include <boost/chrono/chrono_io.hpp>
    using Clock = boost::chrono::high_resolution_clock;
    
    struct ObserverLogic {
    
        bip::message_queue mqData{bip::open_only, "msgQData"};
    
        float getData() {
            float data;
            bip::message_queue::size_type recvd_size;
            unsigned int priority;
            if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
                                      pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(10)))
            {
                throw std::runtime_error("timeout in timed_receive");
            }
    
            return data;
        }
    };
    
    int main() {
        std::vector<float> DataArray;
        DataArray.reserve(100000);
    
        ObserverLogic instance;
    
        try {
            while (DataArray.size() <= 100000) {
                DataArray.push_back(instance.getData());
            }
        } catch (std::exception const &e) {
            std::cout << "Exception caught: " << e.what() << "\n";
        }
    
        std::cout << "Received " << DataArray.size() << " messages\n";
        std::copy(DataArray.begin(), DataArray.end(), std::ostream_iterator<float>(std::cout, "; "));
    
        std::cout << "\n\ndone" << std::endl;
    }
    

    注释

    Live1-Coliru上不允许共享内存

    08-06 12:43