Python重试模块retrying
参考:
https://segmentfault.com/a/1190000004085023
https://pypi.org/project/retrying/
最初的版本
import requests
class ProxyUtil:
    """Fetch a proxy address from a proxy service, retrying by hand on failure."""

    def __init__(self):
        # Number of failed attempts seen so far by get_proxies().
        self._get_proxy_count = 0

    def get_proxies(self):
        """Return a ``requests``-style proxies mapping.

        The proxy service is queried until it answers with HTTP 200. After
        the first failure up to five retries are made.

        Returns:
            dict: ``{'http': 'http://<proxy>', 'https': 'https://<proxy>'}``
            on success, or an empty dict once every attempt has failed.
        """
        # A loop replaces the original recursion, whose retry result was
        # discarded: any call that needed a retry used to return None.
        while True:
            try:
                # NOTE(review): no timeout is passed, so a hung connection
                # blocks here instead of triggering a retry.
                r = requests.get('代理服务器地址')
                if r.status_code != 200:
                    raise Exception("获取代理失败,状态码%s" % (r.status_code))
                proxy = r.content.decode('utf-8')
            except Exception:
                # Broad on purpose: any failure (network error, bad status,
                # undecodable body) counts as one failed attempt.
                if self._get_proxy_count < 5:
                    print('第%d次获取代理失败,准备重试' % self._get_proxy_count)
                    self._get_proxy_count += 1
                    continue
                print('第%d次获取代理失败,退出' % self._get_proxy_count)
                self._get_proxy_count = 0
                return dict()
            else:
                # Reset so the next call starts with a full retry budget.
                self._get_proxy_count = 0
                return {
                    'http': 'http://' + proxy,
                    'https': 'https://' + proxy,
                }
if __name__ == '__main__':
    # Run the demo only when this file is executed as a script.
    proxy = ProxyUtil()
    proxy.get_proxies()
下面来试试 retrying 模块。
安装:pip install retrying
import requests
from retrying import retry
class ProxyUtil:
    """Demo of the bare ``@retry`` decorator from the ``retrying`` package."""

    def __init__(self):
        # Not read by any method of this class.
        self._get_proxy_count = 0

    @retry
    def get_proxies(self):
        """Fetch a proxy and return a ``requests``-style proxies mapping.

        With a bare ``@retry`` no stop condition is configured, so the call
        is repeated for as long as it keeps raising.

        Returns:
            dict: ``http``/``https`` proxy URLs when the service answers
            with HTTP 200, otherwise an empty dict.
        """
        r = requests.get('代理地址')
        print('正在获取')
        # Deliberate failure so the retry behaviour can be observed.
        # Remove this line to let the method return normally.
        raise Exception("异常")
        # Everything below is unreachable while the demo raise is in place.
        print('获取到最新代理 = %s' % r.text)
        params = dict()
        if r and r.status_code == 200:
            proxy = str(r.content, encoding='utf-8')
            params['http'] = 'http://' + proxy
            params['https'] = 'https://' + proxy
        # The original built the mapping but never handed it back.
        return params
if __name__ == '__main__':
    # Run the demo only when this file is executed as a script.
    proxy = ProxyUtil()
    proxy.get_proxies()
结果:由于 @retry 默认没有停止条件,也不设置等待时间,只要方法持续抛出异常就会被无限重试。
# Cap the number of attempts with stop_max_attempt_number.
@retry(stop_max_attempt_number=5)
def get_proxies(self):
    """Fetch a proxy, giving up after five attempts.

    Snippet meant to replace ``ProxyUtil.get_proxies``; ``self`` is the
    ``ProxyUtil`` instance.

    Returns:
        dict: ``http``/``https`` proxy URLs when the service answers with
        HTTP 200, otherwise an empty dict.
    """
    r = requests.get('代理地址')
    print('正在获取')
    # Deliberate failure so the retry behaviour can be observed.
    # Remove this line to let the method return normally.
    raise Exception("异常")
    # Everything below is unreachable while the demo raise is in place.
    print('获取到最新代理 = %s' % r.text)
    params = dict()
    if r and r.status_code == 200:
        proxy = str(r.content, encoding='utf-8')
        params['http'] = 'http://' + proxy
        params['https'] = 'https://' + proxy
    # The original built the mapping but never handed it back.
    return params
# 设置最大延迟时间 stop_max_delay(单位毫秒):从第一次调用开始计时,重试的总耗时超过该值即停止(库内部默认值为100毫秒,但只有显式传入该参数时才会生效);与 stop_max_attempt_number 同时设置时,任一条件先满足就停止
@retry(stop_max_attempt_number=5,stop_max_delay=50)
# 通过设置为50,我们会发现,任务并没有执行5次才结束!
# 添加每次方法执行之间的等待时间
@retry(stop_max_attempt_number=5,wait_fixed=2000)
# 随机的等待时间
@retry(stop_max_attempt_number=5,wait_random_min=100,wait_random_max=2000)
# 每调用一次增加固定时长
@retry(stop_max_attempt_number=5,wait_incrementing_increment=1000)
# Retrying on specific exceptions: a simple example first.
def retry_if_io_error(exception):
    """Return True if ``exception`` is an ``IOError``, False otherwise.

    Intended as a ``retry_on_exception`` predicate for ``retrying.retry``.
    """
    return isinstance(exception, IOError)
@retry(retry_on_exception=retry_if_io_error)
def read_a_file():
    """Return the full text of the file named ``file``.

    The call is retried whenever ``retry_if_io_error`` accepts the raised
    exception. No stop condition is configured, so a persistent IOError
    (for example a missing file) is retried without limit.
    """
    with open("file", "r") as f:
        return f.read()
# Predicate that tells whether the raised exception is an IOError.
def wraper(args):
    """Return True if ``args`` is an ``IOError`` instance, False otherwise.

    Despite its name, ``args`` is the single exception object that
    ``retrying`` passes to a ``retry_on_exception`` predicate.
    """
    return isinstance(args, IOError)
class ProxyUtil:
    """Demo: retry only when the failure passes the ``wraper`` predicate."""

    def get_proxies(self):
        """Fetch a proxy and return a ``requests``-style proxies mapping.

        Returns:
            dict: ``http``/``https`` proxy URLs when the service answers
            with HTTP 200, otherwise an empty dict.
        """
        r = requests.get('http://47.98.163.40:17000/get?country=local')
        print('正在获取')
        # Deliberate failure for the demo. Swap in ``raise IndexError`` to
        # see an exception that the predicate rejects.
        raise IOError
        # Everything below is unreachable while the demo raise is in place.
        print('获取到最新代理 = %s' % r.text)
        params = dict()
        if r and r.status_code == 200:
            proxy = str(r.content, encoding='utf-8')
            params['http'] = 'http://' + proxy
            params['https'] = 'https://' + proxy
        # The original built the mapping but never handed it back.
        return params

    # Alternative decorator:
    # @retry_handler(retry_time=2, retry_interval=5, retry_on_exception=[IOError,IndexError])
    @retry(stop_max_attempt_number=5, retry_on_exception=wraper)
    def retry_test(self):
        """Call ``get_proxies`` with at most five attempts, then print 'io'."""
        self.get_proxies()
        print('io')
# Deciding whether to retry from the return value.
def retry_if_result_none(result):
    """Return True if the call should be retried, False otherwise.

    Despite the name, this demo variant retries when ``result`` equals the
    string ``"111"``. The ``result is None`` check was replaced so that the
    example function below triggers retries.
    """
    # Always return a real bool; the original fell through and returned None
    # for non-matching results.
    return result == "111"
@retry(stop_max_attempt_number=5, retry_on_result=retry_if_result_none)
def might_return_none():
    """Print a message and return ``"111"``.

    The decorator calls ``retry_if_result_none`` on each result and stops
    after five attempts.
    """
    # NOTE(review): this message looks copied from the retrying README. It
    # does not describe this configuration (five attempts, result "111").
    print("Retry forever ignoring Exceptions with no wait if return value is None")
    return "111"


# Demo call. Every attempt returns "111", which the predicate rejects, so
# retrying gives up after the fifth attempt and raises RetryError.
might_return_none()
def retry_handler(retry_time: int, retry_interval: float,
                  retry_on_exception: "list[type[BaseException]]",
                  *args, **kwargs):
    """Build a ``retrying.retry`` decorator with a fixed wait between attempts.

    Args:
        retry_time: Maximum number of attempts, passed to ``retry`` as
            ``stop_max_attempt_number``.
        retry_interval: Seconds to wait between attempts.
        retry_on_exception: Exception classes that should trigger a retry.
            A single exception class is accepted as well.
        *args: Accepted for backward compatibility; not used.
        **kwargs: Accepted for backward compatibility; not used.

    Returns:
        The decorator produced by ``retry(...)``.
    """
    # Snapshot as a tuple so isinstance() can test every class in one call.
    if isinstance(retry_on_exception, type):
        retryable = (retry_on_exception,)
    else:
        retryable = tuple(retry_on_exception)

    def is_exception(exception):
        """Return True if ``exception`` is one of the retryable classes."""
        return isinstance(exception, retryable)

    # wait_fixed takes milliseconds. It replaces the original wait_func
    # wrapper, which built a new ``Retrying`` object on every wait and used
    # the ``Retrying`` name without it being imported.
    return retry(
        wait_fixed=retry_interval * 1000,
        stop_max_attempt_number=retry_time,
        retry_on_exception=is_exception,
    )
class ProxyUtil:
    """Demo: retry through the custom ``retry_handler`` decorator factory."""

    def get_proxies(self):
        """Fetch a proxy and return a ``requests``-style proxies mapping.

        Returns:
            dict: ``http``/``https`` proxy URLs when the service answers
            with HTTP 200, otherwise an empty dict.
        """
        r = requests.get('代理地址')
        print('正在获取')
        # Deliberate failure for the demo. ``raise IndexError`` is the other
        # exception type listed in the decorator below.
        raise IOError
        # Everything below is unreachable while the demo raise is in place.
        print('获取到最新代理 = %s' % r.text)
        params = dict()
        if r and r.status_code == 200:
            proxy = str(r.content, encoding='utf-8')
            params['http'] = 'http://' + proxy
            params['https'] = 'https://' + proxy
        # The original built the mapping but never handed it back.
        return params

    # Alternative decorator:
    # @retry(stop_max_attempt_number=5,retry_on_exception=wraper)
    @retry_handler(retry_time=2, retry_interval=5, retry_on_exception=[IOError, IndexError])
    def retry_test(self):
        """Call ``get_proxies`` under ``retry_handler``, then print 'io'."""
        self.get_proxies()
        print('io')
if __name__ == '__main__':
    # Run the demo only when this file is executed as a script.
    proxy = ProxyUtil()
    proxy.retry_test()