Python 多进程解析:Multiprocessing 高效并行处理的奥秘
在 Python 编程中,多进程(Multiprocessing)是一种提高程序执行效率的重要手段。本文深入解析了多进程的概念与应用,帮助开发者充分利用多核处理器的计算能力。我们从基本的进程创建与启动开始,讲解了如何通过 Queue
实现进程间的数据传递,并通过对比多进程与多线程的性能差异,揭示了多进程在处理 CPU 密集型任务时的显著优势。文章还详细介绍了进程池(Pool)的使用方法,包括 map
和 apply_async
的不同应用场景。最后,我们探讨了共享内存和进程锁的使用,确保多进程在并发操作中的数据安全性。本文为希望掌握多进程编程的读者提供了全面且易懂的实践指导。
一 多进程
Multiprocessing 是一种编程和执行模式,它允许多个进程同时运行,以此提高应用程序的效率和性能。在 Python 中,multiprocessing 模块可以帮助你创建多个进程,使得每个进程都可以并行处理任务,从而有效利用多核处理器的能力。
1 导入进程标准模块
import multiprocessing as mp
2 定义调用函数
def job(a, d):
print('你好 世界')
3 创建和启动进程
# 创建进程
p1 = mp.Process(target=job, args=(1, 2))
# 启动进程
p1.start()
# 连接进程
p1.join()
二 存储进程结果 Queue
1 存入输出到 Queue
# 该函数没有返回值!!!
def job02(q):
res = 0
for i in range(1000):
res += i + i ** 2 + i ** 3
q.put(res) #
def my_result_process02():
q = mp.Queue()
p1 = mp.Process(target=job02, args=(q,))
p2 = mp.Process(target=job02, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1)
print(res2)
print(res1 + res2)
三 threading & multiprocessing 对比
1 创建多进程 multiprocessing
def job03(q):
res = 0
for i in range(1000000):
res += i + i ** 2 + i ** 3
# 结果加 queue
q.put(res)
# 多核运算多进程
def multicore03():
q = mp.Queue()
p1 = mp.Process(target=job03, args=(q,))
p2 = mp.Process(target=job03, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:', res1 + res2)
2 创建多线程 multithread
# 单核运算多线程
def multithread03():
# thread可放入process同样的queue中
q = mp.Queue()
t1 = td.Thread(target=job03, args=(q,))
t2 = td.Thread(target=job03, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread:', res1 + res2)
3 创建普通函数
def normal03():
res = 0
for _ in range(2):
for i in range(1000000):
res += i + i ** 2 + i ** 3
print('normal:', res)
4 创建对比时间函数
def time_result03():
st = time.time()
normal03()
st1 = time.time()
print('normal time:', st1 - st)
multithread03()
st2 = time.time()
print('multithread time:', st2 - st1)
multicore03()
print('multicore time:', time.time() - st2)
5 运行结果
normal03: 499999666667166666000000
normal03 time: 0.6855959892272949
multithread03: 499999666667166666000000
multithread03 time: 0.6804449558258057
multicore03: 499999666667166666000000
multicore03 time: 0.38849496841430664
我运行的是 normal03 > multithread03 > multicore03
,normal03
和 multithread03
相差不大,multicore03
比 normal03
和 multithread03
快将近一倍。
四 进程池 Pool
使用进程池 Pool ,Python 会自行解决多进程问题。
1 进程池 Pool() 和 map()
map()
返回的是多结果。
def job04(x):
# Pool的函数有返回值
return x * x
def multicore04():
# Pool的函数有返回值
pool = mp.Pool()
# 自分配 CPU 计算
res = pool.map(job04, range(10))
print(res)
2 自定义核数量
Pool 默认大小是 CPU的核数,传入 processes 参数自定义需要的核数量。
def multicore05():
# 定义CPU核数量为3
pool = mp.Pool(processes=3)
res = pool.map(job04, range(10))
print(res)
3 apply_async 单结果返回
apply_async()
中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的, 所以在传入值后需要加逗号, 同时需要用 get()
方法获取返回值。
def multicore06():
pool = mp.Pool()
res = pool.apply_async(job04, (2,))
# 用get获得结果
print(res.get())
4 apply_async 多结果返回
def multicore07():
pool = mp.Pool()
multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
# 用get获得结果
print([res.get() for res in multi_res])
5 划重点
Pool
默认调用是CPU
的核数,传入processes
参数可自定义CPU
核数。map()
放入迭代参数,返回多个结果。apply_async()
只能放入一组参数,并返回一个结果,如果想得到map()
的效果需要通过迭代。
五 共享内存 shared memory
1 定义 Shared Value
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
2 定义 Shared Array
它只能是一维数组
array = mp.Array('i', [1, 2, 3, 4])
其中 d 和 i 参数用来设置数据类型的,d 表示一个双精浮点类型,i 表示一个带符号的整型,参考数据类型如下:
具体链接:Efficient arrays of numeric values
六 进程锁 Lock
1 不加进程锁
争抢共享内存
def job08(v, num):
for _ in range(5):
time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
v.value += num # v.value获取共享变量值
print(v.value, end="\n")
def multicore08():
v = mp.Value('i', 0) # 定义共享变量
p1 = mp.Process(target=job08, args=(v, 1))
p2 = mp.Process(target=job08, args=(v, 3)) # 设定不同的number看如何抢夺内存
p1.start()
p2.start()
p1.join()
p2.join()
2 加进程锁
def job09(v, num, l):
l.acquire() # 锁住
for _ in range(5):
# print(v.value, num)
time.sleep(0.1)
v.value = v.value + num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore09():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入
p1.start()
p1.join()
p2 = mp.Process(target=job09, args=(v, 3, l))
p2.start()
p2.join()
# def multicore10():
# l = mp.Lock() # 定义一个进程锁
# v = mp.Value('i', 0) # 定义共享内存
# p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入
# p2 = mp.Process(target=job09, args=(v, 3, l))
# p1.start()
# p2.start()
# p1.join()
# p2.join()
在这个示例中,必须先执行 p1
以达到预期效果。分别运行 multicore09
和 multicore10
会发现一些有意思的情况。
七 完整代码示例
注:建议在运行 main.py 对应的代码功能时,逐行使用注释进行操作。
# This is a sample Python script.
# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import multiprocessing as mp
import threading as td
import time as time
def print_hi(name):
# Use a breakpoint in the code line below to debug your script.
print(f'Hi, {name}') # Press ⌘F8 to toggle the breakpoint.
# 创建进程
p1 = mp.Process(target=job, args=(1, 2))
# 启动进程
p1.start()
# Shared Value
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
# Shared Array,只能是一维数组
array = mp.Array('i', [1, 2, 3, 4])
def job(a, d):
print('你好 世界')
# 该函数没有返回值!!!
def job02(q):
res = 0
for i in range(1000):
res += i + i ** 2 + i ** 3
q.put(res) #
def my_result_process02():
q = mp.Queue()
p1 = mp.Process(target=job02, args=(q,))
p2 = mp.Process(target=job02, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1)
print(res2)
print(res1 + res2)
def job03(q):
res = 0
for i in range(1000000):
res += i + i ** 2 + i ** 3
# 结果加 queue
q.put(res)
# 多核运算多进程
def multicore03():
q = mp.Queue()
p1 = mp.Process(target=job03, args=(q,))
p2 = mp.Process(target=job03, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore03:', res1 + res2)
# 单核运算多线程
def multithread03():
# thread可放入process同样的queue中
q = mp.Queue()
t1 = td.Thread(target=job03, args=(q,))
t2 = td.Thread(target=job03, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread03:', res1 + res2)
def normal03():
res = 0
for _ in range(2):
for i in range(1000000):
res += i + i ** 2 + i ** 3
print('normal03:', res)
def time_result03():
st = time.time()
normal03()
st1 = time.time()
print('normal03 time:', st1 - st)
multithread03()
st2 = time.time()
print('multithread03 time:', st2 - st1)
multicore03()
print('multicore03 time:', time.time() - st2)
def job04(x):
# Pool的函数有返回值
return x * x
def multicore04():
# Pool的函数有返回值
pool = mp.Pool()
# 自分配 CPU 计算
res = pool.map(job04, range(10))
print(res)
def multicore05():
pool = mp.Pool(processes=3) # 定义CPU核数量为3
res = pool.map(job04, range(10))
print(res)
def multicore06():
pool = mp.Pool()
# apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,
# 所以在传入值后需要加逗号, 同时需要用get()方法获取返回值
res = pool.apply_async(job04, (2,))
# 用get获得结果
print(res.get())
def multicore07():
pool = mp.Pool()
multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
# 用get获得结果
print([res.get() for res in multi_res])
def job08(v, num):
for _ in range(5):
time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
v.value += num # v.value获取共享变量值
print(v.value, end="\n")
def multicore08():
v = mp.Value('i', 0) # 定义共享变量
p1 = mp.Process(target=job08, args=(v, 1))
p2 = mp.Process(target=job08, args=(v, 3)) # 设定不同的number看如何抢夺内存
p1.start()
p2.start()
p1.join()
p2.join()
def job09(v, num, l):
l.acquire() # 锁住
for _ in range(5):
# print(v.value, num)
time.sleep(0.1)
v.value = v.value + num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore09():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入
p1.start()
p1.join()
p2 = mp.Process(target=job09, args=(v, 3, l))
p2.start()
p2.join()
def multicore10():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job09, args=(v, 1, l)) # 需要将lock传入
p2 = mp.Process(target=job09, args=(v, 3, l))
p1.start()
p2.start()
p1.join()
p2.join()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
print_hi('什么是 Multiprocessing')
my_result_process02()
time_result03()
multicore04()
multicore05()
multicore06()
multicore07()
multicore08()
multicore09()
# multicore10()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。
Hi, 什么是 Multiprocessing
你好 世界
249833583000
249833583000
499667166000
normal03: 499999666667166666000000
normal03 time: 0.7139420509338379
multithread03: 499999666667166666000000
multithread03 time: 0.6696178913116455
multicore03: 499999666667166666000000
multicore03 time: 0.3917398452758789
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
3
4
7
8
11
12
1515
16
19
1
2
3
4
5
8
11
14
17
20
八 源码地址
代码地址:
国内看 Gitee 之 什么是 Multiprocessing.py
国外看 GitHub 之 什么是 Multiprocessing.py
引用 莫烦 Python