Scrapy crawler notes - crawling Zhihu

A cookie is a local storage mechanism: the cookie data is stored on the client side.

A session essentially works like this: the server encrypts the user's information (username, password, etc.) into a string and returns it to the browser, and from then on the browser sends this session id along with every request.
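
A minimal sketch of that round trip with requests (httpbin.org is only a demo endpoint used here for illustration; it is not part of the original notes):

import requests

session = requests.Session()
# The server answers with a Set-Cookie header; the Session stores the cookie locally.
session.get("https://httpbin.org/cookies/set?sessionid=abc123")
# Every later request made through the same Session sends the stored cookie back.
resp = session.get("https://httpbin.org/cookies")
print(resp.json())   # {'cookies': {'sessionid': 'abc123'}}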

Status codes are usually defined by the server itself; they can also come from the framework, or you can define them yourself.

Under F12 → Network you can see the status code of every request.

301: permanent redirect, e.g. when the domain name has changed but you still want requests to the old domain to reach the site.

302: temporary redirect, e.g. clicking "personal center" while not logged in redirects you to the login page.

404: usually means the URL is invalid. You could also return an empty page with a 200 in that case, but that is not a good idea, because a 404 can be filtered out.

500: usually some function on the server raised an error that was not caught; the web framework normally takes care of this.

On a 503 you generally pause the crawler. Usually only 200 responses are scraped, and 404s are skipped as well.
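
A quick way to observe these codes with requests (the URL is a placeholder, assumed for illustration): turn off automatic redirect following so the 301/302 itself is visible instead of the final 200.

import requests

# allow_redirects=False returns the redirect response itself
# instead of silently following it to the target page.
resp = requests.get("http://example.com/old-path", allow_redirects=False)
print(resp.status_code)              # e.g. 301, 302 or 404
print(resp.headers.get("Location"))  # where a redirect points, None otherwise

resp = requests.get("http://example.com/", allow_redirects=True)
print(resp.status_code)              # 200 after redirects have been followed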

To crawl Zhihu, log in first.

You can deliberately submit a wrong username/password and then find the post URL and the post parameters in the Network tab.

The header that requests.get() adds by default identifies the client as python2 or python3, so a plain request gets a 500 back.

So you have to add a header when using requests:

agent = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
header = {
"HOST":"www.zhihu.com",
"Referer": "https://www.zhizhu.com",
'User-Agent': agent
} response = session.get("https://www.zhihu.com", headers=header)

When simulating a login, don't call requests directly; use a requests session instead. A session represents one connection, so you don't have to set up a new connection for every requests.get, which is more efficient.

session = requests.session()

response = session.get("https://www.zhihu.com", headers=header)

All the requests calls that follow can be replaced with session.

session.cookies has no save() method by default; if you assign session.cookies = cookielib.LWPCookieJar(), the resulting cookie jar does have save().
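
A minimal sketch of the save/load round trip (same idea as the full login script below):

import requests
try:
    import cookielib                      # python2
except ImportError:
    import http.cookiejar as cookielib    # python3

session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies.txt")

# ... log in with session.post(...) here ...

# LWPCookieJar provides save(); the default RequestsCookieJar does not.
session.cookies.save(ignore_discard=True, ignore_expires=True)

# On the next run, restore the cookies so no fresh login is needed.
session.cookies.load(ignore_discard=True)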

Zhihu simulated-login code (without Scrapy):

# -*- coding: utf-8 -*-
__author__ = 'bobby'

import requests
try:
    import cookielib
except:
    import http.cookiejar as cookielib
import re

session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies.txt")
try:
    session.cookies.load(ignore_discard=True)
except:
    print("cookie未能加载")

agent = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
header = {
    "HOST": "www.zhihu.com",
    "Referer": "https://www.zhizhu.com",
    'User-Agent': agent
}


def is_login():
    # use the status code returned by a logged-in-only page to decide whether we are logged in
    inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
    response = session.get(inbox_url, headers=header, allow_redirects=False)
    if response.status_code != 200:
        return False
    else:
        return True


def get_xsrf():
    # fetch the xsrf code
    response = session.get("https://www.zhihu.com", headers=header)
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response.text)
    if match_obj:
        return (match_obj.group(1))
    else:
        return ""


def get_index():
    response = session.get("https://www.zhihu.com", headers=header)
    with open("index_page.html", "wb") as f:
        f.write(response.text.encode("utf-8"))
    print("ok")


def get_captcha():
    import time
    t = str(int(time.time() * 1000))
    captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
    t = session.get(captcha_url, headers=header)
    with open("captcha.jpg", "wb") as f:
        f.write(t.content)
        f.close()

    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
        pass

    captcha = input("输入验证码\n>")
    return captcha


def zhihu_login(account, password):
    # log in to Zhihu
    if re.match("^1\d{10}", account):
        print("手机号码登录")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha": get_captcha()
        }
    else:
        if "@" in account:
            # check whether the account name is an email address
            print("邮箱方式登录")
            post_url = "https://www.zhihu.com/login/email"
            post_data = {
                "_xsrf": get_xsrf(),
                "email": account,
                "password": password
            }

    response_text = session.post(post_url, data=post_data, headers=header)
    session.cookies.save()


zhihu_login("", "admin123")
# get_index()
is_login()
# get_captcha()

zhihu_login_requests.py

The entry point of a Scrapy spider is start_requests, so start_requests has to be overridden.

Scrapy is asynchronous (async IO), so the login is kicked off by returning a Request with a callback:
def start_requests(self):
    return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]

The callback is written without parentheses because what gets passed is the function object itself, and it will be called later. If you added parentheses you would be calling the function right now and passing its return value instead, so just pass the function name.
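
The same difference in plain Python (toy function, just for illustration):

def login():
    return "logged in"

f = login      # passes the function object; it can be called later as f()
v = login()    # calls it immediately; v is the string "logged in", not the function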

Scrapy checks robots.txt to decide which pages get filtered out.

ROBOTSTXT_OBEY = False means the robots protocol is not obeyed.
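
The setting lives in the project's settings.py; it can also be set for a single spider through custom_settings (the class below is only a sketch):

# settings.py
ROBOTSTXT_OBEY = False

# or, for one spider only:
import scrapy

class ZhihuSpider(scrapy.Spider):
    name = "zhihu"
    custom_settings = {
        "ROBOTSTXT_OBEY": False,
    }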

A regular expression's . does not match newlines by default, so a pattern effectively only matches within one line. Adding re.DOTALL makes . match newlines too, so the whole text can be matched.
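
For example (toy string):

import re

text = 'first line\nname="_xsrf" value="abc123"'

# Without re.DOTALL the '.' does not match '\n', so the match fails
# because the value sits on the second line.
print(re.match('.*name="_xsrf" value="(.*?)"', text))             # None

# With re.DOTALL the '.' also matches newlines, so the whole text is searched.
m = re.match('.*name="_xsrf" value="(.*?)"', text, re.DOTALL)
print(m.group(1))                                                  # abc123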

How to add a User-Agent in the Scrapy shell:

scrapy shell -s USER_AGENT="Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/60.0" https://blog.csdn.net/weixin_42471384/article/details/81556531
# -*- coding: utf-8 -*-
import re
import json
import datetime

try:
    import urlparse as parse
except:
    from urllib import parse

import scrapy
from scrapy.loader import ItemLoader
from items import ZhihuQuestionItem, ZhihuAnswerItem


class ZhihuSpider(scrapy.Spider):
    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ['https://www.zhihu.com/']

    # request url for the first page of a question's answers
    start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccollapsed_counts%2Creviewing_comments_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.is_blocking%2Cis_blocked%2Cis_followed%2Cvoteup_count%2Cmessage_thread_token%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"

    headers = {
        "HOST": "www.zhihu.com",
        "Referer": "https://www.zhizhu.com",
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
    }

    custom_settings = {
        "COOKIES_ENABLED": True
    }

    def parse(self, response):
        """
        Extract all urls from the html page and follow them for further crawling.
        If an extracted url has the form /question/xxx, download it and hand it
        straight to the question parsing function.
        """
        all_urls = response.css("a::attr(href)").extract()
        all_urls = [parse.urljoin(response.url, url) for url in all_urls]
        all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
        for url in all_urls:
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # a question page: download it and hand it to the extraction function
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
            else:
                # not a question page: just keep following it
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)

    def parse_question(self, response):
        # handle a question page and extract the concrete question item from it
        if "QuestionHeader-title" in response.text:
            # new page layout
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            item_loader.add_css("title", "h1.QuestionHeader-title::text")
            item_loader.add_css("content", ".QuestionHeader-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", ".List-headerText span::text")
            item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
            item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
            item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")

            question_item = item_loader.load_item()
        else:
            # item extraction for the old page layout
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            # item_loader.add_css("title", ".zh-question-title h2 a::text")
            item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
            item_loader.add_css("content", "#zh-question-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", "#zh-question-answer-num::text")
            item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
            # item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
            item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
            item_loader.add_css("topics", ".zm-tag-editor-labels a::text")

            question_item = item_loader.load_item()

        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
        yield question_item

    def parse_answer(self, response):
        # handle the answers of a question
        ans_json = json.loads(response.text)
        is_end = ans_json["paging"]["is_end"]
        next_url = ans_json["paging"]["next"]

        # extract the concrete fields of each answer
        for answer in ans_json["data"]:
            answer_item = ZhihuAnswerItem()
            answer_item["zhihu_id"] = answer["id"]
            answer_item["url"] = answer["url"]
            answer_item["question_id"] = answer["question"]["id"]
            answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
            answer_item["content"] = answer["content"] if "content" in answer else None
            answer_item["parise_num"] = answer["voteup_count"]
            answer_item["comments_num"] = answer["comment_count"]
            answer_item["create_time"] = answer["created_time"]
            answer_item["update_time"] = answer["updated_time"]
            answer_item["crawl_time"] = datetime.datetime.now()

            yield answer_item

        if not is_end:
            yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)

    def start_requests(self):
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]

    def login(self, response):
        response_text = response.text
        match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
        xsrf = ''
        if match_obj:
            xsrf = (match_obj.group(1))

        if xsrf:
            post_url = "https://www.zhihu.com/login/phone_num"
            post_data = {
                "_xsrf": xsrf,
                "phone_num": "",
                "password": "",
                "captcha": ""
            }

            import time
            t = str(int(time.time() * 1000))
            captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
            yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data": post_data}, callback=self.login_after_captcha)

    def login_after_captcha(self, response):
        with open("captcha.jpg", "wb") as f:
            f.write(response.body)
            f.close()

        from PIL import Image
        try:
            im = Image.open('captcha.jpg')
            im.show()
            im.close()
        except:
            pass

        captcha = input("输入验证码\n>")

        post_data = response.meta.get("post_data", {})
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data["captcha"] = captcha
        return [scrapy.FormRequest(
            url=post_url,
            formdata=post_data,
            headers=self.headers,
            callback=self.check_login
        )]

    def check_login(self, response):
        # check the data returned by the server to see whether the login succeeded
        text_json = json.loads(response.text)
        if "msg" in text_json and text_json["msg"] == "登录成功":
            for url in self.start_urls:
                yield scrapy.Request(url, dont_filter=True, headers=self.headers)

zhihu.py
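
The spider imports ZhihuQuestionItem and ZhihuAnswerItem from items.py, which is not shown here. A minimal sketch of those item classes, with the field names taken from the loader and item assignments above (the actual items.py may define more, e.g. input/output processors, which are omitted):

import scrapy

class ZhihuQuestionItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()

class ZhihuAnswerItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    parise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()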
