更新:我在下面的回答中提供了问题的原因及其解决方案。
我想基于图像处理任务的生产者-消费者方法实现多线程。就我而言,Producer
线程应获取图像并将其放入container
中,而消费者线程应从Container
线程中提取图像。我认为我应该使用queue
来实现container
。
我想使用此SO answer中建议的以下代码。但是我对container
的实现感到非常困惑,并将传入的图像放入Producer
线程中。
问题:第一个consumer thread
显示的图像不包含完整数据。并且,第二个consumer thread
从不显示任何图像。可能是由于某些竞争情况或锁定情况,第二个线程根本无法访问队列数据。我已经尝试使用Mutex
。#include <vector>
#include <thread>
#include <memory>
#include <queue>
#include <opencv2/highgui.hpp>
#include <opencv2/core.hpp>
#include <opencv2/imgproc.hpp>
Mutex mu;
struct ThreadSafeContainer
{
queue<unsigned char*> safeContainer;
};
struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
unsigned char *pt_src = image.data;
mu.lock();
container->safeContainer.push(pt_src);
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{
}
~Consumer()
{
}
void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{
unsigned char *ptr_consumer_Image;
ptr_consumer_Image = container->safeContainer.front(); //The front of the queue contain the pointer to the image data
container->safeContainer.pop();
Mat image(400, 400, CV_8UC3);
image.data = ptr_consumer_Image;
imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}
std::shared_ptr<ThreadSafeContainer> container;
};
int main()
{
//Pointer object to the class containing a "container" which will help "Producer" and "Consumer" to put and take images
auto ptrObject_container = make_shared<ThreadSafeContainer>();
//Pointer object to the Producer...intialize the "container" variable of "Struct Producer" with the above created common "container"
auto ptrObject_producer = make_shared<Producer>(ptrObject_container);
//FIRST Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto first_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//SECOND Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto second_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//RUN producer thread
thread producerThread(&Producer::run, ptrObject_producer);
//RUN first thread of Consumer
thread first_consumerThread(&Consumer::run, first_ptrObject_consumer);
//RUN second thread of Consumer
thread second_consumerThread(&Consumer::run, second_ptrObject_consumer);
//JOIN all threads
producerThread.join();
first_consumerThread.join();
second_consumerThread.join();
return 0;
}
最佳答案
我在您的原始问题中没有看到实际的问题,因此,我将为您提供我在大学类(class)中用来实现生产者-消费者的参考资料。
http://cs360.byu.edu/static/lectures/winter-2014/semaphores.pdf
幻灯片13和17很好地说明了生产者-消费者
我在自己的github上发布的实验室中利用了此功能:
https://github.com/qzcx/Internet_Programming/tree/master/ThreadedMessageServer
如果查看我的server.cc,则可以看到生产者-消费者模式的实现。
请记住,使用这种模式不能切换wait语句的顺序,否则可能会陷入死锁。
希望这会有所帮助。
编辑:
好的,这是上面链接的我的代码中的消费者-生产者模式的摘要。生产者使用者背后的想法是采用线程安全的方式将任务从“生产者”线程传递到“消费者” worker 线程。在我的示例中,要做的工作是处理客户请求。生产者线程(.serve())监视传入的套接字,并将连接传递给使用者线程(.handle()),以处理传入的实际请求。该模式的所有代码都可以在server.cc中找到。文件(在server.h中带有一些声明/导入)。
为了简短起见,我省略了一些细节。确保遍历每行,了解正在发生的事情。查找我正在使用的库函数以及参数的含义。我在这里为您提供了很多帮助,但是您仍然需要做大量的工作来获得全面的了解。
生产商:
就像我上面提到的,整个生产者线程都位于.serve()函数中。它做以下事情
在.serve()函数的底部,您可以看到以下代码:
while ((client = accept(server_,(struct sockaddr *)&client_addr,&clientlen)) > 0) {
sem_wait(clients_.e); //buffer check
sem_wait(clients_.s);
clients_.q->push(client);
sem_post(clients_.s);
sem_post(clients_.n); //produce
}
首先,检查缓冲区信号量“e”,以确保队列中有足够的空间来放置请求。其次,获取队列的信号量“s”。然后将您的任务(在这种情况下为客户端连接)添加到队列中。释放队列的信号量。最后,使用信号量“n”向消费者发出信号。
使用者:
在.handle()方法中,您实际上只关心线程的最开始。
while(1){
sem_wait(clients_.n); //consume
sem_wait(clients_.s);
client = clients_.q->front();
clients_.q->pop();
sem_post(clients_.s);
sem_post(clients_.e); //buffer free
//Handles the client requests until they disconnect.
}
消费者采取与生产者相似的行动,但采取相反的方式。首先,消费者等待生产者在信号量“n”上发信号。请记住,因为有多个使用者,所以最终哪个使用者可能会获得此信号量是完全随机的。他们为之奋斗,但在该信号量的sem_post上,只有这一点可以通过。其次,他们像生产者一样获取队列信号。将第一个项目弹出队列,然后释放信号量。最后,它们在缓冲区信号量“e”上发出信号,表明缓冲区中现在有更多空间。
免责声明:
我知道信号灯的名字很糟糕。它们与我教授的幻灯片相匹配,因为那是我学习的地方。我认为它们代表以下观点: