This article explains how to crawl question-and-answer data from Zhihu.
First of all, keep in mind that to crawl any data from Zhihu you must be logged in, so we start with simulated login.
Here is my overall approach:
1. First, take control of the login entry point by overriding the start_requests method. Once that entry page has been requested, use a callback to invoke the login function.
2. In the login function, obtain the page's HTML through response.text and extract the _xsrf value required for login with a regular expression.
3. Next comes fetching the captcha, which is the tricky part. Start by observing the URL used when the captcha is requested:
You can see that the captcha request carries a random number after the r parameter, so we generate one (a millisecond timestamp), pass it as a parameter of the captcha URL, and then hand that URL to the Scrapy downloader.
4. Then call login_after_captcha through the callback. In that function we save the captcha image downloaded from the captcha URL, let the user type the captcha into the variable captcha, and store the xsrf, username, password, and captcha in post_data. At this point we are ready to log in.
5. Perform the simulated login by calling Scrapy's FormRequest.
6. After the login request, call the check_login function to determine whether the login succeeded.
Below is the source code for the simulated-login part:
def start_requests(self):
    # Take control of the login entry point to log in to Zhihu.
    return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.header, callback=self.login)]

def login(self, response):
    response_text = response.text
    # extract the _xsrf value needed for the login form
    xsrf_obj = re.match('.*name="_xsrf" value="(.*)".*', response_text, re.DOTALL)
    xsrf = ""
    if xsrf_obj:
        xsrf = xsrf_obj.group(1)
    else:
        return
    if xsrf:
        post_data = {  # data submitted with the login form
            "_xsrf": xsrf,
            "phone_num": "your account",
            "password": "your password",
            "captcha": ""
        }
        import time
        t = str(int(time.time() * 1000))
        captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login&lang=cn".format(t)
        yield scrapy.Request(captcha_url, headers=self.header, meta={"post_data": post_data},
                             callback=self.login_after_captcha)

def login_after_captcha(self, response):
    with open("captcha.jpg", "wb") as f:  # save the captcha image
        f.write(response.body)            # must be response.body here
    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
        pass
    captcha = input("Enter the captcha\n>")
    post_data = response.meta.get("post_data", {})
    post_url = "https://www.zhihu.com/login/phone_num"
    post_data["captcha"] = captcha  # add the captcha value to post_data
    return [scrapy.FormRequest(
        url=post_url,
        formdata=post_data,
        headers=self.header,
        callback=self.check_login  # FormRequest submits the form (the simulated login);
                                   # check_login then verifies the login state
    )]

def check_login(self, response):
    # Inspect the server's response to decide whether the login succeeded.
    # response.text holds the success or error message; we load it as JSON.
    # The "msg" field carries the Chinese login message.
    text_json = json.loads(response.text)
    if "msg" in text_json and text_json["msg"] == "登录成功":  # "login successful"
        for url in self.start_urls:
            yield scrapy.Request(url, dont_filter=True, headers=self.header)
            # no callback is given here, so Scrapy falls back to the default parse method
Note that every page request must carry headers, because headers contain the important User-Agent field, which we have to set explicitly. Without headers the crawl hits a 500 server error at runtime, because the default User-Agent identifies the Python 2 or Python 3 client you are running, which the server does not accept. So when crawling Zhihu, every page request must include headers.
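Instead of (or in addition to) passing headers on every Request, Scrapy also lets you set a project-wide default in settings.py. This is only a sketch of that alternative, not something the project below relies on; the User-Agent string is simply the one used in this project:

# settings.py -- applied to every request unless overridden on a specific Request
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:57.0) Gecko/20100101 Firefox/57.0"
DEFAULT_REQUEST_HEADERS = {
    "Referer": "https://www.zhihu.com",
}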
With login out of the way, we move on to the crawling logic.
Design the data tables around the fields we need from a Zhihu question:
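As a sketch of what these two tables might look like, the snippet below creates them with columns matching the insert statements in items.py shown later. The column types and lengths are assumptions, the connection parameters mirror the MySQL settings further down, and zhihu_id is made the primary key so that the ON DUPLICATE KEY UPDATE clauses in the insert SQL can take effect:

import MySQLdb

conn = MySQLdb.connect(host="localhost", user="root", passwd="your password",
                       db="article_spider", charset="utf8")
cur = conn.cursor()
cur.execute("""
    CREATE TABLE IF NOT EXISTS zhihu_question (
        zhihu_id       BIGINT NOT NULL PRIMARY KEY,
        topics         VARCHAR(255),
        url            VARCHAR(300) NOT NULL,
        title          VARCHAR(200) NOT NULL,
        content        LONGTEXT,
        answer_num     INT,
        comments_num   INT,
        watch_user_num INT,
        click_num      INT,
        crawl_time     DATETIME
    )""")
cur.execute("""
    CREATE TABLE IF NOT EXISTS zhihu_answer (
        zhihu_id     BIGINT NOT NULL PRIMARY KEY,
        url          VARCHAR(300) NOT NULL,
        question_id  BIGINT NOT NULL,
        author_id    VARCHAR(100),
        content      LONGTEXT,
        praise_num   INT,
        comments_num INT,
        create_time  DATETIME,
        update_time  DATETIME,
        crawl_time   DATETIME
    )""")
conn.commit()
conn.close()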
The overall crawling approach:
1. In the parse method we first collect every URL on the post-login home page, filter out entries that are not real URLs, and then pick out the URLs we care about. At this point we make a decision: if a URL is one we are interested in, its callback is parse_question; if not, we request it again with parse as the callback and rely on Scrapy's depth-first traversal to keep digging, repeating the check (see the short sketch after this list).
2. The parse_question method parses the data on the question page. It then yields a request whose callback parses the answers belonging to that question, and yields question_item so that it is handed to the pipelines.
3. In the pipelines we can make use of the crawled data, for example exporting it as JSON or writing it to a database.
4. Further explanation is given as comments in the source code.
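As a quick standalone illustration of the URL filtering in step 1 (the question id in the example link is made up):

import re
from urllib import parse   # Python 3 import; the spider's try/except also covers Python 2

# a relative link found on the page, joined against the page URL just as parse() does
url = parse.urljoin("https://www.zhihu.com/", "/question/26006703/answer/1234567")
match_obj = re.match("(.*zhihu.com/question/(\d+))($|/).*", url)
if match_obj:
    request_url = match_obj.group(1)   # "https://www.zhihu.com/question/26006703" -> handled by parse_question
    question_id = match_obj.group(2)   # "26006703"
else:
    # any other https link is requested again with parse as the callback (depth-first)
    pass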
Let's first look at the crawl results:
The question table:
The answer table:
Below is the source code for the whole project.
Directory structure:
The Zhihu spider:

import scrapy
import re
import json
import datetime
from scrapy.loader import ItemLoader
from ArticleSpider.items import ZhihuQuestionItem, ArticleItemLoader, ZhihuAnswerItem
try:
    import urlparse as parse
except:
    from urllib import parse


class ZhihuSpider(scrapy.Spider):
    name = 'zhihu'
    allowed_domains = ['www.zhihu.com']
    start_urls = ['http://www.zhihu.com/']

    # URL of the first page of answers for a question
    start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?include=data[*].is_normal,admin_closed_comment,reward_info,is_collapsed,annotation_action,annotation_detail,collapse_reason,is_sticky,collapsed_by,suggest_edit,comment_count,can_comment,content,editable_content,voteup_count,reshipment_settings,comment_permission,created_time,updated_time,review_info,relevant_info,question,excerpt,relationship.is_authorized,is_author,voting,is_thanked,is_nothelp,upvoted_followees;data[*].mark_infos[*].url;data[*].author.follower_count,badge[?(type=best_answerer)].topics&offset={1}&limit={2}&sort_by=default"

    header = {
        "Host": "www.zhihu.com",
        "Referer": "https://www.zhihu.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:57.0) Gecko/20100101 Firefox/57.0"
    }

    def parse(self, response):
        # response.text here is the HTML returned after a successful login.
        # Extract every URL on the page and follow them (depth-first traversal).
        # URLs of the form /question/xxx are downloaded and handed straight to the question parser.
        all_urls = response.css("a::attr(href)").extract()                 # all links on the page
        all_urls = [parse.urljoin(response.url, url) for url in all_urls]  # join relative links
        all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)  # drop entries that are not real URLs
        for url in all_urls:
            match_obj = re.match("(.*zhihu.com/question/(\d+))($|/).*", url)  # pick out question URLs (question/id)
            if match_obj:
                # a question-related page
                request_url = match_obj.group(1)   # the question URL
                question_id = match_obj.group(2)   # the question id
                yield scrapy.Request(request_url, headers=self.header, meta={"question_id": question_id},
                                     callback=self.parse_question)
            else:
                # not a question URL: request it again and keep extracting depth-first
                yield scrapy.Request(url, headers=self.header, callback=self.parse)

    def parse_question(self, response):
        # called once a question URL has been downloaded
        if "QuestionHeader-title" in response.text:
            # new-style question page
            click_num = response.css(".NumberBoard-value::text").extract()[1]
            question_id = response.meta.get("question_id", "")  # the question_id passed through meta
            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            item_loader.add_css("title", ".QuestionHeader-title::text")                  # title
            item_loader.add_css("content", ".QuestionHeader-detail")
            item_loader.add_value("url", response.url)                                   # URL of this page
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", ".List-headerText span::text")             # extract() gives e.g. ['10', ' 個回答'] -> answer count
            item_loader.add_css("comments_num", ".QuestionHeader-Comment button::text")  # extract() gives e.g. ['1 條評論']
            item_loader.add_css("watch_user_num", ".NumberBoard-value::text")            # matches both the follower count and the view count
            item_loader.add_value("click_num", click_num)                                # the view count extracted above
            item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")   # topics
            question_item = item_loader.load_item()
        else:
            # old-style question page
            question_id = response.meta.get("question_id", "")
            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            item_loader.add_css("title", "#zh-question-title h2 a::text")
            item_loader.add_css("content", "#zh-question-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", "#zh-question-answer-num::text")
            item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
            item_loader.add_css("topics", ".zm-tag-editor-labels a::text")
            question_item = item_loader.load_item()

        yield scrapy.Request(self.start_answer_url.format(question_id, 0, 20), headers=self.header,
                             callback=self.parse_answer)
        yield question_item

    def parse_answer(self, response):
        # the answer API returns JSON
        answer_json = json.loads(response.text)
        is_end = answer_json["paging"]["is_end"]   # boolean: is this the last page of answers?
        next_url = answer_json["paging"]["next"]
        for answer in answer_json["data"]:
            answer_item = ZhihuAnswerItem()
            answer_item["zhihu_id"] = answer["id"]
            answer_item["url"] = answer["url"]
            answer_item["question_id"] = answer["question"]["id"]
            answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
            answer_item["content"] = answer["content"] if "content" in answer else None
            answer_item["praise_num"] = answer["voteup_count"]
            answer_item["comments_num"] = answer["comment_count"]
            answer_item["create_time"] = answer["created_time"]   # the API returns an int timestamp
            answer_item["update_time"] = answer["updated_time"]   # the API returns an int timestamp
            answer_item["crawl_time"] = datetime.datetime.now()   # e.g. 2017-12-22 22:30:47.061460
            yield answer_item
        if not is_end:
            # more answer pages remain
            yield scrapy.Request(next_url, headers=self.header, callback=self.parse_answer)

    def start_requests(self):
        # Take control of the login entry point to log in to Zhihu.
        # Scrapy runs this overridden start_requests first; the request targets the login page and
        # carries the header defined above. Because Scrapy handles requests asynchronously we attach
        # a callback that runs the login method; without an explicit callback Scrapy would call the
        # default parse method directly.
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.header, callback=self.login)]

    def login(self, response):
        response_text = response.text
        xsrf_obj = re.match('.*name="_xsrf" value="(.*)".*', response_text, re.DOTALL)
        xsrf = ""
        if xsrf_obj:
            xsrf = xsrf_obj.group(1)
        else:
            return
        if xsrf:
            post_data = {  # data submitted with the login form
                "_xsrf": xsrf,
                "phone_num": "your username",
                "password": "your password",
                "captcha": ""
            }
            import time
            t = str(int(time.time() * 1000))
            captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login&lang=cn".format(t)
            yield scrapy.Request(captcha_url, headers=self.header, meta={"post_data": post_data},
                                 callback=self.login_after_captcha)
            # meta is a dict carrying post_data; the captcha request stays in the same session (same cookies)

    def login_after_captcha(self, response):
        with open("captcha.jpg", "wb") as f:  # save the captcha image
            f.write(response.body)            # must be response.body here
        from PIL import Image
        try:
            im = Image.open('captcha.jpg')
            im.show()
            im.close()
        except:
            pass
        captcha = input("Enter the captcha\n>")
        post_data = response.meta.get("post_data", {})
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data["captcha"] = captcha  # add the captcha value to post_data
        return [scrapy.FormRequest(
            url=post_url,
            formdata=post_data,
            headers=self.header,
            callback=self.check_login  # FormRequest submits the form (the simulated login);
                                       # check_login then verifies the login state
        )]

    def check_login(self, response):
        # Inspect the server's response to decide whether the login succeeded.
        # response.text holds the success or error message; we load it as JSON.
        # The "msg" field carries the Chinese login message.
        text_json = json.loads(response.text)
        if "msg" in text_json and text_json["msg"] == "登录成功":  # "login successful"
            for url in self.start_urls:
                yield scrapy.Request(url, dont_filter=True, headers=self.header)
                # no callback is given here, so Scrapy falls back to the default parse method
ArticleSpider/util/common.py:

import hashlib
import re


def get_purenum(text):
    # extract the integer part of a string and return it as an int
    str_re = ".*?(\d+).*"
    text = re.match(str_re, text)
    if text:
        return int(text.group(1))
    else:
        return 0
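A quick usage sketch with the kinds of strings the question page actually yields (the values are illustrative):

print(get_purenum("10 個回答"))   # 10 -- answer_num after joining the extracted list
print(get_purenum("1 條評論"))    # 1  -- comments_num
print(get_purenum("no digits"))   # 0  -- falls back to 0 when no integer is found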
ArticleSpider/items.py:

import scrapy
from scrapy.loader.processors import MapCompose, TakeFirst, Join
from scrapy.loader import ItemLoader
import re
import datetime
from ArticleSpider.util.common import get_purenum
from ArticleSpider.settings import SQL_DATETIME_FORMAT, SQL_DATE_FORMAT


class ZhihuQuestionItem(scrapy.Item):
    # Zhihu question item; values are cleaned here before being written to the database
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()  # time at which the question was crawled

    def get_insertsql(self):
        insert_sql = "insert into zhihu_question(zhihu_id,topics,url,title,content," \
                     "answer_num,comments_num,watch_user_num,click_num,crawl_time) " \
                     "VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) " \
                     "ON DUPLICATE KEY UPDATE topics=VALUES(topics),content=VALUES(content)," \
                     "answer_num=VALUES(answer_num),comments_num=VALUES(comments_num)," \
                     "watch_user_num=VALUES(watch_user_num),click_num=VALUES(click_num)"
        # ItemLoader stores every field as a list, so convert to str or int here
        zhihu_id = int("".join(self["zhihu_id"]))
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = get_purenum("".join(self["answer_num"]))
        comments_num = get_purenum(self["comments_num"][0])
        watch_user_num = int(self["watch_user_num"][0])
        click_num = int(self["click_num"][0])
        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)  # format with the pattern defined in settings
        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)
        return insert_sql, params


class ZhihuAnswerItem(scrapy.Item):
    # Zhihu answer item
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    praise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insertsql(self):
        insert_sql = "insert into zhihu_answer(zhihu_id,url,question_id,author_id,content," \
                     "praise_num,comments_num,create_time,update_time,crawl_time) " \
                     "VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE " \
                     "content=VALUES(content),praise_num=VALUES(praise_num)," \
                     "comments_num=VALUES(comments_num),update_time=VALUES(update_time)"
        # created_time/updated_time arrive as int timestamps; convert them with
        # datetime.fromtimestamp and format them with the pattern from settings
        create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
        params = (self["zhihu_id"], self["url"], self["question_id"], self["author_id"], self["content"],
                  self["praise_num"], self["comments_num"], create_time, update_time,
                  self["crawl_time"].strftime(SQL_DATETIME_FORMAT))
        return insert_sql, params
ArticleSpider/pipelines.py:

from scrapy.pipelines.images import ImagesPipeline
from scrapy.exporters import JsonItemExporter
import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi


class ArticlespiderPipeline(object):
    def process_item(self, item, spider):
        # item carries all of the item's fields in its values
        return item


class JsonExporterPipeline(object):
    # export items to a JSON file using Scrapy's JsonItemExporter
    def __init__(self):
        self.file = open('articleexport.json', 'wb')  # open the file in binary mode
        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
        self.exporter.start_exporting()   # start exporting

    def close_spider(self, spider):
        self.exporter.finish_exporting()  # stop exporting
        self.file.close()                 # close the file

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item


class MysqlTwistedPipeline(object):
    # Writing one pipeline per website would need an unreasonable number of MySQL connections;
    # in practice different pipelines are used for inserting into different databases.
    # This pipeline uses Twisted's asynchronous connection pool to store data in MySQL asynchronously.
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparams = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset="utf8",
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparams)  # connection pool
        return cls(dbpool)  # return an instance of MysqlTwistedPipeline

    def process_item(self, item, spider):
        query = self.dbpool.runInteraction(self.do_insert, item)  # run the insert asynchronously
        query.addErrback(self.handle_error, item, spider)         # handle errors raised by the async insert
        return item

    def handle_error(self, failure, item, spider):
        print(failure)  # simply print the failure when something goes wrong

    def do_insert(self, cursor, item):
        # ask the item for its own insert SQL and params, so one pipeline serves every item type
        insert_sql, params = item.get_insertsql()
        cursor.execute(insert_sql, params)
The relevant settings in ArticleSpider/settings.py:

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
    'ArticleSpider.pipelines.JsonExporterPipeline': 2,    # save items to a JSON file
    # 'ArticleSpider.pipelines.MysqlPipeline': 3,         # synchronous MySQL storage
    'ArticleSpider.pipelines.MysqlTwistedPipeline': 3,    # asynchronous MySQL insertion
}

MYSQL_HOST = "localhost"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "751324"

SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # full datetime format
SQL_DATE_FORMAT = "%Y-%m-%d"               # date-only format
With that, the Scrapy project that goes from simulated login to crawling the Q&A data is complete. Simulated login is a crucial skill for crawlers; once it succeeds, the rest of the work, including parsing the data, passing items into the pipelines, creating the data tables, configuring the pipelines and items, and writing to the database, becomes quite simple. In this project the answer data was extracted directly from JSON, because the answer URL returns a JSON response, which in turn simplified the parsing.
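For reference, the parts of the answer API response that parse_answer actually reads have roughly the shape sketched below; the values are placeholders and the real payload carries many more fields:

sample_answer_page = {
    "paging": {
        "is_end": False,   # False while more answer pages remain
        "next": "https://www.zhihu.com/api/v4/questions/.../answers?...&offset=20&limit=20"
    },
    "data": [
        {
            "id": 111111,                 # -> zhihu_id
            "url": "https://www.zhihu.com/api/v4/answers/111111",
            "question": {"id": 222222},   # -> question_id
            "author": {"id": "abcdef"},   # -> author_id
            "content": "<p>...</p>",      # -> content (HTML)
            "voteup_count": 10,           # -> praise_num
            "comment_count": 2,           # -> comments_num
            "created_time": 1513954247,   # int timestamp -> create_time
            "updated_time": 1513954247    # int timestamp -> update_time
        }
    ]
}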