点击(此处)折叠或打开
- // 生产者消费者.cpp : 定义控制台应用程序的入口点。
- //
- #include "stdafx.h"
- #include <iostream>
- #include <Windows.h>
- using namespace std;
// Number of slots in the shared ring buffer.
const int BUFFER_SIZE=5;
// Number of producer threads started by _tmain (each produces one item).
const int producerCount=5;
// Number of consumer threads started by _tmain (each consumes one item).
const int consumerCount=5;
// Timeout handed to every WaitForSingleObject call. Declared as DWORD, the
// parameter type of that API, because INFINITE (0xFFFFFFFF) does not fit in a
// signed int and would otherwise rely on a narrowing round-trip conversion.
const DWORD timewait=INFINITE;
// ID of the next item to produce; read and incremented by pFunc under `mutex`.
static int pID=0;
// ID most recently taken out of the buffer; written by cFunc under `mutex`.
int cID=0;
// Mutex guarding buffer, in, out, pID and cID (created in _tmain).
HANDLE mutex;
// Counting semaphore of free slots; created with BUFFER_SIZE permits.
HANDLE empty;
// Counting semaphore of filled slots; created with 0 permits.
HANDLE full;
// Ring-buffer indices: `in` is the next slot to write, `out` the next to read.
int in=0,out=0;
// The shared ring buffer of produced IDs.
int buffer[BUFFER_SIZE];
// Thread routine that produces one item (defined below).
DWORD WINAPI pFunc(LPVOID lparam);
// Thread routine that consumes one item (defined below).
DWORD WINAPI cFunc(LPVOID lparam);
// Waits for every non-NULL thread handle in `threads` to become signaled
// (i.e. the thread has exited) and then closes it.
static void joinAndClose(HANDLE* threads, int count)
{
    for (int i = 0; i < count; ++i) {
        if (threads[i] == NULL)
            continue;
        WaitForSingleObject(threads[i], INFINITE);
        CloseHandle(threads[i]);
        threads[i] = NULL;
    }
}

// Program entry point.
//
// Creates the mutex and the two counting semaphores, starts producerCount
// producer threads and consumerCount consumer threads, waits for all of them
// to finish, waits for a key press, and finally releases every handle.
//
// Returns 0 on success, or -1 if a synchronization object or a thread could
// not be created. Unlike an early `return`, the failure path still joins the
// threads that were started and closes every handle that was opened.
int _tmain(int argc, _TCHAR* argv[])
{
    // Zero-initialised so that joinAndClose can skip slots never filled in.
    HANDLE pThread[producerCount] = {};
    HANDLE cThread[consumerCount] = {};
    DWORD producerID[producerCount];
    DWORD consumerID[consumerCount];
    int exitCode = 0;

    mutex=CreateMutex(NULL,FALSE,NULL);
    empty=CreateSemaphore(NULL,BUFFER_SIZE,BUFFER_SIZE,NULL);
    full =CreateSemaphore(NULL,0,BUFFER_SIZE,NULL);
    if (mutex == NULL || empty == NULL || full == NULL)
        exitCode = -1;

    for (int i = 0; exitCode == 0 && i < producerCount; ++i) {
        pThread[i]=CreateThread(NULL,0,pFunc,NULL,0,&producerID[i]);
        if (pThread[i] == NULL)
            exitCode = -1;
    }

    for (int i = 0; exitCode == 0 && i < consumerCount; ++i) {
        cThread[i]=CreateThread(NULL,0,cFunc,NULL,0,&consumerID[i]);
        if (cThread[i] == NULL)
            exitCode = -1;
    }

    // Join the workers BEFORE closing the mutex/semaphores they wait on;
    // closing those handles while a worker is still running would make its
    // waits fail.
    joinAndClose(pThread, producerCount);
    joinAndClose(cThread, consumerCount);

    if (exitCode == 0) {
        // Keep the console open until input arrives: loop until getchar()
        // returns something non-zero (any character, or EOF).
        while (getchar() == 0) {
        }
    }

    if (mutex != NULL)
        CloseHandle(mutex);
    if (empty != NULL)
        CloseHandle(empty);
    if (full != NULL)
        CloseHandle(full);
    return exitCode;
}
// Producer thread routine: produces exactly one item.
//
// Sleeps 100 ms, takes a free slot from the `empty` semaphore, locks `mutex`,
// stores the current pID at buffer[in], advances `in` (wrapping at
// BUFFER_SIZE) and pID, then unlocks and signals `full`.
//
// lparam is unused.
// Returns 0 on success, 1 if either wait did not succeed (in which case the
// shared buffer is left untouched).
DWORD WINAPI pFunc(LPVOID lparam)
{
    (void)lparam;
    Sleep(100);

    // Without a free slot we must not write to the buffer.
    if (WaitForSingleObject(empty,timewait) != WAIT_OBJECT_0)
        return 1;

    // WAIT_ABANDONED also grants ownership of the mutex, so it counts as
    // acquired and must be followed by ReleaseMutex like a normal lock.
    const DWORD lockResult = WaitForSingleObject(mutex,timewait);
    if (lockResult != WAIT_OBJECT_0 && lockResult != WAIT_ABANDONED) {
        // Give the reserved slot back since nothing was stored in it.
        ReleaseSemaphore(empty,1,NULL);
        return 1;
    }

    cout<<"produced one ID:"<<pID<<endl;
    buffer[in]=pID;
    in=(in+1)%BUFFER_SIZE;
    pID++;

    ReleaseMutex(mutex);
    ReleaseSemaphore(full,1,NULL);
    return 0;
}
// Consumer thread routine: consumes exactly one item.
//
// Sleeps 100 ms, takes a filled slot from the `full` semaphore, locks
// `mutex`, reads buffer[out] into cID, advances `out` (wrapping at
// BUFFER_SIZE), prints the ID, then unlocks and signals `empty`.
//
// lparam is unused.
// Returns 0 on success, 1 if either wait did not succeed (in which case the
// shared buffer is left untouched).
DWORD WINAPI cFunc(LPVOID lparam)
{
    (void)lparam;
    Sleep(100);

    // Without a filled slot there is nothing valid to read.
    if (WaitForSingleObject(full,timewait) != WAIT_OBJECT_0)
        return 1;

    // WAIT_ABANDONED also grants ownership of the mutex, so it counts as
    // acquired and must be followed by ReleaseMutex like a normal lock.
    const DWORD lockResult = WaitForSingleObject(mutex,timewait);
    if (lockResult != WAIT_OBJECT_0 && lockResult != WAIT_ABANDONED) {
        // Give the filled slot back since nothing was taken from it.
        ReleaseSemaphore(full,1,NULL);
        return 1;
    }

    cID=buffer[out];
    out=(out+1)%BUFFER_SIZE;
    cout<<"consumed one ID:"<<cID<<endl;

    ReleaseMutex(mutex);
    ReleaseSemaphore(empty,1,NULL);
    return 0;
}
Java 实现:
点击(此处)折叠或打开
- import java.util.Date;
- import java.util.concurrent.Semaphore;
- class Producer implements Runnable
- {
- private BoundedBuffer buffer;
- private int producerCount=5;
- private long sleeptime=100;
-
- public Producer(BoundedBuffer buffer){
- this.buffer=buffer;
- }
-
- public void run(){
- Date message;
- for(int i=0 ;i<producerCount ;i++){
- try {
- Thread.sleep(sleeptime);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- message=new Date();
- buffer.insert(message);
- System.out.println("in producer:"+i+" "+message);
- }
- }
- }
- class Consumer implements Runnable
- {
- private BoundedBuffer buffer;
- private int consumerCount=5;
- private long sleeptime=100;
-
- public Consumer(BoundedBuffer buffer){
- this.buffer=buffer;
- }
-
- public void run(){
- Date message;
- for(int i=0 ;i<consumerCount ;i++){
- try {
- Thread.sleep(sleeptime);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- message=(Date)buffer.remove();
- System.out.println("in consumer:"+i+" "+message);
- }
- }
- }
/**
 * Fixed-capacity FIFO buffer shared between producer and consumer threads,
 * implemented as a ring buffer guarded by three semaphores.
 *
 * <ul>
 *   <li>{@code empty} counts free slots (starts at {@code BUFFER_SIZE});</li>
 *   <li>{@code full} counts filled slots (starts at 0);</li>
 *   <li>{@code mutex} is a one-permit semaphore serialising access to the
 *       array and the {@code in}/{@code out} indices.</li>
 * </ul>
 *
 * <p>Permits are taken with {@link Semaphore#acquireUninterruptibly()}, so the
 * buffer is only ever touched while the matching permits are really held. An
 * interrupt that arrives meanwhile is not lost: the thread's interrupt status
 * is still set when the method returns.
 */
class BoundedBuffer
{
    private static final int BUFFER_SIZE = 5;

    private final Object[] buffer;
    private int in, out;
    private final Semaphore mutex;
    private final Semaphore empty;
    private final Semaphore full;

    /** Creates an empty buffer with {@code BUFFER_SIZE} free slots. */
    public BoundedBuffer() {
        in = 0;
        out = 0;
        buffer = new Object[BUFFER_SIZE];

        mutex = new Semaphore(1);
        empty = new Semaphore(BUFFER_SIZE);
        full = new Semaphore(0);
    }

    /**
     * Appends {@code item} to the buffer, blocking while the buffer is full.
     *
     * @param item the element to store
     */
    public void insert(Object item) {
        // Reserve a free slot first, then lock; the reverse order could block
        // while holding the lock and stall every consumer.
        empty.acquireUninterruptibly();
        mutex.acquireUninterruptibly();
        try {
            buffer[in] = item;
            in = (in + 1) % BUFFER_SIZE;
        } finally {
            mutex.release();
        }
        full.release();
    }

    /**
     * Removes and returns the oldest element, blocking while the buffer is empty.
     *
     * @return the element that was inserted earliest and not yet removed
     */
    public Object remove() {
        // Reserve a filled slot first, then lock; the reverse order could block
        // while holding the lock and stall every producer.
        full.acquireUninterruptibly();
        mutex.acquireUninterruptibly();
        Object item;
        try {
            item = buffer[out];
            out = (out + 1) % BUFFER_SIZE;
        } finally {
            mutex.release();
        }
        empty.release();
        return item;
    }
}
- public class 生产者消费者
- {
- public static void main(String[] args){
- BoundedBuffer buffer=new BoundedBuffer();
-
- Thread producer=new Thread(new Producer(buffer));
- Thread consumer=new Thread(new Consumer(buffer));
-
- producer.start();
- consumer.start();
- }
- }