更新:我在下面的回答中提供了问题的原因及其解决方案。

我想基于图像处理任务的生产者-消费者方法实现多线程。就我而言,Producer线程应获取图像并将其放入container中,而消费者线程应从Container线程中提取图像。我认为我应该使用queue来实现container

我想使用此SO answer中建议的以下代码。但是我对container实现感到非常困惑,并将传入的图像放入Producer线程中。

问题:第一个consumer thread显示的图像不包含完整数据。并且,第二个consumer thread从不显示任何图像。可能是由于某些竞争情况或锁定情况,第二个线程根本无法访问队列数据。我已经尝试使用Mutex

#include <vector>
#include <thread>
#include <memory>
#include <queue>

#include <opencv2/highgui.hpp>
#include <opencv2/core.hpp>
#include <opencv2/imgproc.hpp>

Mutex mu;

struct ThreadSafeContainer
{
    queue<unsigned char*> safeContainer;

};

struct Producer
{
    Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
    {

    }

    void run()
    {
        while(true)
        {
            // grab image from camera
            // store image in container
            Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
            unsigned char *pt_src = image.data;
            mu.lock();
            container->safeContainer.push(pt_src);
            mu.unlock();
        }
    }

    std::shared_ptr<ThreadSafeContainer> container;
};

struct Consumer
{
    Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
    {

    }
    ~Consumer()
    {

    }

    void run()
    {
        while(true)
        {
            // read next image from container
        mu.lock();
        if (!container->safeContainer.empty())
            {
                unsigned char *ptr_consumer_Image;
                ptr_consumer_Image = container->safeContainer.front(); //The front of the queue contain the pointer to the image data
                container->safeContainer.pop();

                Mat image(400, 400, CV_8UC3);
                image.data = ptr_consumer_Image;

                imshow("consumer image", image);
                waitKey(33);
            }
            mu.unlock();
        }
    }

    std::shared_ptr<ThreadSafeContainer> container;
};



int main()
{
    //Pointer object to the class containing a "container" which will help "Producer" and "Consumer" to put and take images
    auto ptrObject_container = make_shared<ThreadSafeContainer>();

    //Pointer object to the Producer...intialize the "container" variable of "Struct Producer" with the above created common "container"
    auto ptrObject_producer = make_shared<Producer>(ptrObject_container);


    //FIRST Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
    auto first_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);

    //SECOND Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
    auto second_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);

    //RUN producer thread
    thread producerThread(&Producer::run, ptrObject_producer);


    //RUN first thread of Consumer
    thread first_consumerThread(&Consumer::run, first_ptrObject_consumer);

    //RUN second thread of Consumer
    thread second_consumerThread(&Consumer::run, second_ptrObject_consumer);

    //JOIN all threads
    producerThread.join();
    first_consumerThread.join();
    second_consumerThread.join();

    return 0;
}

最佳答案

我在您的原始问题中没有看到实际的问题,因此,我将为您提供我在大学类(class)中用来实现生产者-消费者的参考资料。

http://cs360.byu.edu/static/lectures/winter-2014/semaphores.pdf

幻灯片13和17很好地说明了生产者-消费者

我在自己的github上发布的实验室中利用了此功能:
https://github.com/qzcx/Internet_Programming/tree/master/ThreadedMessageServer

如果查看我的server.cc,则可以看到生产者-消费者模式的实现。

请记住,使用这种模式不能切换wait语句的顺序,否则可能会陷入死锁。

希望这会有所帮助。

编辑:

好的,这是上面链接的我的代码中的消费者-生产者模式的摘要。生产者使用者背后的想法是采用线程安全的方式将任务从“生产者”线程传递到“消费者” worker 线程。在我的示例中,要做的工作是处理客户请求。生产者线程(.serve())监视传入的套接字,并将连接传递给使用者线程(.handle()),以处理传入的实际请求。该模式的所有代码都可以在server.cc中找到。文件(在server.h中带有一些声明/导入)。

为了简短起见,我省略了一些细节。确保遍历每行,了解正在发生的事情。查找我正在使用的库函数以及参数的含义。我在这里为您提供了很多帮助,但是您仍然需要做大量的工作来获得全面的了解。

生产商:

就像我上面提到的,整个生产者线程都位于.serve()函数中。它做以下事情

  • 初始化信号量。由于操作系统差异,此处有两个版本。我在OS X上编程,但必须在Linux上交代码。由于信号量与操作系统相关,因此了解如何在特定设置中使用信号量非常重要。
  • 设置客户端与之交谈的套接字。对您的应用而言并不重要。
  • 创建使用者线程。
  • 监视客户端套接字并使用生产者模式将项目传递给消费者。此代码在
  • 下面

    在.serve()函数的底部,您可以看到以下代码:
    while ((client = accept(server_,(struct sockaddr *)&client_addr,&clientlen)) > 0) {
        sem_wait(clients_.e); //buffer check
        sem_wait(clients_.s);
        clients_.q->push(client);
        sem_post(clients_.s);
        sem_post(clients_.n); //produce
    }
    

    首先,检查缓冲区信号量“e”,以确保队列中有足够的空间来放置请求。其次,获取队列的信号量“s”。然后将您的任务(在这种情况下为客户端连接)添加到队列中。释放队列的信号量。最后,使用信号量“n”向消费者发出信号。

    使用者:

    在.handle()方法中,您实际上只关心线程的最开始。
    while(1){
        sem_wait(clients_.n); //consume
        sem_wait(clients_.s);
        client = clients_.q->front();
        clients_.q->pop();
        sem_post(clients_.s);
        sem_post(clients_.e); //buffer free
    
        //Handles the client requests until they disconnect.
    }
    

    消费者采取与生产者相似的行动,但采取相反的方式。首先,消费者等待生产者在信号量“n”上发信号。请记住,因为有多个使用者,所以最终哪个使用者可能会获得此信号量是完全随机的。他们为之奋斗,但在该信号量的sem_post上,只有这一点可以通过。其次,他们像生产者一样获取队列信号。将第一个项目弹出队列,然后释放信号量。最后,它们在缓冲区信号量“e”上发出信号,表明缓冲区中现在有更多空间。

    免责声明:

    我知道信号灯的名字很糟糕。它们与我教授的幻灯片相匹配,因为那是我学习的地方。我认为它们代表以下观点:
  • e为空:如果此信号量已满,此信号量将阻止生产者将更多项目推入队列。
  • s用于信号量:我最不喜欢的。但是我教授的风格是为每个共享数据结构都有一个结构。在这种情况下,“clients_”是包含所有三个信号量和队列的结构。基本上,此信号量用于确保没有两个线程同时触摸相同的数据结构。
  • n表示队列中的项目数。
  • 10-08 08:49