进程的理解:
1、系统进行资源分配和调度的基本单位,一个具有一定独立功能的程序关于某个数据集合的一次运行活动;
2、它是一个动态的概念,一个活动的实体;
- 狭义定义:
an instance of a computer program that is being executed
即正在运行的程序的实例化对象。 - 广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动,是操作系统进行资源分配和调度的基本单位,是操作系统动态执行的基本单元。
- 狭义定义:
注:其概念的关键点在于
1)、进程是一个实体(动态的),具有自己独立的地址空间,包括:
文本区域(text region):存储处理器执行的代码;
数据区域(data region):存储变量与进程执行期间使用的动态分配的内存;
堆栈(stack region):存储的是程序执行过程中调用的指令与本地变量;
注:正是由于每个进程是一个独立的实体,其中以上所述的三个区域,即每个进程的数据区域以及堆栈是独立的,相互隔离的,所以在多进程中可以保证数据的安全性
2)、编写完的代码,没有运行时,称为程序,
正在运行的代码,称为进程
程序是死的(静态的),进程是活的(动态的)
3、进程的三大状态
- (1) 就绪(Ready)状态
进程创建完成即其他所有资源都已分配完毕,等待cpu调度执行时,称为就绪状态。 - (2) 执行(Running)状态
cpu开始执行该进程时称为执行状态。 - (3) 阻塞(Blocked)状态
由于等待某个事件发生而无法执行时,便是阻塞状态,cpu执行其他进程.例如,等待I/O完成input、申请缓冲区不能满足等等。
如图所示
- (1) 就绪(Ready)状态
CPU 调度进程的方式
- 先来先服务fcfs(first come first server):先来的先执行
- 短作业优先算法:分配的cpu多,先把短的算完
- 时间片轮转算法:每一个任务就执行一个时间片的时间.然后就执行其他的
- 多级反馈队列算法
- 越是时间长的,cpu分配的资源越少,优先级靠后
- 越是时间短的,cpu分配的资源越多
创建进程
导入multiprocessing模块中的Process类
以供后续创建类的时候直接调用
p = Process(target = func, name = process01, args=(5,))
实例化进程对象
Process 类参数介绍
Process 类常⽤⽅法
# 主进程速度快于子进程,join方法可以使得子进程执行结束后,再继续执行主进程中的代码,可以用来同步代码的一致性
import multiprocessing
def func():
print("发送第一份邮件")
if __name__ == "__main__":
p = multiprocessing.Process(target=func)
p.start()
p.join()
print("发送第二份邮件")
# 发送第一份邮件
# 发送第二份邮件
# 多个子进程配合 join 方法实现异步并发
import multiprocessing
def func(index):
print(f"发送第{index}封邮件")
if __name__ == "__main__":
process_list = []
for i in range(10):
p = multiprocessing.Process(target=func, args=(i, ))
p.start()
process_list.append(p)
# p.join() 程序会变成同步阻塞
for i in process_list:
i.join() # 异步并发
print("主进程发最后一封邮件!")
Process类常⽤属性
创建进程的两种方法
# 创建进程的方法一:
# 利用multiprocessing模块提供一个Process类来创建一个进程对象
from multiprocessing import Process
import time
def func(n):
while n > 0:
print(n)
time.sleep(3)
n -= 1
if __name__ == "__main__":
p = Process(target = func, args=(5,))
p.start()
p.join()
# 创建进程的方法二:
# 创建新的进程可以自定义一个类去继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def run(self):
n = 5
while n > 0:
print(n)
time.sleep(3)
n -= 1
if __name__ == "__main__":
p = ClockProcess()
p.start()
p.join()
守护进程
多任务处理方式一:多进程
创建多进程的两种方式:
# 手动创建
from multiprocessing import Process
num = 1
def run1():
global num
num += 5
print("子进程1运行中,num = %d" % (num))
def run2():
global num
num += 10
print("子进程2运行中,num = %d" % (num))
if __name__ == "__main__":
print("父进程启动")
p1 = Process(target=run1)
p2 = Process(target=run2)
print("子进程将要执行")
p1.start()
p2.start()
p1.join()
p2.join()
print("子进程结束")
# 借助旧版进程池创建多进程
from multiprocessing import Pool
import random
import time
def work(num):
print(random.random() * num)
time.sleep(3)
if __name__ == "__main__":
# 实例化进程池对象,设置同一时间内最多可以执行的进程数为3个
# 题中的10个任务都由进程池中的这三个进程轮询执行,不会创建额外 的进程数
# 若不指定则同一时间内可以执行的进程个数默认为cpu逻辑核心数
p = Pool(3)
for i in range(10):
# apply_async 选择要调用的任务,每次循环出来的任务会用闲下来的子进程去执行
# 使⽤⾮阻塞⽅式调⽤func(并⾏执⾏,阻塞⽅式必须为等待上⼀个进程退出后才能执⾏下⼀个进程), args为传递给func的参数列表,kwargs为传递给func的关键字参数列表;
p.apply_async(work, (i,))
# 进程池关闭之后不会再接受新的请求
p.close()
# 等待进程池中的所有子进程都结束
p.join()
# 多进程中,主进程一般用来等待子进程执行完毕,真正的任务都由子进程中执行
# 借助新版进程池创建多进程
from concurrent.futures import ProcessPoolExecutor
import os
import time
def func(i):
print("任务执行中... start", os.getpid())
time.sleep(10)
print("任务结束... end", i)
return i
# ProcessPoolExecutor 进程池基本使用
"""
默认如果一个进程短时间内可以完成更多的任务,就不会创建额外的新的进程,以节省资源
"""
if __name__ == "__main__":
lst = []
print(os.cpu_count()) # cpu逻辑核心数
# 创建进程池对象
"""进程池中默认最多创建cpu这么多个进程,所有任务全由这几个进程完成,不会额外创建进程"""
p = ProcessPoolExecutor()
# 异步提交任务
for i in range(10):
res = p.submit(func, i)
lst.append(res)
# 获取当前进程池返回值
for i in lst:
print(i.result())
# 等待所有子进程执行结束
p.shutdown() # join
print("主程序执行结束....")
进程间通信
进程间数据不共享,他们之间进行数据传递即为通信
from multiprocessing import Queue
Queue 的基本使用
初始化
入队操作(存数据)
出队操作(取数据)
其他操作
python代码实现
from multiprocessing import Queue, Process
import time
def write(q):
for value in ["a", "b", "c"]:
print("开始写入:", value)
q.put(value)
time.sleep(2)
def read(q):
while True:
if not q.empty():
print("读取到的是", q.get())
time.sleep(2)
else:
break
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pw.join() #等待接收完毕
pr.start()
pr.join()
print("接受完毕!")
# 三个进程间通信
from multiprocessing import Process
from multiprocessing import Queue
def func1(q1):
q1.put("你好!")
print(f"子进程p1往队列q1中放入的数据为:你好!")
def func2(q1, q2):
msg = q1.get()
print(f"子进程p2从队列q1中取出的数据为:{msg}")
q2.put(msg)
print(f"子进程p2往队列q2中放入的数据为:{msg}")
def func3(q2):
msg = q2.get()
print(f"子进程p3从队列q2中取出的数据为:{msg}")
if __name__ == "__main__":
q1 = Queue()
q2 = Queue()
p1 = Process(target=func1, args=(q1,))
p2 = Process(target=func2, args=(q1, q2))
p3 = Process(target=func3, args=(q2,))
p1.start()
p2.start()
p3.start()
JoinableQueue 的用法
# put 存储
# get 获取
# task_done 队列计数减1
# join 阻塞
# task_done 配合 join 一起使用
# [1,2,3,4,5]
# 队列计数5
# put 一次 每存放一个值,队列计数器加1
# get 一次 通过task_done让队列计数器减1
# join 函数,会根据队列中的计数器来判定是阻塞还是放行
# 如果计数器变量是0,意味着放行,其他情况阻塞;
from multiprocessing import Process,JoinableQueue
jq = JoinableQueue()
# put 会让队列计数器加1
jq.put("a")
print(jq.get())
# 通过task_done,让队列计数器减1
jq.task_done()
# 只有队列计数器是0的时,才会放行
jq.join() # 队列.join
print("finish")
生产者——消费者模型
Queue下的生产者——消费者模型:
# 消费者模型
def consumer(q, name):
while True:
food = q.get()
if food is None:
break
time.sleep(random.uniform(0.1, 1))
print("%s 吃了一个%s" % (name, food))
# 生产者模型
def producer(q, name, food):
for i in range(5):
time.sleep(random.uniform(0.1, 1))
print("%s 生产了 %s%s" % (name, food, i))
q.put(food + str(i))
if __name__ == "__main__":
q = Queue()
# 消费者1
p1 = Process(target=consumer, args=(q, "张三"))
p1.start()
# 消费者2
a2 = Process(target=consumer, args=(q, "李四"))
a2.start()
# 生产者1
p2 = Process(target=producer, args=(q, "王五", "黄金"))
p2.start()
# 生产者2
b2 = Process(target=producer, args=(q, "小明", "钻石"))
b2.start()
# 在生产完所有的数据之后,在队列的末尾塞入一个None
p2.join()
b2.join()
# 消费者模型如果获取的是None,代表停止消费
q.put(None)
q.put(None)
JoinableQueue 下的生产者——消费者模型:
from multiprocessing import Process,JoinableQueue
# 消费者模型
def consumer(q, name):
while True:
food = q.get()
time.sleep(random.uniform(0.1, 1))
print("%s 吃了一个%s" % (name, food))
q.task_done()
# 生产者模型
def producer(q, name, food):
for i in range(5):
time.sleep(random.uniform(0.1, 1))
print("%s 生产了 %s%s" % (name, food, i))
q.put(food + str(i))
if __name__ == "__main__":
q = JoinableQueue()
# 消费者1
p1 = Process(target=consumer, args=(q, "张三"))
p1.daemon = True
p1.start()
# 生产者1
p2 = Process(target=producer, args=(q, "李四", "黄金"))
p2.start()
# 把生产者所有的数据都装载到队列中
p2.join()
# 当队列计数器减到0的时候,会立刻放行
# 必须等待消费者模型中所有的数据都task_done之后,变成0了就代表消费结束.
q.join()
print("程序结束....")
进程池中的进程之间的通信
from multiprocessing import Manager, Pool
import time
def write(q):
for i in "welcome":
print("开始写入", i)
q.put(i)
def read(q):
time.sleep(2)
for i in range(q.qsize()): # q.qsize()获取到当前队列的消息数量!
print("得到消息", q.get())
if __name__ == "__main__":
print("主进程启动!")
q = Manager().Queue()
po = Pool()
po.apply_async(write, (q,))
po.apply_async(read, (q,))
po.close()
po.join()