方法介绍

#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务

obj = p.submit(task,i).result() #相当于apply同步方法
obj = p.submit(task,i) #相当于apply_async异步方法

#map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作

#shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

#result(timeout=None)
取得结果

#add_done_callback(fn)
回调函数

示例

#介绍
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

#用法
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import random
import time


def task(n):
    """Print the current process id, sleep 1-3 seconds, and return n squared."""
    print('%s is runing' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)

    # submit() hands back a Future per task; keep them so the results can be
    # read once the pool has finished.
    futures = [executor.submit(task, i) for i in range(11)]

    # Wait for every submitted task to finish before continuing.
    executor.shutdown(True)
    print('+++>')

    for future in futures:
        print(future.result())

ProcessPoolExecutor

#介绍
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
与ProcessPoolExecutor相同

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# ================================================
# Example


def task(i):
    """Sleep for one second, then print i."""
    time.sleep(1)
    print(i)


if __name__ == '__main__':
    p = ThreadPoolExecutor(10)
    # p = ProcessPoolExecutor(10)
    for row in range(100):
        p.submit(task, row)

# ================================================
# The two functions below take `self`: they are methods excerpted from a
# class that is not shown here. PluginManager, post_asset and get_host are
# defined elsewhere.


def run(self, host):
    """Call PluginManager(host).exec_plugin() and pass the result to self.post_asset."""
    server_info = PluginManager(host).exec_plugin()
    self.post_asset(server_info)


def execute(self):
    """Submit self.run(host) to a 10-worker thread pool for each host from self.get_host()."""
    p = ThreadPoolExecutor(10)  # thread pool
    host_list = self.get_host()
    for host in host_list:
        p.submit(self.run, host)
        # Serial version that the pool submission replaces:
        # server_info = PluginManager(host).exec_plugin()
        # self.post_asset(server_info)

cmdb项目的某个东东

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import os
import random
import time


def task(n):
    """Print the current process id, sleep 1-3 seconds, and return n squared."""
    print('%s is runing' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future = executor.submit(task, i)

    # map() replaces the for + submit loop above (usage of map).
    executor.map(task, range(1, 12))

map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests
import time,os
def get_page(url):
    """Download url with requests.get.

    Returns a dict {'url': url, 'text': <response text>} when the response
    status code is 200, and None for any other status.
    """
    print('<%s> is getting [%s]' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:  # 200 means the download succeeded
        return {'url': url, 'text': response.text}
    # Non-200 response: there is no page to hand on.
    return None
def parse_page(res):
    """Done-callback: append the URL and text length of a downloaded page to db.txt.

    res is the finished Future handed over by add_done_callback. Its result
    is whatever get_page returned: a dict with 'url' and 'text' keys, or
    None when the response status was not 200.
    """
    page = res.result()
    if page is None:
        # get_page yields None for non-200 responses; indexing it would raise
        # TypeError inside the callback, so there is nothing to record.
        return
    print('<%s> is getting [%s]' % (os.getpid(), page['url']))
    parse_res = 'url:%s size:%s\n' % (page['url'], len(page['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)
if __name__ == '__main__':
    # p = ThreadPoolExecutor()
    p = ProcessPoolExecutor()
    urls = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in urls:
        # The callback receives the Future object, not the page itself, so
        # parse_page has to call .result() on it first. Whichever task
        # finishes first gets its callback invoked first.
        # Callbacks are a general programming pattern: they are used with
        # process pools and with thread pools alike.
        # add_done_callback() returns None, so its return value is not kept.
        p.submit(get_page, url).add_done_callback(parse_page)
    p.shutdown()  # equivalent to close() + join() on a process pool
    print('主', os.getpid())

add_done_callback

# Imported here so this snippet runs on its own; the original used
# ThreadPoolExecutor without importing it.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import requests

url_list = [
    'http://www.cnblogs.com/wupeiqi/articles/6229292.html',
    'http://www.baidu.com',
    'http://www.hupu.com',
]


def task(url):
    """Fetch url with requests.get and return the response's content."""
    res = requests.get(url)
    return res.content


def callback(future):
    """Done-callback: print the result of the finished future."""
    print(future.result())


def run():
    """Submit task(url) for every entry of url_list to a 10-worker thread pool.

    Each future gets `callback` attached, so its result is printed when it
    completes. The function returns only after the pool has shut down.
    """
    pool = ThreadPoolExecutor(10)
    # pool = ProcessPoolExecutor(10)
    # res_list = []
    for url in url_list:
        res = pool.submit(task, url)
        # res_list.append(res)
        res.add_done_callback(callback)
    pool.shutdown(wait=True)  # wait for completion before running later code
    # for res in res_list:
    #     print(res.result())


# run()

add_done_callback2(爬虫)

参考

https://docs.python.org/dev/library/concurrent.futures.html

04-25 03:35
查看更多