1   同一个进程内的队列(多线程)    import queue    queue.Queue()  先进先出    queue.LifoQueue() 后进先出    queue.PriorityQueue()  优先级队列       优先级队列 q = queue.PriorityQueue()          q.put()  接收的是一个元组          元组中第一个参数是:表示当前数据的优先级          元组中第二个参数是:需要存放到队列中的数据       优先级的比较(首先保证整个队列中,所有表示优先级的东西类型必须一致)          如果都是int,比数值的大小          如果都是str,比较字符串的大小(从第一个字符的ASCII码开始比较)代码:
from multiprocessing import Queue# 是用于多进程的队列,就是专门用来做进程间通信(IPC)。import queue# 是用于同一进程内的队列,不能做多进程之间的通信

q = queue.Queue()     # 先进先出q.put(1)q.put(2)q.put(3)print(q.get())print(q.get())

q = queue.LifoQueue()    # 后进先出的队列q.put(1)q.put(2)q.put(3)print(q.get())

q = queue.PriorityQueue()# 优先级队列,put()方法接收的是一个元组(),第一个位置是优先级,第二个位置是数据# 优先级如果是数字,直接比较数值# 如果是字符串,是按照 ASCII 码比较的。当ASCII码相同时,会按照先进先出的原则q.put((1,'abc'))q.put((5,'qwe'))q.put((-5,'zxc'))print(q.get())print(q.get())#print(chr(48))

2   线程池    在一个池子里,放固定数量的线程,这些线程等待任务,一旦有任务来,就有线程自发的去执行任务。

#  concurrent.futures 这个模块是异步调用的机制#  concurrent.futures 提交任务都是用submit#  for + submit 多个任务的提交#  shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。进程池线程池效率对比代码:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorfrom multiprocessing import Pool# from multiprocessing import Pool.apply / apply_async
import time

def func(num):    sum = 0    for i in range(num):        sum += i ** 2    print(sum)

if __name__ == '__main__':

    pool的进程池的效率演示    p = Pool(5)    start = time.time()    for i in range(100):        p.apply_async(func,args=(i,))    p.close()    p.join()    print('Pool进程池的效率时间是%s'%(time.time() - start))

    多进程的效率演示    tp = ProcessPoolExecutor(5)    start = time.time()    for i in range(100):        tp.submit(func, i)    tp.shutdown()  # 等效于 进程池中的 close + join    print('进程池的消耗时间为%s' % (time.time() - start))

    多线程的效率    tp = ThreadPoolExecutor(20)    start = time.time()    for i in range(1000):        tp.submit(func,i)    tp.shutdown()# 等效于 进程池中的 close + join    print('线程池的消耗时间为%s'%(time.time() - start))

 结果:针对计算密集的程序来说   不管是Pool的进程池还是ProcessPoolExecutor()的进程池,执行效率相当   ThreadPoolExecutor 的效率要差很多   所以 当计算密集时,使用多进程。
如何把多个任务扔进池中?  要么使用for + submit的方式去提交多个任务  要么直接使用map(func,iterable)方式去提交多个任务
多任务提交代码:
from concurrent.futures import ThreadPoolExecutorimport time

def func(num):    sum = 0    for i in range(num):        sum += i ** 2    print(sum)

t = ThreadPoolExecutor(20)start = time.time()t.map(func,range(1000))# 提交多个任务给池中。  等效于 for + submitt.shutdown()print(time.time() - start)

不同的方式提交多个任务(for+submit  或者 map),拥有不同的拿结果的方式  如果是for+submit的方式提交任务,拿结果用result方法  如果是map的方式提交任务,结果是一个生成器,采用__next__()的方式去拿结果线程池的返回值代码:
from concurrent.futures import ThreadPoolExecutorimport time

def func(num):    sum = 0    # time.sleep(5)    # print(num) # 异步的效果    for i in range(num):        sum += i ** 2    return sum

t = ThreadPoolExecutor(20)

# 下列代码是用map的方式提交多个任务,对应 拿结果的方法是__next__()  返回的是一个生成器对象res = t.map(func,range(1000))t.shutdown()print(res.__next__())print(res.__next__())print(res.__next__())print(res.__next__())

下列代码是用for + submit提交多个任务的方式,对应拿结果的方法是resultres_l = []for i in range(1000):    re = t.submit(func,i)    res_l.append(re)# t.shutdown()[print(i.result()) for i in res_l]在Pool进程池中拿结果,是用get方法。   在ThreadPoolExecutor里边拿结果是用result方法

关于回调函数,不管是Pool进程池的方式,还是ProcessPoolExecutor的方式开启进程池,    回调函数都是由父进程调用关于回调函数,ThreadPoolExecutor    回调函数是子线程调用回调函数代码:
from concurrent.futures import ProcessPoolExecutor  (进程池回调函数)# 不管是ProcessPoolExecutor的进程池  还是Pool的进程池,回调函数都是父进程调用的。import osimport requests

def func(num):    sum = 0    for i in range(num):        sum += i ** 2    return sum

def call_back_fun(res):    # print(res.result(),os.getpid())    print(os.getpid())

if __name__ == '__main__':    print(os.getpid())    t = ProcessPoolExecutor(20)    for i in range(1000):        t.submit(func,i).add_done_callback(call_back_fun)    t.shutdown()

from threading import Thread     (线程池回调函数)from threading import current_thread #相当于线程号from concurrent.futures import ThreadPoolExecutorimport timedef func(i):    sum = 0    sum += i    time.sleep(1)    print('这是在子线程中',current_thread())    return sum

def call_back(sum):    time.sleep(1)    print('这是在回调函数中',sum.result(),current_thread())

if __name__ == '__main__':    t = ThreadPoolExecutor(5)    for i in range(10):        t.submit(func,i).add_done_callback(call_back)    t.shutdown()    print('这是在主线程中',current_thread())
 
 
05-11 11:31