1. C++ queue 队列基本用法

在C++中,queue是一个模板类,用于实现队列数据结构,遵循先进先出的原则。

♦ 常用方法: ·

queue<int> Q;            //定义一个int型队列
Q.empty();               //返回队列是否为空
Q.size();                //返回当前队列长度
Q.front();               //返回当前队列的第一个元素
Q.back();              	 //返回当前队列的最后一个元素
Q.push();                //在队列后面插入一个元素, 比如插入数字5: Q.push(5)
Q.pop();                 //从当前队列里,移出第一个元素

♦ 简单使用: ·

#include <iostream>
#include <queue>

using namespace std;

int main()
{
    // 创建一个queue对象
    queue<int> Q;

    // 向队列中添加元素
    Q.push(1);
    Q.push(2);
    Q.push(3);

    cout<<"queue empty?  "<<Q.empty()<<endl;
    cout<<"queue size:   "<<Q.size()<<endl;

    // 从队列中移除元素,并输出
    while (!Q.empty()) {
        int value = Q.front();
        Q.pop();
        cout << "Dequeued:" << value << endl;
    }

    return 0;
}

♦ 打印: ·

Qt QQueue 安全的多线程队列、阻塞队列-LMLPHP

2. Qt QQueue 队列基本用法

QQueue 继承与 QList

♦ 常用方法: ·

QQueue<int> QQ;         	 //定义一个int型队列

QQ.isEmpty();                //返回队列是否为空

QQ.size();                   //返回队列元素个数

QQ.clear();                  //清空队列

QQ.enqueue();                //在队列尾部添加一个元素, 比如插入数字5: QQ.enqueue(5)
/* 相当于
Q.push();                 
*/

QQ.dequeue();                //删除当前队列第一个元素,并返回这个元素
/* 相当于
Q.front();                   //返回当前队列的第一个元素
Q.pop();                     //从当前队列里,移出第一个元素
*/

QQ.head();                   //返回当前队列第一个元素
/* 相当于
Q.front();                   
*/

QQ.last();                   //返回当前队列尾部的元素
/* 相当于
Q.back();              	   
*/
T &  operator[]( int i );   //以数组形式访问队列元素

♦ 实例: ·

#include <QCoreApplication>
#include <QQueue>
#include <QDebug>

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    // 创建一个QQueue对象
    QQueue<int> QQ;

    // 向队列中添加元素
    QQ.enqueue(1);
    QQ.enqueue(2);
    QQ.enqueue(3);

    qDebug()<<"queue empty:  "<<QQ.isEmpty();
    qDebug()<<"queue size:  " <<QQ.size();
    qDebug()<<"queue head:  " <<QQ.head()  ;
    qDebug()<<"queue last:  " <<QQ.last()  << "\n";


    // 从队列中移除元素,并输出
    while (!QQ.isEmpty()) {
        int value = QQ.dequeue();
        qDebug() << "Dequeued:" << value;
    }

    return a.exec();
}

♦ 打印: ·

Qt QQueue 安全的多线程队列、阻塞队列-LMLPHP

3. Qt QQueue 多线程队列

在多线程编程中,由于QQueue并不是线程安全的,因此我们需要先使用互斥锁(QMutex)来保护队列。在读写队列时,我们需要获取互斥锁的锁定,以避免多个线程同时访问队列导致的数据竞争问题。

然后通过经典的生产者和消费者来看一个简单的示例程序,演示如何使用QQueue实现线程安全队列:

#include <QCoreApplication>
#include <QQueue>
#include <QMutex>
#include <QThread>
#include <QDebug>

// 定义线程安全队列类
template<typename T>
class ThreadSafeQueue
{
public:
    // 添加元素到队列尾部
    void enqueue(const T& value) {
        QMutexLocker locker(&m_mutex);
        m_queue.enqueue(value);
    }

    // 从队列头部移除一个元素,并返回它
    T dequeue() {
        QMutexLocker locker(&m_mutex);
        if (m_queue.isEmpty()) {
            return T();
        }
        return m_queue.dequeue();
    }

    // 返回队列是否为空
    bool isEmpty() const {
        QMutexLocker locker(&m_mutex);
        return m_queue.isEmpty();
    }

private:
    QQueue<T> m_queue;
    mutable QMutex m_mutex;
};

// 定义生产者线程类
class ProducerThread : public QThread
{
public:
    ProducerThread(ThreadSafeQueue<int>& queue)
        : m_queue(queue)
    {
    }

protected:
    void run() override {
        for (int i = 1; i <= 10; i++) {
            m_queue.enqueue(i);
            qDebug() << "Enqueued:" << i;
            msleep(500);
        }
    }

private:
    ThreadSafeQueue<int>& m_queue;
};

// 定义消费者线程类
class ConsumerThread : public QThread
{
public:
    ConsumerThread(ThreadSafeQueue<int>& queue)
        : m_queue(queue)
    {
    }

protected:
    void run() override {
        while (!isInterruptionRequested()) {
            if (!m_queue.isEmpty()) {
                int value = m_queue.dequeue();
                qDebug() << "Dequeued:" << value;
            }
            msleep(500);
        }
    }

private:
    ThreadSafeQueue<int>& m_queue;
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    // 创建线程安全队列对象
    ThreadSafeQueue<int> queue;

    // 创建生产者线程对象和消费者线程对象
    ProducerThread producer(queue);
    ConsumerThread consumer(queue);

    // 启动线程
    producer.start();
    consumer.start();

    qDebug() << "F1";

    // 等待线程结束
    //在producer等待期间,  consumer run()也会运行 直到producer 运行完毕,main才能往下执行
    producer.wait();

    qDebug() << "F2";

    consumer.requestInterruption(); //相当于 ctrl + c 结束 consumer 线程
    qDebug() << "F3";
    consumer.wait();
    qDebug() << "F4";

    return a.exec();
}

♦ 运行结果:

Qt QQueue 安全的多线程队列、阻塞队列-LMLPHP

在上面的示例程序中,我们首先定义了一个模板类ThreadSafeQueue,用于实现线程安全队列。该类使用QMutex来保护QQueue对象,以实现线程安全。

接下来,我们定义了两个线程类ProducerThread和ConsumerThread,用于生产和
消费数据。

在ProducerThread中,我们循环向队列中添加元素,每隔500毫秒添加一个元素。在ConsumerThread中,我们循环从队列中取出元素,每隔500毫秒取出一个元素。在取出元素时,我们需要判断队列是否为空,避免出现异常情况。

其中:执行 producer.wait() 时, mian 中主线程将会被阻塞; 等到 producer 生产者运行完毕,才会唤醒;而 consumer 线程不受影响;

万一发生数据处理速度不匹配的情况呢?

  • 生产者休眠 500ms 消费者休眠500ms, 就是如上情况
  • 生产者休眠时间 < 消费者休眠时间, 那么生产者执行完毕后,消费者还未消费完就退出线程了,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕
  • 生产者休眠时间 > 消费者休眠时间, 那么生产者执行完毕后,消费者也执行完毕了

真实的大数据情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

在 java 中有 BlockingQueue 阻塞队列,但是Qt 中似乎没有相关的阻塞队列,需要我们自己去自己控制这些细节

4. Qt BlockingQueue 自定义线程安全的阻塞队列

Qt QQueue 安全的多线程队列、阻塞队列-LMLPHP

以下定义一个简单的阻塞队列:

#include <QCoreApplication>
#include <QWaitCondition>
#include <QQueue>
#include <QMutex>
#include <QThread>
#include <QDebug>

template <typename T>
class BlockingQueue
{
public:
    BlockingQueue() {}
    void put(const T& value)
    {
        QMutexLocker locker(&m_mutex);
        m_queue.enqueue(value);
        m_condition.wakeOne();   //唤醒等待队列中的一个线程(来自wait)
    }
    T take()
    {
        QMutexLocker locker(&m_mutex);
        while (m_queue.isEmpty()) {
            m_condition.wait(&m_mutex);
        }
        return m_queue.dequeue();
    }
    bool isEmpty() const
    {
        QMutexLocker locker(&m_mutex);
        return m_queue.isEmpty();
    }
    int size() const
    {
        QMutexLocker locker(&m_mutex);
        return m_queue.size();
    }

private:
    QQueue<T> m_queue;
    mutable QMutex m_mutex;
    QWaitCondition m_condition;
};

这个 BlockingQueue类使用QMutex和QWaitCondition来保证线程安全,并实现了put、take、isEmpty和size等方法。其中,put方法用于往队列中插入元素,take方法用于从队列中取出元素,isEmpty方法用于判断队列是否为空,size方法用于获取队列中元素的数量。

在put方法中,我们首先获取了互斥锁,然后将元素插入到队列中,并通过QWaitCondition的wakeOne()方法唤醒一个等待线程(调用take()中的线程)。在take方法中,我们首先获取了互斥锁,然后在队列为空时调用QWaitCondition的wait()方法等待,直到有其他线程往队列中插入了元素并唤醒了当前线程。

mutable的作用是允许在const成员函数中修改BlockingQueue类的m_mutex和m_notEmpty成员变量。这是因为,生产者和消费者线程在往阻塞队列中添加或删除元素时,都需要对这两个成员变量进行修改。但是,由于take()和tryTake()方法都是const成员函数,因此如果不将m_mutex和m_notEmpty声明为mutable类型,编译器就会报错。

♦ 使用: ·

static BlockingQueue<int> queue;

class Producer : public QThread
{
public:
    void run() override
    {
        for (int i = 0; i < 10; ++i) {
            queue.put(i);
            qDebug() << "Producer thread: " << QThread::currentThreadId() << ", value: " << i;
            msleep(500); // sleep for 0.5 second

        }
    }
};

class Consumer : public QThread
{
public:
    void run() override
    {
        int value = 0;
        while (true) {
            value = queue.take();
            qDebug() << "Consumer thread: " << QThread::currentThreadId() << ", value: " << value;
        }
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    Producer producer;
    Consumer consumer1, consumer2;

    producer.start();
    consumer1.start();
    consumer2.start();

    return a.exec();
}

♦ 运行结果:

Qt QQueue 安全的多线程队列、阻塞队列-LMLPHP

上述包含一个生产者线程和两个消费者线程,生产者线程往队列中插入10个整数,每插入一个元素后暂停0.5秒。两个消费者线程不断从队列中取出元素,并输出当前线程的ID和取出的元素值。 当队列为空时,消费者线程会进入等待状态,直到有其他线程往队列中插入元素。

05-08 04:42