When reposting, please credit: 存储系统研究. Permanent link to this article: A problem encountered in the single-producer, multiple-consumer model

(1) Original code

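I recently ran into a problem while using the single-producer, multiple-consumer model that, surprisingly, had never occurred to me before. The producer thread's code is shown below. Its basic job is to accept a connection, create a Socket object for it, and put that object into a list to await processing.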

    void DataManager::InternalStart() {
        server_socket_ = new ServerSocket();
        if (!server_socket_->SetAddress(NetworkUtil::GetIpAddress().c_str(), 9091)) {
            LOG(ERROR) << "Set address failed.";
            delete server_socket_;
            server_socket_ = NULL;
            return;
        }
        server_socket_->SetSoBlocking(true);
        if (!server_socket_->Listen()) {
            LOG(ERROR) << "listen failed.";
            // Release the server socket on this failure path as well.
            delete server_socket_;
            server_socket_ = NULL;
            return;
        }
        Socket *socket = NULL;
        while (!stop_) {
            if ((socket = server_socket_->Accept()) != NULL) {
                LOG(INFO) << "Recieved connection fd: " << socket->GetAddr();
                {
                    // Publish the new socket and wake one waiting consumer.
                    common::MutexLock lc(&socket_mu_);
                    socket_list_.push_back(socket);
                    cond_var_.Signal();
                }
            }
        }
    }

The code run by the multiple consumer threads is shown below. Its basic job is to take a Socket object from the list and process it:

   1. void DataManager::WorkEntry() {
   2.     Socket *socket = NULL;
   3.     while (!stop_) {
   4.         // Get connection socket.
   5.         {
   6.             common::MutexLock lc(&socket_mu_);
   7.             if (socket_list_.empty()) {
   8.                 cond_var_.Wait(&socket_mu_);
   9.             }
  10.             if (stop_)
  11.                 break;
  12.             socket = socket_list_.front();
  13.             socket_list_.pop_front();
  14.         }
  15.
  16.         bool success = false;
  17.         do {
  18.             {
  19.                 Packet request;
  20.                 if ((success = socket->GetPacket(&request))) {
  21.                     HandlePacket(&request);
  22.                 }
  23.             }
  24.         } while (success);
  25.
  26.         delete socket;
  27.         socket = NULL;
  28.     }
  29. }

(2) The problem

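Segmentation faults occurred frequently at runtime, always at line 12 of the consumer code (socket = socket_list_.front()). Debugging with GDB showed that the size of socket_list_ was 0 at that point.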
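Calling front() on an empty std::list is undefined behavior in C++, so a crash at that statement is consistent with an empty list. As a minimal sketch, assuming socket_list_ is a std::list (the post does not show its declaration), a guarded pop could look like the following. TryPopFront is a hypothetical helper, not part of the original code, and the caller is assumed to already hold the mutex that protects the list.

```cpp
#include <cassert>
#include <list>

// Hypothetical helper: removes the front element only if one exists.
// The caller is assumed to hold the mutex protecting `items`.
template <typename T>
bool TryPopFront(std::list<T>* items, T* out) {
    assert(items != nullptr && out != nullptr);
    if (items->empty()) {
        // front() on an empty list would be undefined behavior, so bail out.
        return false;
    }
    *out = items->front();
    items->pop_front();
    return true;
}
```

A guard like this turns the crash into a detectable "nothing to do" result, but it does not by itself explain why the list was empty.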

(3) Debugging with added logs

The following log statements were added for debugging:

    @@ -115,6 +115,7 @@ void DataManager::InternalStart() {
                 {
                     common::MutexLock lc(&socket_mu_);
                     socket_list_.push_back(socket);
    +                LOG(INFO) << "1: size: " << socket_list_.size();
                     cond_var_.Signal();
                 }
             }
    @@ -129,11 +130,14 @@ void DataManager::WorkEntry() {
                 common::MutexLock lc(&socket_mu_);
                 if (socket_list_.empty()) {
                     cond_var_.Wait(&socket_mu_);
    +                LOG(INFO) << "2: size: " << socket_list_.size();
                 }
                 if (stop_)
                     break;
    +            LOG(INFO) << "3: size: " << socket_list_.size();
                 socket = socket_list_.front();
                 socket_list_.pop_front();
    +            LOG(INFO) << "4: size: " << socket_list_.size();


The printed log is as follows:

I0809 02:35:45.269896 17305 DataManager.cc:114] Recieved connection fd: 10.237.92.30:37220
I0809 02:35:45.269902 17305 DataManager.cc:118] 1: size: 1
I0809 02:35:45.269928 17310 DataManager.cc:133] 2: size: 1
I0809 02:35:45.269935 17310 DataManager.cc:137] 3: size: 1
I0809 02:35:45.269937 17310 DataManager.cc:140] 4: size: 0
………
I0809 02:35:45.271636 17305 DataManager.cc:114] Recieved connection fd: 10.237.92.30:37224
I0809 02:35:45.271644 17305 DataManager.cc:118] 1: size: 1
I0809 02:35:45.271663 17310 DataManager.cc:137] 3: size: 1
I0809 02:35:45.271670 17310 DataManager.cc:140] 4: size: 0
I0809 02:35:45.271739 17309 DataManager.cc:133] 2: size: 0
I0809 02:35:45.271750 17309 DataManager.cc:137] 3: size: 0
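These lines use the usual glog prefix: severity and date, time, thread ID, then file:line. When sorting a longer log by thread, a small helper along these lines may help. It is only a sketch: ThreadIdOf is a hypothetical name, and it takes the leading digits of the third whitespace-separated field so that a missing space after the ID does not matter.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical helper: returns the thread ID from a glog-style line,
// i.e. the leading digits of the third whitespace-separated field.
// Returns an empty string if the line has fewer than three fields.
std::string ThreadIdOf(const std::string& line) {
    std::istringstream in(line);
    std::string severity_and_date, time_of_day, field;
    if (!(in >> severity_and_date >> time_of_day >> field)) {
        return "";
    }
    std::size_t end = 0;
    while (end < field.size() &&
           std::isdigit(static_cast<unsigned char>(field[end]))) {
        ++end;
    }
    return field.substr(0, end);
}
```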


(4) Analysis:

a) Normal log order: in the normal case, after the producer adds a Socket, one consumer thread is woken by the signal and processes that socket.

I0809 02:35:45.269902 17305 DataManager.cc:118] 1: size: 1
I0809 02:35:45.269928 17310 DataManager.cc:133] 2: size: 1
I0809 02:35:45.269935 17310 DataManager.cc:137] 3: size: 1
I0809 02:35:45.269937 17310 DataManager.cc:140] 4: size: 0

b) Log order when the error occurs:

I0809 02:35:45.271644 17305 DataManager.cc:118] 1: size: 1
I0809 02:35:45.271663 17310 DataManager.cc:137] 3: size: 1
I0809 02:35:45.271670 17310 DataManager.cc:140] 4: size: 0
I0809 02:35:45.271739 17309 DataManager.cc:133] 2: size: 0
I0809 02:35:45.271750 17309 DataManager.cc:137] 3: size: 0

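The thread ID is in the third column: thread 17305 is the producer, and threads 17310 and 17309 are consumers. From the printed log, the execution order was as follows: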
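a) Initial state:
    i.   17305: holds socket_mu_ and is about to insert a socket into socket_list_.
    ii.  17309: is blocked in cond_var_.Wait(&socket_mu_), waiting for cond_var_ to be signaled.
    iii. 17310: is presumably trying to acquire socket_mu_.
b) Thread 17305 calls cond_var_.Signal() and wakes 17309. At this point 17309 and 17310 still have to compete for socket_mu_. Evidently 17310 got it first, so 17309 had to go back to sleep, this time waiting for the mutex.
c) 17310 consumed the socket that 17305 had just produced and released socket_mu_. By then socket_list_ was empty again.
d) 17309 acquired socket_mu_, and the program crashed when it called socket_list_.front().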

(5) Solution: add one more check

    @@ -129,6 +129,9 @@ void DataManager::WorkEntry() {
                 common::MutexLock lc(&socket_mu_);
                 if (socket_list_.empty()) {
                     cond_var_.Wait(&socket_mu_);
    +
    +                if (socket_list_.empty())
    +                    continue;
                 }
                 if (stop_)
                     break;
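The extra check is needed because returning from Wait() only means the condition may hold: another consumer can empty the list before the woken thread reacquires the mutex, and condition variables are also allowed to wake spuriously. A common alternative is to re-check the condition in a while loop around the wait. The following is a minimal sketch of that pattern using std::mutex and std::condition_variable rather than the common::MutexLock and cond_var_ types from the post. WorkQueue, Push, Pop, Stop and ConsumeAll are hypothetical names introduced only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for socket_list_ + socket_mu_ + cond_var_.
template <typename T>
class WorkQueue {
public:
    void Push(T item) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            items_.push_back(std::move(item));
        }
        cv_.notify_one();  // Wake at most one waiting consumer.
    }

    // Blocks until an item is available or Stop() has been called.
    // Returns false only when the queue is stopped and empty.
    bool Pop(T* out) {
        assert(out != nullptr);
        std::unique_lock<std::mutex> lock(mu_);
        // Re-check after every wakeup: another consumer may have taken the
        // item already, or the wakeup may have been spurious.
        while (items_.empty() && !stopped_) {
            cv_.wait(lock);
        }
        if (items_.empty()) {
            return false;
        }
        *out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

    void Stop() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            stopped_ = true;
        }
        cv_.notify_all();  // Release every consumer blocked in Pop().
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<T> items_;
    bool stopped_ = false;
};

// Runs one producer (the calling thread) against `consumers` consumer
// threads and returns how many items were consumed in total.
int ConsumeAll(int consumers, int items) {
    WorkQueue<int> queue;
    std::atomic<int> consumed{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < consumers; ++i) {
        workers.emplace_back([&queue, &consumed] {
            int item = 0;
            while (queue.Pop(&item)) {
                consumed.fetch_add(1);
            }
        });
    }
    for (int i = 0; i < items; ++i) {
        queue.Push(i);
    }
    queue.Stop();
    for (std::thread& worker : workers) {
        worker.join();
    }
    return consumed.load();
}
```

In this sketch the stop flag is part of the wait condition and Stop() notifies all waiters, so consumers blocked in Pop() can exit once the remaining items are drained. Whether the original DataManager wakes its consumers on shutdown is not shown in the post.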
 
12-17 00:54