The two things a crawler fears most are getting its account banned and getting its IP address banned. The IP problem can be solved with proxies, and there are plenty of online services that provide IP addresses in bulk. Not all of those addresses are actually usable, though: other people may be running crawlers through the same IPs, so any of them can be blocked by a target site at any time, and feeding dead addresses into your program drags down its efficiency. If you filter out the unusable addresses after collecting them and only then start crawling, efficiency improves considerably. So here we build our own proxy pool by scraping high-anonymity IPs published on the web.
1. Preparation
Building the proxy pool requires a few dependencies: a running Redis database, plus the requests, aiohttp, redis-py, pyquery, and Flask libraries. Install these yourself before continuing.
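If you want a quick sanity check before moving on, the short script below simply imports each dependency and pings Redis; it assumes Redis is running locally on the default 127.0.0.1:6379.

import redis
import requests
import aiohttp
import pyquery
import flask

# Ping the local Redis server; this raises an exception if it is not reachable
client = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True)
print(client.ping())   # True means Redis is up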
2. What building a proxy pool involves
You can get a rough idea from the diagram below (found online). The pool splits into four parts: a getter module (the crawler scheduler and the proxy-fetching crawlers), a storage module (the Redis database), a tester module (the proxy-validation crawler), and an API module (a web API).
3. Implementing the proxy pool
Storage module: we use a Redis sorted set. Why a sorted set? It guarantees that members are unique, and every member carries a score (scores may repeat), which keeps the elements ordered from the lowest score to the highest. That ordering is very useful for the API module later: the score tells us how usable a proxy is, with 100 meaning fully usable and 0 meaning unusable, and the API hands out only proxies whose score is 100, so every address it returns should work, which keeps the crawler efficient. The storage module is implemented as follows (db.py):
import redis
from random import choice
import re

# Redis database address
REDIS_HOST = '127.0.0.1'
# Redis port
REDIS_PORT = 6379
# Redis password; None if unset
REDIS_PASSWORD = None
# Redis key under which the proxies are stored
REDIS_KEY = 'proxies'

# Proxy scores
MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10


class PoolEmptyError(Exception):
    """Raised by random() when no proxy is available."""
    pass


class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Initialize the client
        :param host: Redis host
        :param port: Redis port
        :param password: Redis password
        """
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """
        Add a proxy with the initial score
        :param proxy: proxy
        :param score: score
        :return: add result
        """
        if not re.match('\d+\.\d+\.\d+\.\d+\:\d+', proxy):
            print('Proxy', proxy, 'is malformed, discarding')
            return
        if not self.db.zscore(REDIS_KEY, proxy):
            # Note: zadd(key, score, member) and zincrby(key, member, amount) below
            # use the redis-py 2.x call signatures; redis-py 3.x changed both.
            return self.db.zadd(REDIS_KEY, score, proxy)

    def random(self):
        """
        Get a random usable proxy: try max-score proxies first, then fall back
        to the top of the ranking, otherwise raise an exception
        :return: random proxy
        """
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        """
        Decrease a proxy's score by one; remove it once the score falls to the minimum
        :param proxy: proxy
        :return: new score
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('Proxy', proxy, 'current score', score, 'decrease by 1')
            return self.db.zincrby(REDIS_KEY, proxy, -1)
        else:
            print('Proxy', proxy, 'current score', score, 'removing')
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """
        Check whether a proxy exists
        :param proxy: proxy
        :return: whether it exists
        """
        return not self.db.zscore(REDIS_KEY, proxy) == None

    def max(self, proxy):
        """
        Set a proxy's score to MAX_SCORE
        :param proxy: proxy
        :return: set result
        """
        print('Proxy', proxy, 'is usable, setting score to', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)

    def count(self):
        """
        Get the number of proxies
        :return: count
        """
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """
        Get all proxies
        :return: full proxy list
        """
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    def batch(self, start, stop):
        """
        Get a batch of proxies
        :param start: start index
        :param stop: stop index
        :return: proxy list
        """
        return self.db.zrevrange(REDIS_KEY, start, stop - 1)
This defines a RedisClient class with the methods needed to operate on the database, and it manages each proxy's score: a newly added IP address starts at 10; when a later check finds the address usable its score is set straight to 100, and each failed check lowers the score by 1 through decrease() until it reaches 0 and the address is removed. max() sets a proxy's score to 100, and random() returns a random usable proxy.
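To make the scoring lifecycle concrete, here is a minimal usage sketch of RedisClient. The proxy address is just a placeholder, and it assumes a local Redis instance plus the proxypool.db module shown above.

from proxypool.db import RedisClient

conn = RedisClient()

# A newly discovered proxy enters the pool with INITIAL_SCORE (10)
conn.add('1.2.3.4:8080')          # placeholder address, not a real proxy

# The tester later either promotes it to 100 ...
conn.max('1.2.3.4:8080')
# ... or knocks one point off on every failed check until it is removed
conn.decrease('1.2.3.4:8080')

print(conn.count())               # total number of proxies in the pool
print(conn.random())              # prefers proxies whose score is 100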
2. The getter module
This fetches IP addresses from the major proxy sites (crawler.py):
import json
import re
import requests
from requests.exceptions import ConnectionError
from pyquery import PyQuery as pq

# get_page is defined here for completeness; in the project layout it lives in
# utils.py and is imported with `from .utils import get_page`.

base_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
    'Accept-Encoding': 'gzip, deflate, sdch',
    'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
}


def get_page(url, options={}):
    """
    Fetch a page from a proxy site
    :param url:
    :param options:
    :return:
    """
    headers = dict(base_headers, **options)
    print('Fetching', url)
    try:
        response = requests.get(url, headers=headers)
        print('Fetched', url, response.status_code)
        if response.status_code == 200:
            return response.text
    except ConnectionError:
        print('Failed to fetch', url)
        return None
class ProxyMetaclass(type):
    def __new__(cls, name, bases, attrs):
        # A metaclass: attrs holds the class attributes, including all of its methods
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            # Collect every method whose name starts with crawl_ into __CrawlFunc__
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    def get_proxies(self, callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('Got proxy', proxy)
            proxies.append(proxy)
        return proxies

    def crawl_daili66(self, page_count=4):
        """
        Crawl 66ip
        :param page_count: number of pages
        :return: proxies
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                # every table row after the header row
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    # first td holds the IP, second td holds the port
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_qydaili(self):
        """
        Crawl qydaili
        :return: proxies
        """
        for i in range(1, 11):
            start_url = 'http://www.qydaili.com/free/?action=china&page={}'.format(i)
            html = get_page(start_url)
            if html:
                doc = pq(html)
                trs = doc('.container .table-bordered tbody tr').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_kuaidaili(self):
        for i in range(1, 20):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile('<td data-title="IP">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                port = re.compile('<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_xicidaili(self):
        for i in range(1, 21):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            html = get_page(start_url, options=headers)
            if html:
                find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile('<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)
                    find_port = re.compile('<td>(\d+)</td>')
                    re_port = find_port.findall(tr)
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_ip3366(self):
        for i in range(1, 20):
            start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(i)
            html = get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                for s in range(1, len(trs)):
                    find_ip = re.compile('<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile('<td>(\d+)</td>')
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_iphai(self):
        url = ['http://www.iphai.com/', 'http://www.iphai.com/free/ng',
               'http://www.iphai.com/free/np', 'http://www.iphai.com/free/wg',
               'http://www.iphai.com/free/wp']
        for start_url in url:
            html = get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                for s in range(1, len(trs)):
                    find_ip = re.compile('<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile('<td>\s+(\d+)\s+</td>', re.S)
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_89ip(self):
        for i in range(1, 20):
            start_url = 'http://www.89ip.cn/index_{}'.format(i)
            html = get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                for s in range(1, len(trs)):
                    find_ip = re.compile('<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile('<td>\s+(\d+)\s+</td>', re.S)
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_data5u(self):
        start_url = 'http://www.data5u.com/free/gngn/index.shtml'
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86',
            'Host': 'www.data5u.com',
            'Referer': 'http://www.data5u.com/free/index.shtml',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36',
        }
        html = get_page(start_url, options=headers)
        if html:
            ip_address = re.compile('<span><li>(\d+\.\d+\.\d+\.\d+)</li>.*?<li class="port.*?>(\d+)</li>', re.S)
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')
Every source here is implemented as a method whose name starts with crawl_, which makes the crawler easy to extend: to add another proxy site, just define one more crawl_-prefixed method (a sketch of such an extension follows the getter code below). The sites listed here all work at the time of writing; the only drawback is that few of the IPs they list are actually usable. Besides the Crawler class we also need a Getter class that dynamically calls every crawl_ method, collects the IPs, and stores them in the database (getter.py):
from proxypool.db import RedisClient
from proxypool.crawler import Crawler
import sys

# Upper bound on the number of proxies in the pool
POOL_UPPER_THRESHOLD = 20000


class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """
        Check whether the pool has reached its size limit
        """
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    def run(self):
        print('Getter starts running')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                # fetch proxies from one source
                proxies = self.crawler.get_proxies(callback)
                sys.stdout.flush()
                for proxy in proxies:
                    self.redis.add(proxy)
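Because ProxyMetaclass registers every method whose name starts with crawl_, adding a new source only takes one more such method inside the Crawler class. The sketch below is a hypothetical example: the URL and the page format are placeholders rather than a real proxy site, and it relies on the get_page helper and the re import already present in crawler.py.

    # Hypothetical extension: drop this method into the Crawler class and the
    # metaclass will pick it up automatically on the next Getter.run()
    def crawl_example_site(self):
        for page in range(1, 6):
            # placeholder URL; replace with a real free-proxy listing page
            html = get_page('http://proxy.example.com/free/{}'.format(page))
            if not html:
                continue
            # assume proxies appear in the page text as "1.2.3.4:8080"
            for ip, port in re.findall('(\d+\.\d+\.\d+\.\d+):(\d+)', html):
                yield ip + ':' + port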
3. The tester module
After the proxies are collected, they are pulled back out of the database and checked for usability. The checks use aiohttp's asynchronous requests, which makes them reasonably efficient (tester.py):
import asyncio
import aiohttp
import time
import sys
try:
    from aiohttp import ClientError
except:
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError
from proxypool.db import RedisClient

# Testing cycle (seconds)
TESTER_CYCLE = 20
# Fetching cycle (seconds)
GETTER_CYCLE = 300
# Test URL: it is best to test against the very site you plan to crawl
TEST_URL = 'https://weibo.cn/'
# Response codes that count as a successful check (adjust as needed)
VALID_STATUS_CODES = [200]
# Number of proxies tested per batch (adjust as needed)
BATCH_TEST_SIZE = 10


class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        Test a single proxy
        :param proxy:
        :return:
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15,
                                       allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('Proxy is usable', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('Invalid response status', response.status, 'IP', proxy)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError,
                    asyncio.TimeoutError, AttributeError):
                self.redis.decrease(proxy)
                print('Proxy request failed', proxy)

    def run(self):
        """
        Main testing loop
        :return:
        """
        print('Tester starts running')
        try:
            count = self.redis.count()
            print('Currently', count, 'proxies left')
            for i in range(0, count, BATCH_TEST_SIZE):
                start = i
                stop = min(i + BATCH_TEST_SIZE, count)
                print('Testing proxies', start + 1, '-', stop)
                test_proxies = self.redis.batch(start, stop)
                loop = asyncio.get_event_loop()
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                sys.stdout.flush()
                time.sleep(5)
        except Exception as e:
            print('Tester error', e.args)
Set TEST_URL to the site you actually plan to crawl; here it points at weibo.
4. The API module
The API module serves usable IP addresses out of the database. It is implemented with Flask, which makes it easy to deploy on a server (api.py):
from flask import Flask, g
from .db import RedisClient

__all__ = ['app']

app = Flask(__name__)


def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """
    Get a proxy
    :return: a random proxy
    """
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    """
    Get the count of proxies
    :return: pool size
    """
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()
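Once the Flask app is running, any crawler can grab a proxy from the interface over plain HTTP. Here is a minimal client-side sketch; the URL assumes the API listens on 127.0.0.1:5555, so adjust it to wherever yours actually runs (Flask's default is 127.0.0.1:5000 when api.py is started directly).

import requests

PROXY_POOL_URL = 'http://127.0.0.1:5555/random'   # adjust to your API host/port


def get_random_proxy():
    """Fetch one proxy string such as '1.2.3.4:8080' from the pool API."""
    response = requests.get(PROXY_POOL_URL)
    if response.status_code == 200:
        return response.text
    return None


proxy = get_random_proxy()
if proxy:
    proxies = {'http': 'http://' + proxy, 'https': 'http://' + proxy}
    # use the proxy for an ordinary request
    print(requests.get('http://httpbin.org/ip', proxies=proxies).text)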
Finally, a scheduler module ties the three modules above together and runs each of them as a separate process (scheduler.py):
import time
from multiprocessing import Process
from proxypool.api import app
from proxypool.getter import Getter
from proxypool.tester import Tester
from proxypool.db import RedisClient

# Switches for the three components
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True
# Testing cycle (seconds)
TESTER_CYCLE = 20
# Fetching cycle (seconds)
GETTER_CYCLE = 300
# Host and port used by schedule_api; adjust to your deployment
API_HOST = '0.0.0.0'
API_PORT = 5555


class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        Test proxies periodically
        """
        tester = Tester()
        while True:
            print('Tester starts running')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        Fetch proxies periodically
        """
        getter = Getter()
        while True:
            print('Start fetching proxies')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """
        Start the API
        """
        app.run(API_HOST, API_PORT)

    def run(self):
        print('Proxy pool starts running')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()
Calling the run() method starts the proxy pool (run.py):
from proxypool.scheduler import Scheduler
import sys
import io

# Force UTF-8 output so log messages print correctly on consoles with a different default encoding
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')


def main():
    try:
        s = Scheduler()
        s.run()
    except:
        # if anything goes wrong, simply restart the scheduler
        main()


if __name__ == '__main__':
    main()
Results
You can inspect the collected addresses with Redis Desktop Manager. Out of the 8994 addresses crawled, only two hundred or so turned out to be usable, which is a bit grim.