# -*- coding: utf-8 -*-
# @Time : 2018/12/26 9:55 PM
# @Author : cxa
# @Software: PyCharm
import asyncio
import aiohttp
from db.mongohelper import save_data
import hashlib
import pathlib
import ujson
from logger.log import crawler
from utils import proxy_helper
from retrying import retry
from itertools import islice

try:
    # Use uvloop's faster event loop implementation when it is installed.
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

# Cap the number of fetch coroutines allowed to run at the same time.
sem = asyncio.Semaphore(1000)
url = "https://xxx.xxx.com" @retry(stop_max_attempt_number=5)
def get_proxy():
proxy = proxy_helper.get_proxy()
host = proxy.get('ip')
port = proxy.get('port')
ip = f"http://{host}:{port}"
    return ip


async def fetch(item, session, proxy, retry_index=0):
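    # POST the signed keyword and persist the "searchResult" payload; on failure,
    # grab a new proxy and recurse with an incremented retry counter.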
    try:
        name = item
        sf = get_md5(name)
        data = {"kw": name, "signinfo": sf}
        async with session.post(url, data=data, proxy=proxy, verify_ssl=False) as req:
            res_status = req.status
            if res_status == 200:
                data = ujson.loads(await req.text())
                searchdata = data.get("searchResult")
                if searchdata:
                    await save_data(searchdata)
                else:
                    crawler.info(f'<search_name: {name}>, data: {data}>')
    except IndexError as e:
print(f"<出错时候的数据:{seq}>,<原因: e>")
    except Exception as e:
        data = None
        crawler.error(f"<Error: {url} {str(e)}>")
    if not data:
        # `data` is only None after an exception, so switch proxy and retry.
        crawler.info(f'<Retry url: {url}>, Retry times: {retry_index + 1}')
        retry_index += 1
        proxy = get_proxy()
        return await fetch(item, session, proxy, retry_index)


async def bound_fetch(item, session, proxy):
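    # Run a single fetch while holding a slot from the shared semaphore.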
    async with sem:
        await fetch(item, session, proxy)


async def print_when_done(tasks):
    # Drain the limited_as_completed generator, awaiting each result as it finishes.
    [await _ for _ in limited_as_completed(tasks, 2000)]


async def run(data):
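    # Share one ClientSession (and an initial proxy) across every keyword task.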
    async with aiohttp.ClientSession() as session:
        proxy = get_proxy()
        coros = (asyncio.ensure_future(bound_fetch(item, session, proxy)) for item in data)
        await print_when_done(coros)


def limited_as_completed(coros, limit):
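    # Keep at most `limit` futures in flight; each time one finishes, pull the
    # next coroutine from the generator so memory use stays bounded.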
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]

    async def first_to_finish():
        # Poll until one of the scheduled futures completes, replace it with the
        # next pending coroutine, and hand back its result.
        while True:
            await asyncio.sleep(0.01)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration:
                        pass
                    return f.result()

    while len(futures) > 0:
        yield first_to_finish()


def get_use_list():
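    # Load the keyword list, one entry per line, from namelist.txt in the working directory.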
    fname = pathlib.Path.joinpath(pathlib.Path.cwd(), "namelist.txt")
    with open(fname, encoding='utf-8') as fs:
        data = [i.strip() for i in fs.readlines()]
    return data


def get_md5(key):
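    # Build the "signinfo" value: MD5 of the keyword concatenated with a fixed salt.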
    m = hashlib.md5()
    m.update(f'{key}0jjj890j0369dce05f9'.encode('utf-8'))
    a = m.hexdigest()
    return a


if __name__ == '__main__':
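    # Entry point: read the keyword list and drive the crawl to completion.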
crawler.info("开始下载")
data = get_use_list()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(data))
loop.close()