(1) 基本的概念
爬蟲:css
自動獲取網站數據的程序html
關鍵是 定時,定量的,批量的獲取
反爬蟲:python
使用技術手段 防止爬蟲程序的方法mysql
存在誤傷,即 反爬技術 將普通用戶識別爲爬蟲 若是誤傷高 --- 效果再好也不能使用 例子: 好比 限制 ip === 用戶的ip 通常都是 局域網內動態分配的, 一個爬蟲的ip 可能分配給 另外一個 非爬蟲的用戶 有效的方法: 能夠在一段時間內 限制 ip,過一段時間 再把 ip釋放
反爬的成本:git
成功率越高成本越大,攔截率越高,誤傷率越高
反爬蟲的目的:github
初級爬蟲:簡單粗暴,無論服務器壓力,容易使服務器奔潰 數據保護 失控的爬蟲 某種狀況下,沒法或者沒有關閉的爬蟲 商業對手
爬蟲 vs 反爬蟲的對抗web
網站反爬的的策略 vs 爬蟲策略 1.監控 某個時間 訪問忽然增長,ip相同,user-agent不是瀏覽器;限制 ip訪問 (注意:不能封ip) (1)user-agent模擬,ip代理(ip代理池) 2 發現 ip 變化, 要求登陸訪問 (2) 註冊 帳號, 每次請求 帶 cookie 3 開發健全的帳號體系,每一個帳號 權限不一樣 (3) 多個帳號 聯合爬蟲(維護帳號池) 4 訪問頻繁 ,限制 ip頻率 (4) 模仿人的請求速度 5 彈出 驗證碼 (5) 識別驗證碼 6 增長網頁 內容的動態填充 ajax向後臺請求 (6) selenium + phantomJs 徹底模擬瀏覽器
爬蟲技術使得網站反爬的成本會愈來愈高ajax
(2)在scrapy中用到的反爬策略
1 隨機切換 UAsql
方案:chrome
1 能夠在 settings 中維護一個 ua_list 而後每一個 Request 裏的headers 參數 隨機獲取; 缺點: 代碼冗長,每一個spider都要寫 2 使用 downloadermiddleware 全局middleware,寫一個 UA middleware 注意:scrapy有一個的默認UserAgentMiddlewarescrapy.downloadermiddlewares.useragent.UserAgentMiddleware 會從setings 配置文件中獲取 3 使用第三方的fake-useragent 包 FakeUseragent().ie / .random 實際是維護了一個 ua的網頁 :https://fake-useragent.herokuapp.com/browsers/0.1.5 4 配置 USER_AGENT_TYPE = 'random' + 利用 fake-useragent 來實現 隨機ua 5 這樣 在 scheduler 把 request 經過 engine 發給 下載器的時候 就會加上 RandomUA
代碼實現以下:
settings:
USER_AGENT_TYPE = 'random' DOWNLOADER_MIDDLEWARES = { '項目.middlewares.RandomUserAgentMiddleware': 543, }
middlewares.py:
class RandomUserAgentMiddleware(object): def __init__(self,crawler): super(RandomUserAgentMiddleware,self).__init__() self.ua = UserAgent() self.ua_type = crawler.settings.get('USER_AGENT_TYPE','random') @classmethod def from_crawler(cls,crawler): return cls(crawler) def process_request(self,request,spider): def get_ua(): return getattr(self.ua,self.ua_type) print(get_ua()) request.headers.setdefault('USER_AGENT',get_ua())
2 設置ip 代理
(1)ip 不是固定不變的,存在必定的誤傷:
雲服務器: 阿里雲不會變化 亞馬遜服務器 重啓以後ip會變更 小區的ip 也是動態分配
注意: 本機ip 爬取速度最好,最穩定,要儘可能避免 本機ip被封掉
(2)ip 代理的 原理:
本地 向 代理服務器 發起請求,代理服務器 與 要請求的服務器進行 交互 通過代理以後的速度: 通過一次 中間服務器 速度會慢不少
(3)如何設置ip 代理
request.meta['proxy'] = 'http://ip + port'
(4)如何設置 ip代理池:
本身寫一個 處理 ip的 腳本,隨機獲取
思路: 爬蟲爬取 免費 ip; 放到數據庫; 再 做測試,刪除不能用的ip; 獲取ip; select ip,port from ip_proxy order by rand() limit 1; 隨機返回 ip
代碼實現: 1 爬取快代理的ip,並存入數據庫:
# _*_ coding:utf-8 _*_ __author__ = 'jimmy' __date__ = '2018/2/21 13:21' import requests from scrapy.selector import Selector import MySQLdb from concurrent.futures import ThreadPoolExecutor # pool = ThreadPoolExecutor(max_workers=6) # proxy = { # 'https':'https://113.139.180.244:808' } headers = { 'Host':'Host:www.kuaidaili.com', 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36' } conn = MySQLdb.connect(db='scrapytest', host='localhost', port=3306, user='root', passwd='', charset='utf8') cursor = conn.cursor() response = requests.get('https://www.kuaidaili.com/free/inha/1/',headers=headers) selector = Selector(text=response.text) # 獲取 所有頁碼 pages = int(selector.xpath("//div[@id='listnav']/ul/li")[-2].xpath('a/text()').extract_first()) # 獲取全部 url page_list = ['https://www.kuaidaili.com/free/inha/'+ str(num) for num in range(1,pages+1)] def get_text(page): response = requests.get(page,headers=headers) text = response.text return text def get_ips(text): selector = Selector(text=text) trs = selector.xpath("//tbody/tr") for tr in trs: ip = tr.xpath("td[1]/text()").extract_first() port = tr.xpath("td[2]/text()").extract_first() addr = tr.xpath("td[5]/text()").extract_first() speed = tr.xpath("td[6]/text()").extract_first().split('秒')[0] param = (ip,port,addr,speed) filter_set.add(param) return filter_set def insert_mysql(param): sql = ''' INSERT INTO ip_proxy(ip,port,addr,speed) VALUES(%s,%s,%s,%s) ''' cursor.execute(sql,param) conn.commit() class GetRandomIP(object): def judge_ip(self,ip,port): url = 'http://www.baidu.com' proxy_url = 'http://{0}:{1}'.format(ip,port) try: proxy_dict = { 'http':proxy_url } response = requests.get(url,proxies=proxy_dict) except Exception as e: self.delete_ip(ip) return False else: code = response.status_code if code >=200 and code <300: return True else: self.delete_ip(ip) return False def delete_ip(self,ip): sql = ''' DELETE FROM ip_proxy WHERE ip=%s ''' cursor.execute(sql,(ip,)) conn.commit() def get_random_ip(self): sql = ''' SELECT ip,port FROM ip_proxy ORDER BY RAND() LIMIT 1 ''' cursor.execute(sql) ip,port = cursor.fetchone() if self.judge_ip(ip,port): return '{0}:{1}'.format(ip,port) else: return self.get_random_ip() # if __name__ == '__main__': # filter_set= set() # for page in page_list: # text = get_text(page) # get_ips(text) # for param in filter_set: # insert_mysql(param)
代碼實現: 2 設置ip代理的中間件
settings
DOWNLOADER_MIDDLEWARES = { 'LG.middlewares.RandomIPMiddleware': 300,}
middlewraes
class RandomIPMiddleware(object): def process_request(self,request,spider): ip_port = GetRandomIP().get_random_ip() request.meta['proxy'] = 'http://{0}'.format(ip_port)
3 模擬登錄 cookie與session
注意瀏覽器是 -- 無狀態請求 cookie(瀏覽器本地存儲機制) -- 有狀態的請求(本地存儲,本地文件中)
cookie原理
用戶 ---->>> 服務器 瀏覽器 <<----- 分配 id ----->> 再次請求(附加身份信息) 不能存儲 客戶的敏感信息 --- 本地文件容易丟失,不安全
session(基於cookie實現)
返給 客戶端 一個session_id 服務器端生成(具備過時時間) 客戶端請求的時候,帶着session_id 在 服務器中獲取 我的信息
模擬登錄的思路:
請求方式 :post 請求url: 請求參數:_xsrf username pwd 獲取 session
代碼實現:1 requests + cookie
# _*_ coding:utf-8 _*_ __author__ = 'jimmy' __date__ = '2018/2/19 15:40' import requests try: import cookielib # py2 except: import http.cookiejar as cookielib # py3 # py2 和 py3 的 兼容代碼 import re def get_xsrf(): response = requests.get('https://www.zhihu.com',headers=headers) text = response.text xsrf = re.search(r';xsrf":"(.*?)"',text) if xsrf: return xsrf.group(1) else: return '' headers = { 'HOST':'www.zhihu.com', 'Referer':'https://www.zhihu.com', 'User-Agent':'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36',} cookies_ = { "_xsrf":get_xsrf(), "z_c0":".........."} response = requests.get(url='https://www.zhihu.com/inbox', headers=headers, cookies = cookies_, allow_redirects = False) print(response.status_code) 固然,也能夠經過session來實現: session = requests.session() def zhihu_login(account,pwd): # 知乎登陸 post_url = 'https://www.zhihu.com/api/v3/oauth/sign_in' post_data = { '_xsrf':get_xsrf(), 'username':account, 'password':pwd } response = session.post(post_url,data=post_data,headers=headers) session.cookies.save() zhihu_login('xxx','xxxx') def get_logined_index(): response = session.get('https://www.zhihu.com',headers=headers) with open('index.html','wb') as f: f.write(response.text.encode('utf-8')) get_logined_index()
代碼實現:2 scrapy
# -*- coding: utf-8 -*- import scrapy import re,datetime import json from urllib.parse import urljoin # 拼接域名 url from scrapy.loader import ItemLoader from zhihuspider.items import ZhihuQuestionItem,ZhihuAnswerItem class ZhihuSpider(scrapy.Spider): name = 'zhihu' allowed_domains = ['zhihu.com'] start_urls = ['http://zhihu.com/'] headers = { 'HOST': 'www.zhihu.com', 'Referer': 'https://www.zhihu.com', 'User-Agent': 'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36', } cookies_ = { "_xsrf": '', "z_c0": "............", } # api 接口 start_answer_url = 'https://www.zhihu.com/api/v4/questions/{0}/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}' # 知乎 question 的 answer 的起始 url def start_requests(self): return [scrapy.Request('https://www.zhihu.com',callback=self.get_xsrf,headers=self.headers)] # 獲取xsrf def get_xsrf(self,response): response_text = response.text xsrf = re.search(r';xsrf":"(.*?)"', response_text,re.DOTALL) if xsrf: xsrf= xsrf.group(1) else: xsrf = '' self.cookies_['_xsrf'] = xsrf for url in self.start_urls: yield scrapy.Request(url, headers=self.headers,cookies=self.cookies_, dont_filter=True,callback=self.parse) def parse(self,response): ''' 提取全部 url === 深度優先的策略 其中 question/數字 的url :param response: :return: ''' all_urls = response.css("a::attr(href)").extract() all_urls = [urljoin(response.url,url) for url in all_urls] all_urls = list(filter(lambda url:True if url.startswith('https') else False,all_urls )) for url in all_urls: match_obj = re.match('(.*zhihu.com/question/(\d+))(/.*|$)',url) if match_obj: request_url = match_obj.group(1) question_id = match_obj.group(2) yield scrapy.Request(request_url,meta={'question_id':question_id},headers=self.headers,callback=self.parse_question) else: yield scrapy.Request(url,headers=self.headers,callback=self.parse) def parse_question(self,response): """ 獲取 詳細的 item :param response: :return: """ question_id = response.meta.get('question_id') item_loader = ItemLoader(item=ZhihuQuestionItem(),response=response) item_loader.add_css('title','.QuestionHeader-title::text') item_loader.add_css('content','.QuestionHeader-detail') item_loader.add_value('url',response.url) item_loader.add_value('question_id',question_id) item_loader.add_css('answer_nums','.List-headerText span::text') item_loader.add_css('comment_nums','.QuestionHeader-Comment button::text') item_loader.add_css('watch_user_nums','.NumberBoard-itemValue::text') item_loader.add_css('topics','.QuestionHeader-topics .Popover div::text') question_item = item_loader.load_item() yield scrapy.Request(self.start_answer_url.format(question_id,20,0),headers=self.headers,callback=self.parse_answer) yield question_item def parse_answer(self,response): answer_json = json.loads(response.text) is_end = answer_json['paging']['is_end'] totals = answer_json['paging']['totals'] # 提取 answer 的數據 item for answer in answer_json['data']: answer_item = ZhihuAnswerItem() answer_item['zhihu_id'] = answer['id'] answer_item['url'] = answer['url'] answer_item['question_id'] = answer['question']['id'] answer_item['author_id'] = answer['author']['id'] if 'id' in answer['author'] else None answer_item['content'] = answer['content'] if 'content' in answer else None answer_item['create_time'] = answer['created_time'] answer_item['update_time'] = answer['updated_time'] answer_item['crawl_time'] = datetime.datetime.now() yield answer_item if not is_end: # 判斷是否還有下一頁 next_url = answer_json['paging']['next'] yield scrapy.Request(next_url,headers=self.headers,callback=self.parse_answer)
固然能夠維護多個用戶,實現一隨機的cookie池
4 Selenium 解決動態加載的html問題
Selenium:網站 開發測試框架
可使 咱們訪問 的html 與 瀏覽器(f12) 獲得的 html 同樣(動態html)
1 selenium 須要 對應的 瀏覽器 driver 2 注意 下載 好對應的 driver 直接能夠 放進 python 的scripts裏面;不用再去配置 executable_path=.../..exe參數 3 注意版本對應要求
如何把selenium集成scrapy中間件中
1 使用 中間件,須要 使用 HtmlResponse 直接把 response 返回給spider
from scrapy.http import HtmlResponse class SeleniumMiddleware(object): """ 經過 selenium 瀏覽器 驅動 直接獲取網頁 內容 不用再 通過 downloader 下載 因此 須要直接返回 """ def process_request(self,request,spider): if spider == 'lagou': browser = webdriver.Chrome() browser.get(request.url) import time time.sleep(1) return HtmlResponse(url=browser.current_url,body=browser.page_source,request=request,encoding='utf-8')
2 保證 每一個 爬蟲 spider 只用 一個chrome
# 在開始的時候 class MySpider(scrapy.Spider): ... ... def __init__(self): super(LagouSpider,self).__init__() self.browser = webdriver.Chrome()
3 保證 爬蟲 spider 結束的時候 chrome 關閉 ==== 信號量
from scrapy.xlib.pydispatch import dispatcher from scrapy import signals class MySpider(scrapy.Spider): ... ... def __init__(self): super(LagouSpider,self).__init__() self.browser = webdriver.Chrome() dispatcher.connect(self.spider_closed,signals.spider_closed) def spider_closed(self): self.browser.quit() print('browser關閉')
注意: 用 chrome 瀏覽器 去獲取 url網頁內容,返回 response的 過程是一個 同步的過程,會下降的運行速度
5 驗證碼
識別驗證碼的方法:
編碼實現 (tesseract-ocr) === google--開源的識別軟件 基於大量的人工訓練數據,識別率,效率低 在線打碼 (經常使用) 打碼平臺會提供 api 接口 人工打碼 成本高
在線打碼平臺的 api
附: http 請求狀態
200 -- 請求被成功處理 (通常去爬蟲) 301/302 -- 重定向/ 永久性,臨時性 403 -- 沒有權限訪問 (_xsrf,防止csrf攻擊) 404 -- 訪問資源有誤 500 -- 服務器有誤 503 -- 服務器維護,停機