说明:
当我们有多个线程以不同的速度运行并且我们想要以特定的顺序从一个线程向另一个线程发送信息时,消息队列可能会有用。
这个想法是,发送线程将消息推送到队列中,而接收线程将消息按自己的步调弹出。 只要发送线程平均发送的消息不超过接收线程可以处理的数量,此系统就可以工作。 因为队列充当缓冲区,所以消息可能会突发发送和弹出,换句话说:只要一段时间内的平均发送速度低于接收者的容量,流量就会达到峰值。
例程:
该示例显示了一个输入线程,该线程从控制台(cin)读取数据并将其推送到消息队列中。 另一个线程(接收方)检查队列大小,如果该大小不为零,则将消息弹出队列,并像处理该消息一样工作。
打开Qt Creator,新建控制台应用程序,选择MingW构建组件
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | # ifndef MSGQUEUE_H # define MSGQUEUE_H # include < iostream > # include < cstdlib > # include < unistd.h > // usleep # include < fcntl.h > // threads # include < pthread.h > # include < string > // messages # include < queue > // the message queue using namespace std; pthread_mutex_t msgmutex = PTHREAD_MUTEX_INITIALIZER; queue < string > msgq; void *msgreceiver(void *arg) { long qsize; string nextmsg; while (true) { if (msgq.empty()) { usleep(); // sleep 0.01 sec before trying again continue; } // we end up here because there was something in the msg queue pthread_mutex_lock( & msgmutex); qsize = msgq.size(); ) cout << "Queue size: " << qsize << endl; nextmsg = msgq.front(); // get next message in queue msgq.pop(); // remove it from the queue pthread_mutex_unlock( & msgmutex); cout << "Processing value " << nextmsg << endl; usleep(); } pthread_exit((); } // msgreceiver() void *msgtransmitter(void *arg) { string nextmsg; while (true) { cin >> nextmsg; pthread_mutex_lock( & msgmutex); msgq.push(nextmsg); // push message onto the queue pthread_mutex_unlock( & msgmutex); } pthread_exit((); } // msgtransmitter() int test_msgqueue() { pthread_t thr; // Create threads if (pthread_create( & thr, NULL, msgreceiver, NULL) || pthread_create( & thr, NULL, msgtransmitter, NULL)) { cout << " cannot make thread\n"; exit(); } /* * At this point the main thread can perform its actions or end */ cout << "** Main thread ends **\n"; pthread_exit((); } # endif // MSGQUEUE_H |
测试截图: