生产者消费者模式的几种实现方式

拿我们生活中的例子来说,工厂生产出来的产品总是要输出到外面使用的,这就是生产与消费的概念。

在我们实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。

产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

第一种:采用wait—notify实现生产者消费者模式

1. 一生产者与一消费者:

2. 一生产者与多消费者:

第二种: 采用阻塞队列实现生产者消费者模式

3. 使用阻塞队列实现生产者消费者模式

相信大家都有去吃过日本料理。有个很诱人的餐食就是烤肉,烤肉师父会站在一边一直烤肉,再将烤好的肉放在一个盘子中;而流着口水的我们这些食客会坐在一边,只要盘子里有肉我们就会一直去吃。

在这个生活案例中,烤肉师父就是生产者,他就负责烤肉,烤完了就把肉放在盘子里,而不是直接递给食客(即不用通知食客去吃肉),如果盘子肉满,师父就会停一会,直到有人去食用烤肉后再去进行生产肉;而食客的我们就只是盯着盘子,一旦盘子有肉我们就负责去吃就行;

整个过程中食客与烤肉师父都不是直接打交道的,而是都与盘子进行交互。

盘子充当了一个缓冲区的概念,有东西生产出来就把东西放进去,盘子也是有大小限制,超过盘子大小就会阻塞生产者生产,等待消费者去消费;当盘子为空的时候 ,即阻塞消费者消费,等待生产者去生产。

编程中阻塞队列即可以实现盘子这个功能。

阻塞队列的特点:

当队列元素已满的时候,阻塞插入操作;

当队列元素为空的时候,阻塞获取操作。

ArrayBlockingQueue 与 LinkedBlockingQueue都是支持FIFO(先进先出),但是LinkedBlockingQueue是无界的,而ArrayBlockingQueue 是有界的。

下面使用阻塞队列实现生产者消费者:

生产者:

import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable{
	private final BlockingQueue blockingQueue;
	//设置队列缓存的大小。生产过程中超过这个大小就暂时停止生产
	private final int QUEUE_SIZE = 10;
	public Producer(BlockingQueue blockingQueue){
		this.blockingQueue = blockingQueue;
	}
	int task = 1;
	@Override
	  public void run() {
		while(true){
			try {
				System.out.println("正在生产:" + task);
				//将生产出来的产品放在队列缓存中
				blockingQueue.put(task);
				++task;
				//让其停止一会,便于查看效果
				Thread.sleep(1000);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

消费者:

import java.util.concurrent.BlockingQueue;
//消费者
public class Consumer implements Runnable{
	private final BlockingQueue blockingQueue;
	public Consumer(BlockingQueue blockingQueue){
		this.blockingQueue = blockingQueue;
	}
	@Override
	  public void run() {
		//只要阻塞队列中有任务,就一直去消费
		while(true){
			try {
				System.out.println("正在消费: " + blockingQueue.take());
				//让其停止一会,便于查看效果
				Thread.sleep(2000);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

测试:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * 生产者消费者模式
 * 使用阻塞队列BlockingQueue
 * @author wanggenshen
 *
 */
public class TestConPro {
	public static void main(String[] args){
		BlockingQueue blockingQueue = new LinkedBlockingQueue(5);
		Producer p = new Producer(blockingQueue);
		Consumer c = new Consumer(blockingQueue);
		Thread tp = new Thread(p);
		Thread tc= new Thread(c);
		tp.start();
		tc.start();
	}
}

因为LinkedBlockingQueue是无界队列,所以生产者会不断去生产,将生产出的任务放在队列中,消费者去队列中去消费:

如果改用有界阻塞队列ArrayBlockingQueue,就可以初始化队列的大小。则队列中元素超过队列大小的时候,生产者就会等待消费者消费一个再去生产一个:

测试代码:

初始化一个大小为10的ArrayBlockingQueue:

public static void main(String[] args){
	BlockingQueue blockingQueue = new ArrayBlockingQueue(10);
	Producer p = new Producer(blockingQueue);
	Consumer c = new Consumer(blockingQueue);
	Thread tp = new Thread(p);
	Thread tc= new Thread(c);
	tp.start();
	tc.start();
}

测试中,让生产者生产速度略快,而消费者速度略慢一点。可以看到生产出来的产品序号与消费的产品序号差始终为10(队列的大小):

总结

以上就是本文关于生产消费者模式实现方式和线程安全问题代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!

01-29 02:04