写了一个爬虫工具类。
# -*- coding: utf-8 -*-
# @Time : 2018/8/7 16:29
# @Author : cxa
# @File : utils.py
# @Software: PyCharm
from retrying import retry
from decorators.decorators import decorator, parse_decorator
from glom import glom
from config import headers
import datetime
import hashlib
from tomorrow import threads
from requests_html import HTMLSession
try:
import simplejson as json
except ImportError:
import json
class MetaSingleton(type):
_inst = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._inst:
cls._inst[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
return cls._inst[cls]
class Get_Proxies(metaclass=MetaSingleton):
ip = None
def getproxy(self, change_proxy):
if self.ip is None:
self.ip = self.get_ip(HTMLSession())
self.proxies = {
'http': self.ip,
'https': self.ip
}
if change_proxy:
self.ip = self.get_ip(HTMLSession())
self.proxies = {
'http': self.ip,
'https': self.ip
}
return self.proxies
def get_ip(self, session):
url = 'ip'
req = session.get(url)
if req.status_code == 200:
jsonstr = req.json()
isok = glom(jsonstr, "resCode")
if isok == "0000":
key = glom(jsonstr, ('reData', ['key']))[0]
uname = glom(jsonstr, ('reData', ['username']))[0]
passwd = glom(jsonstr, ('reData', ['password']))[0]
proxies = f"http://{uname}:{passwd}@{key}"
return proxies
@retry(stop_max_attempt_number=5, wait_random_min=3000, wait_random_max=7000)
@decorator
def post_html(session, post_url: int, post_data: dict, headers=headers, timeout=30):
'''
:param session: 传入session对象
:param post_url: post请求需要的url
:param headers: 报头信息,config模块默认提供
:param post_data: post信息 字典类型
:param timeout:
:return:
'''
post_req = session.post(url=post_url, headers=headers, data=post_data, timeout=timeout, proxies=get_proxies())
if post_req.status_code == 200:
post_req.encoding = post_req.apparent_encoding
# time.sleep(random.randint(1, 3))
return post_req
# 随机等待1-3s
@retry(stop_max_attempt_number=5, wait_random_min=3000, wait_random_max=7000)
@decorator
def get_response(session, url: str, params=None, headers=headers, timeout=10):
'''
获取response
:param url:链接
:return: return response object
'''
try:
req = session.get(url=url, headers=headers, params=params, timeout=timeout, proxies=get_proxies())
except:
req = session.get(url=url, headers=headers, params=params, timeout=timeout, proxies=get_proxies(True))
if req.status_code == 200:
req.encoding = req.apparent_encoding
# time.sleep(random.randint(1, 3))
return req
# 随机等待1-3s
@decorator
def get_html(req):
'''
获取html类型的网页格式
:param req:
:return:
'''
source = req.text
return source
@decorator
def get_json(req):
'''
获取json类型的网页格式
:param req: response对象
:return:
'''
try:
jsonstr = req.json()
except:
source = get_html(req)
if source.endswith(';'):
jsonstr = json.loads(source.replace(';', ''))
return jsonstr
@parse_decorator(None)
def get_xpath(req, xpathstr: str):
'''
xpath操作获取节点
:param req:response对象
:param xpathstr:
:return:
'''
node = req.html.xpath(xpathstr)
return node
@decorator
def get_link(node):
'''
获取当前节点的链接
:param req:response对象
:return:返回绝对链接
'''
return list(node.absolute_links)[0]
@parse_decorator(None)
def get_text(node):
'''
获取当前节点下的文本
:param req:response对象
:param xpathstr:xpath表达式
:return:
'''
return node.text
@parse_decorator(None)
def get_all_text(node):
'''
获取该节点包括其子节点下的所有文本
:param req:response对象
:param xpathstr:xpath表达式
:return:
'''
if isinstance(node, list):
return node[0].full_text
else:
return node.full_text
@decorator
def get_json_data(jsonstr: str, pat: str):
'''
#通过glom模块操作数据
:param jsonstr:json字符串
:param pat:模板
:return:
'''
item = glom(jsonstr, pat)
return item
@decorator
def get_hash_code(key):
'''
获取字符串hash值,md5加密
:param key:
:return:
'''
value = hashlib.md5(key.encode('utf-8')).hexdigest()
return value
@parse_decorator(None)
def get_next_node(node, xpathstr):
'''
当前节点下面操作xpath
:param node: 节点
:param xpathstr: xpath表达式
:return:
'''
next_node = node[0].xpath(xpathstr)
if next_node:
return next_node
@decorator
def get_datetime_from_unix(unix_time):
'''
时间戳转时间格式
:param unix_time:
:return:
'''
unix_time_value = unix_time
if not isinstance(unix_time_value, int):
unix_time_value = int(unix_time)
new_datetime = datetime.datetime.fromtimestamp(unix_time_value)
return new_datetime
def get_proxies(change_proxy=False):
ip = Get_Proxies().getproxy(change_proxy)
return ip
@decorator
@threads(20)
@retry(stop_max_attempt_number=5)
def async_get_response(session, url: str, headers=headers, timeout=10):
'''
获取response
:param url:链接
:return: return response object
'''
try:
req = session.get(url=url, headers=headers, timeout=timeout, proxies=get_proxies())
except:
req = session.get(url=url, headers=headers, timeout=timeout, proxies=get_proxies(True))
# if req.status_code==200:
# req.encoding=req.apparent_encoding
# #time.sleep(random.randint(1, 3))
return req
if __name__ == '__main__':
print(get_proxies())
以下是headers文件的内容
import random
first_num = random.randint(55, 62)
third_num = random.randint(0, 3200)
fourth_num = random.randint(0, 140)
class FakeChromeUA:
os_type = [
'(Windows NT 6.1; WOW64)', '(Windows NT 10.0; WOW64)', '(X11; Linux x86_64)',
'(Macintosh; Intel Mac OS X 10_12_6)'
]
chrome_version = 'Chrome/{}.0.{}.{}'.format(first_num, third_num, fourth_num)
@classmethod
def get_ua(cls):
return ' '.join(['Mozilla/5.0', random.choice(cls.os_type), 'AppleWebKit/537.36',
'(KHTML, like Gecko)', cls.chrome_version, 'Safari/537.36']
)
headers = {
'User-Agent': FakeChromeUA.get_ua(),
'Accept-Encoding': 'gzip, deflate, sdch',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Connection': 'keep-alive'
}
以下是logger文件的内容
# -*- coding: utf-8 -*-
import os
import time
import logging
import sys
log_dir1=os.path.join(os.path.dirname(os.path.dirname(__file__)),"logs")
today = time.strftime('%Y%m%d', time.localtime(time.time()))
full_path=os.path.join(log_dir1,today)
if not os.path.exists(full_path):
os.makedirs(full_path)
log_path=os.path.join(full_path,"t.log")
def get_logger():
# 获取logger实例,如果参数为空则返回root logger
logger = logging.getLogger("t")
if not logger.handlers:
# 指定logger输出格式
formatter = logging.Formatter('%(asctime)s %(levelname)-8s: %(message)s')
# 文件日志
file_handler = logging.FileHandler(log_path,encoding="utf8")
file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.formatter = formatter # 也可以直接给formatter赋值
# 为logger添加的日志处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# 指定日志的最低输出级别,默认为WARN级别
logger.setLevel(logging.INFO)
# 添加下面一句,在记录日志之后移除句柄
return logger