并发编程2

扫码查看

1、僵尸进程与孤儿进程

僵尸进程(有坏处)

在子进程结束后,主进程没有正常结束, 子进程PID不会被回收

缺点:

操作系统中的PID号是有限的,如有子进程PID号无法正常回收,则会占用PID号。
资源浪费。

若PID号满了,则无法创建新的进程。

孤儿进程(没有坏处)

在子进程没有结束时,主进程没有“正常结束”, 子进程PID不会被回收

操作系统优化机制(孤儿院):
当主进程意外终止,操作系统会检测是否有正在运行的子进程,会他们放入孤儿院中,让操作系统帮你自动回收。

2、守护进程

守护进程会在主进程代码执行结束后就终止

from multiprocessing import Process
# 在子进程中调用,可以拿到子进程对象.pid可以pid号
# 在主进程中调用,可以拿到主进程对象.pid可以pid号

import time

def demo(name):
    print(f'start....{name}')
    time.sleep(1000)
    print(f'end......{name}')
    print('子进程结束啦啊....~~~')

if __name__ == '__main__':
    p = Process(target=demo, args=('童子军jason1号', ))

    # 守护进程必须在p.start()调用之前设置
    p.daemon = True  # 将子进程p设置为守护进程

    # 告诉操作系统帮你开启子进程
    p.start()
    # p.join()

    time.sleep(1)
    print('皇帝驾崩啦啊~~~')

3、进程间数据隔离

进程隔离是为保护操作系统中进程互不干扰

from multiprocessing import Process
n=100

def work():
    global n
    n=0
    print('子进程内: ',n)

if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)

结果:
主进程内:  100
子进程内:  0

4、进程互斥锁

进程同步(multiprocess.Lock)

锁-–multiprocess.Lock

多进程模拟抢票实例

#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩余票数%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('\033[43m购票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模拟并发100个客户端抢票
        p=Process(target=task)
        p.start()
# 引发问题:数据写入错乱

互斥锁保证数据安全

from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩余票数%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random())  # 模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random())  # 模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('\033[32m购票成功\033[0m')
    else:
        print('\033[31m购票失败\033[0m')

def task(lock):
    search()
    lock.acquire()  # 将买票这一环节由并发变成了串行,牺牲了运行效率但是保证了数据的安全
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100):  # 模拟并发100个客户端抢票
        p=Process(target=task,args=(lock,))
        p.start()

总结:加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

问题:虽然可以用文件共享数据显示进程间数据通信但问题是

  • 效率低(共享数据基于文件,而文件是硬盘上的数据)
  • 需要自己加锁处理

针对上述问题,我们需要找到一种更加合理快捷的方式,那就是队列和管道

5、进程间通信(IPC机制)

我们知道进程之间数据是相互隔离的,要想实现进程间的通信(IPC机制),就必须借助于一些技术才可以,比如multiprocessing模块中的:队列和管道,这两种方式都是可以实现进程间数据传输的,由于队列是管道+锁的方式实现,所以我们着重研究队列即可

6、队列

概念介绍

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

大白话总结一下就是队列支持多个人从队列的一端放入数据,同样支持多个人从队列的另一端取数据

三种队列:

from multiprocessing import Queue  # multiprocessing提供队列  先进先出
from multiprocessing import JoinableQueue  # 基于 Queue 封装的队列 先进先出
import queue  # python内置的队列  先进先出

第一种:

# Queue(5)指的是队列中只能存放5份数据
q_obj1 = Queue(5)  # q_obj1队列对象
# # 添加数据到队列中
q_obj1.put('jason')
print('添加1个')
q_obj1.put('hcy')
print('添加1个')

# put: 只要队列满了,会进入阻塞
# q_obj1.put('sean')
# put_nowait: 只要队列满了,就会报错
# q_obj1.put_nowait('sean')

# # get: 只要队列中有数据,就能获取数据,若没有则会进入阻塞
# print(q_obj1.get())
# print(q_obj1.get())

# get_nowait: 若队列中没有数据获取则会报错
# print(q_obj1.get_nowait())

第二种:

q_obj1 = JoinableQueue(5)  # q_obj1队列对象
q_obj1.put('jason')
print('添加1个')
q_obj1.put('hcy')
print('添加1个')
# put_nowait: 只要队列满了,就会报错
# q_obj1.put_nowait('sean')
# get: 只要队列中有数据,就能获取数据,若没有则会进入阻塞
print(q_obj1.get())
print(q_obj1.get())

第三种:

# q_obj1 = queue.Queue(5)  # q_obj1队列对象
# # 添加数据到队列中
# q_obj1.put('jason')
# print('添加1个')
# q_obj1.put('hcy')
# print('添加1个')

# put: 只要队列满了,会进入阻塞
# q_obj1.put('sean')

# put_nowait: 只要队列满了,就会报错
# q_obj1.put_nowait('sean')

# get: 只要队列中有数据,就能获取数据,若没有则会进入阻塞
# print(q_obj1.get())
# print(q_obj1.get())

7、生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据(做包子的)之后不用等待消费者(吃包子的)处理,直接扔给阻塞队列(盘子),消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式

基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃 %s' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('%s 生产了 %s' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('主')
12-13 13:44
查看更多