点击(此处)折叠或打开

  1. // 生产者消费者.cpp : 定义控制台应用程序的入口点。
  2. //

  3. #include "stdafx.h"
  4. #include <iostream>
  5. #include <Windows.h>
  6. using namespace std;

  7. const int BUFFER_SIZE=5;
  8. const int producerCount=5;
  9. const int consumerCount=5;
  10. const int timewait=INFINITE;
  11. static int pID=0;
  12. int cID=0;
  13. HANDLE mutex;
  14. HANDLE empty;
  15. HANDLE full;
  16. int in=0,out=0;
  17. int buffer[BUFFER_SIZE];

  18. DWORD WINAPI pFunc(LPVOID lparam);
  19. DWORD WINAPI cFunc(LPVOID lparam);

  20. int _tmain(int argc, _TCHAR* argv[])
  21. {
  22.     HANDLE pThread[producerCount];
  23.     HANDLE cThread[consumerCount];
  24.     DWORD producerID[producerCount];
  25.     DWORD consumerID[consumerCount];

  26.     mutex=CreateMutex(NULL,FALSE,NULL);
  27.     empty=CreateSemaphore(NULL,BUFFER_SIZE,BUFFER_SIZE,NULL);
  28.     full =CreateSemaphore(NULL,0,BUFFER_SIZE,NULL);

  29.     for(int i=0 ; i<producerCount ; ++i){
  30.         pThread[i]=CreateThread(NULL,0,pFunc,NULL,0,&producerID[i]);
  31.         if(pThread[i]==NULL)
  32.             return -1;
  33.     }
  34.     
  35.     for(int i=0 ; i<consumerCount ; ++i){
  36.         cThread[i]=CreateThread(NULL,0,cFunc,NULL,0,&consumerID[i]);
  37.         if(cThread[i]==NULL)
  38.             return -1;
  39.     }
  40.     
  41.     bool isContinue=true;
  42.     while(isContinue)
  43.     {
  44.         if(getchar())
  45.         {
  46.             isContinue = false;
  47.         }
  48.     }

  49.     for(int i=0 ; i<producerCount ; ++i){
  50.         CloseHandle(pThread[i]);
  51.     }
  52.     for(int i=0 ; i<consumerCount ; ++i){
  53.         CloseHandle(cThread[i]);
  54.     }
  55.     CloseHandle(mutex);
  56.     CloseHandle(empty);
  57.     CloseHandle(full);
  58.     return 0;
  59. }

  60. DWORD WINAPI pFunc(LPVOID lparam)
  61. {
  62.     Sleep(100);

  63.     WaitForSingleObject(empty,timewait);
  64.     WaitForSingleObject(mutex,timewait);

  65.     cout<<"produced one ID:"<<pID<<endl;
  66.     buffer[in]=pID;
  67.     in=(in+1)%BUFFER_SIZE;
  68.     pID++;

  69.     ReleaseMutex(mutex);
  70.     ReleaseSemaphore(full,1,NULL);

  71.     return 0;
  72. }

  73. DWORD WINAPI cFunc(LPVOID lparam)
  74. {
  75.     Sleep(100);

  76.     WaitForSingleObject(full,timewait);
  77.     WaitForSingleObject(mutex,timewait);

  78.     cID=buffer[out];
  79.     out=(out+1)%BUFFER_SIZE;
  80.     cout<<"consumed one ID:"<<cID<<endl;
  81.     
  82.     ReleaseMutex(mutex);
  83.     ReleaseSemaphore(empty,1,NULL);

  84.     return 0;
  85. }

java实现:

点击(此处)折叠或打开

  1. import java.util.Date;
  2. import java.util.concurrent.Semaphore;

  3. class Producer implements Runnable
  4. {
  5.     private BoundedBuffer buffer;
  6.     private int producerCount=5;
  7.     private long sleeptime=100;
  8.     
  9.     public Producer(BoundedBuffer buffer){
  10.         this.buffer=buffer;
  11.     }
  12.     
  13.     public void run(){
  14.         Date message;
  15.         for(int i=0 ;i<producerCount ;i++){
  16.             try {
  17.                 Thread.sleep(sleeptime);
  18.             } catch (InterruptedException e) {
  19.                 // TODO Auto-generated catch block
  20.                 e.printStackTrace();
  21.             }
  22.             message=new Date();
  23.             buffer.insert(message);
  24.             System.out.println("in producer:"+i+" "+message);
  25.         }
  26.     }
  27. }

  28. class Consumer implements Runnable
  29. {
  30.     private BoundedBuffer buffer;
  31.     private int consumerCount=5;
  32.     private long sleeptime=100;
  33.     
  34.     public Consumer(BoundedBuffer buffer){
  35.         this.buffer=buffer;
  36.     }
  37.     
  38.     public void run(){
  39.         Date message;
  40.         for(int i=0 ;i<consumerCount ;i++){
  41.             try {
  42.                 Thread.sleep(sleeptime);
  43.             } catch (InterruptedException e) {
  44.                 // TODO Auto-generated catch block
  45.                 e.printStackTrace();
  46.             }
  47.             message=(Date)buffer.remove();
  48.             System.out.println("in consumer:"+i+" "+message);
  49.         }
  50.     }
  51. }

  52. class BoundedBuffer
  53. {
  54.     private static final int BUFFER_SIZE=5;
  55.     private Object[] buffer;
  56.     private int in,out;
  57.     private Semaphore mutex;
  58.     private Semaphore empty;
  59.     private Semaphore full;
  60.     
  61.     public BoundedBuffer(){
  62.         in=0;
  63.         out=0;
  64.         buffer=new Object[BUFFER_SIZE];
  65.         
  66.         mutex=new Semaphore(1);
  67.         empty=new Semaphore(BUFFER_SIZE);
  68.         full=new Semaphore(0);
  69.     }
  70.     
  71.     public void insert(Object item){
  72.         try {
  73.             empty.acquire();
  74.         } catch (InterruptedException e) {
  75.             // TODO Auto-generated catch block
  76.             e.printStackTrace();
  77.         }
  78.         try {
  79.             mutex.acquire();
  80.         } catch (InterruptedException e) {
  81.             // TODO Auto-generated catch block
  82.             e.printStackTrace();
  83.         }
  84.         
  85.         buffer[in]=item;
  86.         in=(in+1)%BUFFER_SIZE;
  87.         
  88.         mutex.release();
  89.         full.release();
  90.     }
  91.     
  92.     public Object remove(){
  93.         try {
  94.             full.acquire();
  95.         } catch (InterruptedException e) {
  96.             // TODO Auto-generated catch block
  97.             e.printStackTrace();
  98.         }
  99.         try {
  100.             mutex.acquire();
  101.         } catch (InterruptedException e) {
  102.             // TODO Auto-generated catch block
  103.             e.printStackTrace();
  104.         }
  105.         
  106.         Object item=buffer[out];
  107.         out=(out+1)%BUFFER_SIZE;
  108.         
  109.         mutex.release();
  110.         empty.release();
  111.         return item;
  112.     }
  113. }

  114. public class 生产者消费者
  115. {
  116.     public static void main(String[] args){
  117.         BoundedBuffer buffer=new BoundedBuffer();
  118.         
  119.         Thread producer=new Thread(new Producer(buffer));
  120.         Thread consumer=new Thread(new Consumer(buffer));
  121.         
  122.         producer.start();
  123.         consumer.start();
  124.     }
  125. }
菜鸟学习中,望路过大神多多指点。
09-19 01:13