线程的理解
1、操作系统能够进行运算调度的最小单位,即程序执行的最小单位
2、进程负责程序所必须的资源分配(文本区域、数据区域、堆栈区域),一个进程中也经常需要同时做多件事,即要同时运行多个‘子任务’,这些子任务即线程。
3、线程基本不占用系统资源,其只拥有在运行过程中必不可少的资源(如程序计数器、一组寄存器和栈)
4、同一个进程中的所有线程都共享此进程所拥有的全部资源,
5、线程之间的通信主要通过共享所属进程的资源
6、线程的上下文切换很快,资源开销较少,但是相对于进程而言,不够安全,在多个线程共同操作进程的某一资源时,可能会丢失数据
7、线程和进程之间的区别
线程的五种状态
GIL全局解释器锁
线程创建
使用python中的threading模块中的Thread类创建线程
from threading import Thread
threading模块提供的Thread类来创建线程对象
from threading import Thread
import os
def func(num):
print('当前线程{},所归属的进程id号{}'.format(os.getpid(), num))
for i in range(10):
# 异步创建10个子线程
t = Thread(target=func, args=(i,))
t.start()
# 主线程执行任务
print(os.getpid())
自定义类继承Thread类,每次实例化这个类的时候,就等同于实例化线程对象
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, name):
# 手动调用父类的构造方法
super().__init__()
self.name = name
def run(self):
time.sleep(1)
print("当前线程正在执行runing ... ", self.name)
if __name__ == "__main__":
t = MyThread("机器今天会再次爆炸么?")
t.start()
print("主线程执行结束 ... ")
Thread 类中的基本方法
from threading import Thread
import time
def func():
time.sleep(1)
if __name__ == "__main__":
t = Thread(target=func)
t.start()
print(t , type(t))
print(t.is_alive()) # False
print(t.getName())
t.setName("xboyww")
print(t.getName())
from threading import Thread
import time
from threading import currentThread
from threading import enumerate
from threading import activeCount
# 1.currentThread().ident 查看线程id号
def func():
print("子线程id", currentThread().ident, os.getpid())
if __name__ == "__main__":
Thread(target=func).start()
print("主线程id", currentThread().ident, os.getpid())
# 2.enumerate() 返回目前正在运行的线程列表
def func():
print("子线程id", currentThread().ident, os.getpid())
time.sleep(0.5)
if __name__ == "__main__":
for i in range(10):
Thread(target=func).start()
lst = enumerate()
# 子线程10 + 主线程1个 = 11
print(lst ,len(lst))
# 3.activeCount() 返回目前正在运行的线程数量
print(activeCount())
线程池(ThreadPoolExecutor)
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread
def func(i):
print("thread ... start", cthread().ident, i)
time.sleep(3)
print("thread ... end", i)
return cthread().ident
if __name__ == "__main__":
lst = []
setvar = set()
# (1) 创建线程池对象
"""限制线程池最多创建os.cpu_count() * 5 = 线程数,所有任务全由这几个线程完成,不会额外创建线程"""
tp = ThreadPoolExecutor() # 我的电脑40个线程并发
# (2) 异步提交任务
for i in range(100):
res = tp.submit(func, i)
lst.append(res)
# (3) 获取返回值
for i in lst:
setvar.add(i.result())
# (4) 等待所有子线程执行结束
tp.shutdown()
print(len(setvar), setvar)
print("主线程执行结束 ... ")
守护线程
守护线程 : 等待所有线程全部执行完毕之后,再自己终止,守护的是所有线程
线程对象.setDaemon(True)
from threading import Thread
import time
def func1():
while True:
time.sleep(0.5)
print("我是func1")
def func2():
print("我是func2 start ... ")
time.sleep(3)
print("我是func2 end ... ")
t1 = Thread(target=func1)
t2 = Thread(target=func2)
# 在start调用之前,设置守护线程
t1.setDaemon(True)
t1.start()
t2.start()
print("主线程执行结束 ... ")
同步 & 异步
同步
场景1:是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,协同步调,按预定的先后次序进行运行
场景2:一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列
异步
处理调用这个事务之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、回调来通知调用者处理结果
对于I/O相关的程序来说,异步编程可以大幅度的提高系统的吞吐量,因为在某个I/O操作的读写过程中,系统可以先去处理其它的操作(通常是其它的I/O操作)
不确定执行顺序
阻塞 & 非阻塞
阻塞
非阻塞
串行 & 并行 & 并发
串行
A和B两个任务运行在一个CPU线程上,在A任务执行完之前不可以执行B。即,在整个程序的运行过程中,仅存在一个运行上下文,即一个调用栈一个堆。程序会按顺序执行每个指令。
并行
并行指两个或两个以上任务同一时刻被不同的cpu执行。在多道程序环境下,并行性使多个程序同一时刻可在不同CPU上同时执行。比如,A和B两个任务可以同时运行在不同的CPU线程上,效率较高,但受限于CPU线程数,如果任务数量超过了CPU线程数,那么每个线程上的任务仍然是顺序执行的。
并发
并发指多个线程在宏观(相对于较长的时间区间而言)上表现为同时执行,而实际上是轮流穿插着执行,并发的实质是一个物理CPU在若干道程序之间多路复用,其目的是提高有限物理资源的运行效率。 并发与并行串行并不是互斥的概念,如果是在一个CPU线程上启用并发,那么自然就还是串行的,而如果在多个线程上启用并发,那么程序的执行就可以是既并发
图示
线程同步
互斥锁(threading模块中定义的Lock类)
import threading
num = 0
def test1():
global num
# 调用Lock对象的acquire()方法获得锁时,这把锁进入“locked”状态
# 如果此时另一个线程2试图获得这个锁,该线程2就会变为同步阻塞状态
if mutex.acquire():
for i in range(1000):
num += 1
# 调用Lock对象的release()方法释放锁之后,该锁进入“unlocked”状态。
mutex.release()
def test2():
global num
# 线程调度程序继续从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态
if mutex.acquire():
for i in range(1000):
num += 1
mutex.release()
mutex = threading.Lock()
p1 = threading.Thread(target=test1)
p1.start()
p2 = threading.Thread(target=test2)
p2.start()
print(num)
死锁(只上锁,不解锁)
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 线程1被 A 锁——>锁定
if mutexA.acquire():
print(self.name + '---do1---up---')
time.sleep(1)
if mutexB.acquire():
print(self.name + '---do1---down---')
mutexB.release()
# 线程1被 A 锁释放的前提是:线程1 抢到 B 锁
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
time.sleep(1)
# 线程2被 B 锁——>锁定
if mutexB.acquire():
print(self.name + '---do2---up---')
if mutexA.acquire():
print(self.name + '---do2---down---')
mutexA.release()
# 线程2被 B 锁释放的前提是:线程2 抢到 A 锁
mutexB.release()
if __name__ == '__main__':
mutexA = threading.Lock()
mutexB = threading.Lock()
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
# Thread-1---do1---up---
# Thread-2---do2---up---
# 程序卡死
# 线程1不释放A锁
# 线程2不释放B锁
递归锁(threading模块中定义的RLock类)
import threading
import time
class MyThread1(threading.Thread):
def run(self):
if mutexA.acquire():
print(self.name + '---do1---up---')
time.sleep(1)
if mutexB.acquire():
print(self.name + '---do1---down---')
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
time.sleep(1)
if mutexB.acquire():
print(self.name + '---do2---up---')
if mutexA.acquire():
print(self.name + '---do2---down---')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
mutexA = threading.RLock()
mutexB = threading.RLock()
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
# Thread-1---do1---up---
# Thread-1---do1---down---
# Thread-2---do2---up---
# Thread-2---do2---down---
信号量(threading模块中定义的Semaphore类)
import time
import threading
def foo(se):
se.acquire()
time.sleep(2)
print("ok")
se.release()
if __name__ == "__main__":
# 设置同一时间内可以有5个线程并发
se = threading.Semaphore(5)
for i in range(20):
t1 = threading.Thread(target=foo, args=(se,))
t1.start() # 此时可以控制同时进入的线程数
线程队列(queue模块)
queue.Queue:FIFO(先⼊先出) 队列 Queue
# 基本使用
from queue import Queue
# put 存
# get 取
# put_nowait 存,超出了队列长度,报错
# get_nowait 取,没数据取不出来,报错
# linux windows 线程中put_nowait,get_nowait都支持
"""先进先出,后进后出"""
# maxsize为一个整数,表示队列的最大条目数,可用来限制内存的使用。
# 一旦队列满,插入将被阻塞直到队列中存在空闲空间。如果maxsize小于等于0,队列大小为无限。maxsize默认为0
q = Queue(maxsize=0)
q.put(1)
q.put(2)
print(q.get())
print(q.get())
# 取不出来,阻塞
# print(q.get())
print(q.get_nowait())
q2 = Queue(3)
q2.put(11)
q2.put(22)
q2.put(33)
# 放不进去了,阻塞
# q2.put(44)
q2.put_nowait(44)
import threading
import time
from queue import Queue
class Pro(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count = count + 1
msg = '生成产品' + str(count)
queue.put(msg) # 队列中添加新产品
print(msg)
time.sleep(1)
class Con(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消费了' + queue.get()
print(msg)
time.sleep(1)
if __name__ == "__main__":
queue = Queue()
# 创建一个队列。线程中能用,进程中不能使用
for i in range(500): # 创建500个产品放到队列里
queue.put('初始产品' + str(i)) # 字符串放进队列
for i in range(2): # 创建了两个线程
p = Pro()
p.start()
for i in range(5): # 5个线程
c = Con()
c.start()
queue.LifoQueue:LIFO(后⼊先出) 栈 LifoQueue
# LifoQueue 先进后出,后进先出(按照栈的特点设计)
from queue import LifoQueue
lq = LifoQueue(3)
lq.put(11)
lq.put(22)
lq.put(33)
# print(lq.put_nowait(444))
print(lq.get())
print(lq.get())
print(lq.get())
queue.PriorityQueue:(优先级队列) PriorityQueue
# PriorityQueue 按照优先级顺序排序 (默认从小到大排序)
from queue import PriorityQueue
# 如果都是数字,默认从小到大排序
pq = PriorityQueue()
pq.put(13)
pq.put(3)
pq.put(20)
print(pq.get())
print(pq.get())
print(pq.get())
# 如果都是字符串
"""如果是字符串,按照ascii编码排序"""
pq1 = PriorityQueue()
pq1.put("chinese")
pq1.put("america")
pq1.put("latinos")
pq1.put("blackman")
print(pq1.get())
print(pq1.get())
print(pq1.get())
print(pq1.get())
# 要么全是数字,要么全是字符串,不能混合 error
"""
pq2 = PriorityQueue()
pq2.put(13)
pq2.put("aaa")
pq2.put("拟稿")
"""
pq3 = PriorityQueue()
# 默认按照元组中的第一个元素排序
pq3.put( (20,"wangwen") )
pq3.put( (18,"wangzhen") )
pq3.put( (30,"weiyilin") )
pq3.put( (40,"xiechen") )
print(pq3.get())
print(pq3.get())
print(pq3.get())
print(pq3.get())
生产消费者模式
进程(线程)之间如果直接通信,可能会出现两个问题
- 耦合性太强
- 速率有可能不匹配
解决方式,找一个缓冲区来中转数据即生产者——消费者模式
线程异步
通过回调函数可以实现多线程异步执行
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread as cthread
import os, time
def func1(i):
print("Process start ... ", os.getpid())
time.sleep(0.5)
print("Process end ... ", i)
return "*" * i
def func2(i):
print("thread start ... ", cthread().ident)
time.sleep(0.5)
print("thread end ... ", i)
return "*" * i
def call_back1(obj):
print("<==回调函数callback进程号:===>", os.getpid())
print(obj.result())
def call_back2(obj):
print("<==回调函数callback线程号:===>", cthread().ident)
print(obj.result())
# (1) 进程池的回调函数: 由主进程执行调用完成
if __name__ == "__main__":
p = ProcessPoolExecutor(5)
for i in range(1, 11):
res = p.submit(func1, i)
# 进程对象.add_done_callback(回调函数)
'''
add_done_callback 可以把res本对象和回调函数自动传递到函数里来
'''
res.add_done_callback(call_back1)
p.shutdown()
print("主进程执行结束 ... ", os.getpid())
# (2) 线程池的回调函数: 由当前子线程执行调用完成
if __name__ == "__main__":
tp = ThreadPoolExecutor(5)
for i in range(1, 11):
res = tp.submit(func2, i)
# 线程对象.add_done_callback(回调函数)
'''
add_done_callback 可以把res本对象和回调函数自动传递到函数里来
'''
res.add_done_callback(call_back2)
tp.shutdown()
print("主线程执行结束 ... ", cthread().ident)
from multiprocessing import Pool
import random
import time
def download(f):
for i in range(1, 4):
print(f"{f}下载文件{i}")
time.sleep(random.randint(1, 3))
return "下载完成"
def alterUser(msg):
print(msg)
if __name__ == "__main__":
p = Pool(3)
# 当func执行完毕后,return的东西会给到回调函数callback
p.apply_async(func=download, args=("线程1",), callback=alterUser)
p.apply_async(func=download, args=("线程2",), callback=alterUser)
p.apply_async(func=download, args=("线程3",), callback=alterUser)
p.close()
p.join()