目录
一、生产者和消费者模型
生产者: 生产数据的任务
消费者: 处理数据的任务
生产者--队列(盆)-->消费者
生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.
补充: queue不适合传大文件,通产传一些消息.
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
1.1为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
1.2什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
二、基于队列实现生产者消费者模型
2.1生产者消费者模型1
from multiprocessing import Process,Queue
def producer(q,name,food):
"""生产者"""
for i in range(1,11):
print(f'{name}生产{food}{i}')
res = f'{food}{i}'
q.put(res)
q.put(None)
def consumer(q,name):
"""消费者"""
while True:
res = q.get(timeout=3)
if res is None:break
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'ocean','包子'))
c1 = Process(target=consumer,args=(q,'chen'))
p1.start()
c1.start()
#####################
ocean生产包子1
ocean生产包子2
ocean生产包子3
ocean生产包子4
ocean生产包子5
ocean生产包子6
ocean生产包子7
ocean生产包子8
ocean生产包子9
ocean生产包子10
chen吃了包子1
chen吃了包子2
chen吃了包子3
chen吃了包子4
chen吃了包子5
chen吃了包子6
chen吃了包子7
chen吃了包子8
chen吃了包子9
chen吃了包子10
2.2生产者消费者模型2
from multiprocessing import Process,Queue
import time,random
def producer(q,name,food):
"""生产者"""
for i in range(1,4):
print(f'{name}生产了{food}{i}')
time.sleep(random.randint(1,3))
res = f'{food}{i}'
q.put(res)
def consumer(q,name):
"""消费者"""
while True:
res = q.get(timeout=3)
if res is None:break
time.sleep(random.randint(1,3))
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'ocean','包子'))
p2 = Process(target=producer,args=(q,'sky','韭菜'))
p3 = Process(target=producer,args=(q,'rocky','酒'))
c1 = Process(target=consumer,args=(q,'nick'))
c2 = Process(target=consumer,args=(q,'mac'))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()#生产者生产完毕
q.put(None)#存在几个消费者,put几次None
q.put(None)
# for i in range(2):
# q.put(None)
#############################
ocean生产了包子1
sky生产了韭菜1
rocky生产了酒1
ocean生产了包子2
sky生产了韭菜2
ocean生产了包子3
sky生产了韭菜3
mac吃了韭菜1
rocky生产了酒2
nick吃了包子1
mac吃了包子2
rocky生产了酒3
nick吃了韭菜2
mac吃了酒1
nick吃了包子3
nick吃了酒2
mac吃了韭菜3
nick吃了酒3
2.3生产者消费者模型3
2.3.1JoinableQueue([maxsize])模块
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
2.3.1.1方法介绍
q.task_done()
:使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
q.join()
:生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
from multiprocessing import JoinableQueue
#用法和Queue相似
q = JoinableQueue()
q.put('ocean')#队列放入一个任务,内存在一个计数机制,+1
q.put('can')#计数机制 +1
print(q.get())
q.task_done()#完成一次任务,计数机制-1
print(q.get())
q.task_done()#计数机制 -1
q.join()#计数机制不为0的时候,阻塞等待计数器为0后通过
##################
ocean
can
2.3.2JionableQueue队列实现生产者消费者模型
from multiprocessing import Process, JoinableQueue
import time, random
def producer(q, name, food):
"""生产者"""
for i in range(3):
print(f'{name}生产了{food}{i}')
time.sleep(random.randint(1, 3))
res = f'{food}{i}'
q.put(res)
def consumer(q, name):
"""消费者"""
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print(f'{name}吃了{res}')
q.task_done() # 用来计数
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q, 'ocean', '包子'))
p2 = Process(target=producer, args=(q, 'chen', '菜'))
p3 = Process(target=producer, args=(q, 'nick', '酒'))
c1 = Process(target=consumer, args=(q, 'pocky'))
c2 = Process(target=consumer, args=(q, 'mac'))
p1.start()
p2.start()
p3.start()
c1.daemon = True # 定义为收回进程
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
p3.join() # 生产者生产完毕
q.join()
# 生产者生产完毕--这是主程序的最后一行代码,执行后结束---q.join()消费者已经取干净了,没有存在的意思了。
# 这个是主程序的最后一行代码结束,守护进程结束,也是守护进程的概念
###############################
ocean生产了包子0
chen生产了菜0
nick生产了酒0
chen生产了菜1
nick生产了酒1
ocean生产了包子1
chen生产了菜2
pocky吃了菜0
nick生产了酒2
ocean生产了包子2
mac吃了酒0
pocky吃了包子0
mac吃了菜1
mac吃了包子1
mac吃了菜2
pocky吃了酒1
pocky吃了包子2
mac吃了酒2