用Flask+Aiohttp+Redis維護動態代理池

在網上有大量公開的免費代理,或者咱們也能夠購買付費的代理IP,可是代理不管是免費的仍是付費的,都不能保證都是可用的,由於可能此IP被其餘人使用來爬取一樣的目標站點而被封禁,或者代理服務器忽然發生故障或網絡繁忙。一旦咱們選用了一個不可用的代理,這勢必會影響爬蟲的工做效率。
html

因此,咱們須要提早作篩選,將不可用的代理剔除掉,保留可用代理。接下來咱們就搭建一個高效易用的代理池。git

1、準備工做

首先須要成功安裝Redis數據庫並啓動服務,另外還須要安裝aiohttp、requests、RedisPy、pyquery、Flask庫。
github

2、代理池的目標

咱們須要作到下面的幾個目標,來實現易用高效的代理池。
redis

基本模塊分爲4塊:存儲模塊、獲取模塊、檢測模塊、接口模塊。數據庫

  • 存儲模塊負責存儲抓取下來的代理。首先要保證代理不重複,要標識代理的可用狀況,還要動態實時處理每一個代理,因此一種比較高效和方便的存儲方式就是使用Redis的Sorted Set,即有序集合。json

  • 獲取模塊須要定時在各大代理網站抓取代理。代理能夠是免費公開代理也能夠是付費代理,代理的形式都是IP加端口,此模塊儘可能從不一樣來源獲取,儘可能抓取高匿代理,抓取成功以後將可用代理保存到數據庫中。flask

  • 檢測模塊須要定時檢測數據庫中的代理。這裏須要設置一個檢測連接,最好是爬取哪一個網站就檢測哪一個網站,這樣更加有針對性,若是要作一個通用型的代理,那能夠設置百度等連接來檢測。另外,咱們須要標識每個代理的狀態,如設置分數標識,100分表明可用,分數越少表明越不可用。檢測一次,若是代理可用,咱們能夠將分數標識當即設置爲100滿分,也能夠在原基礎上加1分;若是代理不可用,能夠將分數標識減1分,當分數減到必定閾值後,代理就直接從數據庫移除。經過這樣的標識分數,咱們就能夠辨別代理的可用狀況,選用的時候會更有針對性。api

  • 接口模塊須要用API來提供對外服務的接口。其實咱們能夠直接鏈接數據庫來取對應的數據,可是這樣就須要知道數據庫的鏈接信息,而且要配置鏈接,而比較安全和方便的方式就是提供一個Web API接口,咱們經過訪問接口便可拿到可用代理。另外,因爲可用代理可能有多個,那麼咱們能夠設置一個隨機返回某個可用代理的接口,這樣就能保證每一個可用代理均可以取到,實現負載均衡。瀏覽器

以上內容是設計代理的一些基本思路。接下來咱們設計總體的架構,而後用代碼實現代理池。安全

3、代理池的架構

根據上文的描述,代理池的架構能夠以下圖所示。

圖片

代理池分爲4個模塊:存儲模塊、獲取模塊、檢測模塊、接口模塊。

  • 存儲模塊使用Redis的有序集合,用來作代理的去重和狀態標識,同時它也是中心模塊和基礎模塊,將其餘模塊串聯起來。

  • 獲取模塊定時從代理網站獲取代理,將獲取的代理傳遞給存儲模塊,並保存到數據庫。

  • 檢測模塊定時經過存儲模塊獲取全部代理,並對代理進行檢測,根據不一樣的檢測結果對代理設置不一樣的標識。

  • 接口模塊經過Web API提供服務接口,接口經過鏈接數據庫並經過Web形式返回可用的代理。

4、代理池的實現

接下來,咱們用代碼分別實現這4個模塊。

1. 存儲模塊

這裏咱們使用Redis的有序集合,集合的每個元素都是不重複的,對於代理池來講,集合的元素就變成了一個個代理,也就是IP加端口的形式,如60.207.237.111:8888,這樣的一個代理就是集合的一個元素。另外,有序集合的每個元素都有一個分數字段,分數是能夠重複的,能夠是浮點數類型,也能夠是整數類型。該集合會根據每個元素的分數對集合進行排序,數值小的排在前面,數值大的排在後面,這樣就能夠實現集合元素的排序了。

對於代理池來講,這個分數能夠做爲判斷一個代理是否可用的標誌,100爲最高分,表明最可用,0爲最低分,表明最不可用。若是要獲取可用代理,能夠從代理池中隨機獲取分數最高的代理,注意是隨機,這樣能夠保證每一個可用代理都會被調用到。

分數是咱們判斷代理穩定性的重要標準,設置分數規則以下所示。

  • 分數100爲可用,檢測器會定時循環檢測每一個代理可用狀況,一旦檢測到有可用的代理就當即置爲100,檢測到不可用就將分數減1,分數減至0後代理移除。

  • 新獲取的代理的分數爲10,若是測試可行,分數當即置爲100,不可行則分數減1,分數減至0後代理移除。

這只是一種解決方案,固然可能還有更合理的方案。之因此設置此方案有以下幾個緣由。

  • 在檢測到代理可用時,分數當即置爲100,這樣能夠保證全部可用代理有更大的機會被獲取到。你可能會問,爲何不將分數加1而是直接設爲最高100呢?設想一下,有的代理是從各大免費公開代理網站獲取的,經常一個代理並無那麼穩定,平均五次請求可能有兩次成功,三次失敗,若是按照這種方式來設置分數,那麼這個代理幾乎不可能達到一個高的分數,也就是說即使它有時是可用的,可是篩選的分數最高,那這樣的代理幾乎不可能被取到。若是想追求代理穩定性,能夠用上述方法,這種方法可確保分數最高的代理必定是最穩定可用的。因此,這裏咱們採起「可用即設置100」的方法,確保只要可用的代理均可以被獲取到。

  • 在檢測到代理不可用時,分數減1,分數減至0後,代理移除。這樣一個有效代理若是要被移除須要失敗100次,也就是說當一個可用代理若是嘗試了100次都失敗了,就一直減分直到移除,一旦成功就從新置回100。嘗試機會越多,則這個代理拯救回來的機會越多,這樣就不容易將曾經的一個可用代理丟棄,由於代理不可用的緣由極可能是網絡繁忙或者其餘人用此代理請求太過頻繁,因此在這裏將分數爲100。

  • 新獲取的代理的分數設置爲10,代理若是不可用,分數就減1,分數減到0,代理就移除,若是代理可用,分數就置爲100。因爲不少代理是從免費網站獲取的,因此新獲取的代理無效的比例很是高,可能不足10%。因此在這裏咱們將分數設置爲10,檢測的機會沒有可用代理的100次那麼多,這也能夠適當減小開銷。

上述代理分數的設置思路不必定是最優思路,但據我的實測,它的實用性仍是比較強的。

如今咱們須要定義一個類來操做數據庫的有序集合,定義一些方法來實現分數的設置、代理的獲取等。代碼實現以下所示:

MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'

import redis
from random import choice

class RedisClient(object):
   def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
       """
       初始化
       :param host: Redis 地址
       :param port: Redis 端口
       :param password: Redis密碼
       """

       self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

   def add(self, proxy, score=INITIAL_SCORE):
       """
       添加代理,設置分數爲最高
       :param proxy: 代理
       :param score: 分數
       :return: 添加結果
       """

       if not self.db.zscore(REDIS_KEY, proxy):
           return self.db.zadd(REDIS_KEY, score, proxy)

   def random(self):
       """
       隨機獲取有效代理,首先嚐試獲取最高分數代理,若是最高分數不存在,則按照排名獲取,不然異常
       :return: 隨機代理
       """

       result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
       if len(result):
           return choice(result)
       else:
           result = self.db.zrevrange(REDIS_KEY, 0, 100)
           if len(result):
               return choice(result)
           else:
               raise PoolEmptyError

   def decrease(self, proxy):
       """
       代理值減一分,分數小於最小值,則代理刪除
       :param proxy: 代理
       :return: 修改後的代理分數
       """

       score = self.db.zscore(REDIS_KEY, proxy)
       if score and score > MIN_SCORE:
           print('代理', proxy, '當前分數', score, '減1')
           return self.db.zincrby(REDIS_KEY, proxy, -1)
       else:
           print('代理', proxy, '當前分數', score, '移除')
           return self.db.zrem(REDIS_KEY, proxy)

   def exists(self, proxy):
       """
       判斷是否存在
       :param proxy: 代理
       :return: 是否存在
       """

       return not self.db.zscore(REDIS_KEY, proxy) == None

   def max(self, proxy):
       """
       將代理設置爲MAX_SCORE
       :param proxy: 代理
       :return: 設置結果
       """

       print('代理', proxy, '可用,設置爲', MAX_SCORE)
       return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)

   def count(self):
       """
       獲取數量
       :return: 數量
       """

       return self.db.zcard(REDIS_KEY)

   def all(self):
       """
       獲取所有代理
       :return: 所有代理列表
       """

       return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

首先咱們定義了一些常量,如MAX_SCOREMIN_SCOREINITIAL_SCORE分別表明最大分數、最小分數、初始分數。REDIS_HOSTREDIS_PORTREDIS_PASSWORD分別表明了Redis的鏈接信息,即地址、端口、密碼。REDIS_KEY是有序集合的鍵名,咱們能夠經過它來獲取代理存儲所使用的有序集合。

接下來定義了一個RedisClient類,這個類能夠用來操做Redis的有序集合,其中定義了一些方法來對集合中的元素進行處理,它的主要功能以下所示。

  • __init__()方法是初始化的方法,其參數是Redis的鏈接信息,默認的鏈接信息已經定義爲常量,在__init__()方法中初始化了一個StrictRedis的類,創建Redis鏈接。

  • add()方法向數據庫添加代理並設置分數,默認的分數是INITIAL_SCORE,也就是10,返回結果是添加的結果。

  • random()方法是隨機獲取代理的方法,首先獲取100分的代理,而後隨機選擇一個返回。若是不存在100分的代理,則此方法按照排名來獲取,選取前100名,而後隨機選擇一個返回,不然拋出異常。

  • decrease()方法是在代理檢測無效的時候設置分數減1的方法,代理傳入後,此方法將代理的分數減1,若是分數達到最低值,那麼代理就刪除。

  • exists()方法可判斷代理是否存在集合中。

  • max()方法將代理的分數設置爲MAX_SCORE,即100,也就是當代理有效時的設置。

  • count()方法返回當前集合的元素個數。

  • all()方法返回全部的代理列表,以供檢測使用。

定義好了這些方法,咱們能夠在後續的模塊中調用此類來鏈接和操做數據庫。如想要獲取隨機可用的代理,只須要調用random()方法便可,獲得的就是隨機的可用代理。

2. 獲取模塊

獲取模塊的邏輯相對簡單,首先要定義一個Crawler來從各大網站抓取代理,示例以下所示:

import json
from .utils import get_page
from pyquery import PyQuery as pq

class ProxyMetaclass(type):
   def __new__(cls, name, bases, attrs):
       count = 0
       attrs['__CrawlFunc__'] = []
       for k, v in attrs.items():
           if 'crawl_' in k:
               attrs['__CrawlFunc__'].append(k)
               count += 1
       attrs['__CrawlFuncCount__'] = count
       return type.__new__(cls, name, bases, attrs)

class Crawler(object, metaclass=ProxyMetaclass):
   def get_proxies(self, callback):
       proxies = []
       for proxy in eval("self.{}()".format(callback)):
           print('成功獲取到代理', proxy)
           proxies.append(proxy)
       return proxies

   def crawl_daili66(self, page_count=4):
       """
       獲取代理66
       :param page_count: 頁碼
       :return: 代理
       """

       start_url = 'http://www.66ip.cn/{}.html'
       urls = [start_url.format(page) for page in range(1, page_count + 1)]
       for url in urls:
           print('Crawling', url)
           html = get_page(url)
           if html:
               doc = pq(html)
               trs = doc('.containerbox table tr:gt(0)').items()
               for tr in trs:
                   ip = tr.find('td:nth-child(1)').text()
                   port = tr.find('td:nth-child(2)').text()
                   yield ':'.join([ip, port])

   def crawl_proxy360(self):
       """
       獲取Proxy360
       :return: 代理
       """

       start_url = 'http://www.proxy360.cn/Region/China'
       print('Crawling', start_url)
       html = get_page(start_url)
       if html:
           doc = pq(html)
           lines = doc('div[name="list_proxy_ip"]').items()
           for line in lines:
               ip = line.find('.tbBottomLine:nth-child(1)').text()
               port = line.find('.tbBottomLine:nth-child(2)').text()
               yield ':'.join([ip, port])

   def crawl_goubanjia(self):
       """
       獲取Goubanjia
       :return: 代理
       """

       start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
       html = get_page(start_url)
       if html:
           doc = pq(html)
           tds = doc('td.ip').items()
           for td in tds:
               td.find('p').remove()
               yield td.text().replace(' ', '')

方便起見,咱們將獲取代理的每一個方法統必定義爲以crawl開頭,這樣擴展的時候只須要添加crawl開頭的方法便可。

在這裏實現了幾個示例,如抓取代理6六、Proxy360、Goubanjia三個免費代理網站,這些方法都定義成了生成器,經過yield返回一個個代理。程序首先獲取網頁,而後用pyquery解析,解析出IP加端口的形式的代理而後返回。

而後定義了一個get_proxies()方法,將全部以crawl開頭的方法調用一遍,獲取每一個方法返回的代理並組合成列表形式返回。

你可能會想知道,如何獲取全部以crawl開頭的方法名稱呢?其實這裏藉助了元類來實現。咱們定義了一個ProxyMetaclassCrawl類將它設置爲元類,元類中實現了__new__()方法,這個方法有固定的幾個參數,第四個參數attrs中包含了類的一些屬性。咱們能夠遍歷attrs這個參數便可獲取類的全部方法信息,就像遍歷字典同樣,鍵名對應方法的名稱。而後判斷方法的開頭是否crawl,若是是,則將其加入到__CrawlFunc__屬性中。這樣咱們就成功將全部以crawl開頭的方法定義成了一個屬性,動態獲取到全部以crawl開頭的方法列表。

因此,若是要作擴展,咱們只須要添加一個以crawl開頭的方法。例如抓取快代理,咱們只須要在Crawler類中增長crawl_kuaidaili()方法,仿照其餘幾個方法將其定義成生成器,抓取其網站的代理,而後經過yield返回代理便可。這樣,咱們能夠很是方便地擴展,而不用關心類其餘部分的實現邏輯。

代理網站的添加很是靈活,不只能夠添加免費代理,也能夠添加付費代理。一些付費代理的提取方式也相似,也是經過Web的形式獲取,而後進行解析。解析方式可能更加簡單,如解析純文本或JSON,解析以後以一樣的形式返回便可,在此再也不代碼實現,能夠自行擴展。

既然定義了Crawler類,接下來再定義一個Getter類,用來動態地調用全部以crawl開頭的方法,而後獲取抓取到的代理,將其加入到數據庫存儲起來。

from db import RedisClient
from crawler import Crawler

POOL_UPPER_THRESHOLD = 10000

class Getter():
   def __init__(self):
       self.redis = RedisClient()
       self.crawler = Crawler()

   def is_over_threshold(self):
       """
       判斷是否達到了代理池限制
       """

       if self.redis.count() >= POOL_UPPER_THRESHOLD:
           return True
       else:
           return False

   def run(self):
       print('獲取器開始執行')
       if not self.is_over_threshold():
           for callback_label in range(self.crawler.__CrawlFuncCount__):
               callback = self.crawler.__CrawlFunc__[callback_label]
               proxies = self.crawler.get_proxies(callback)
               for proxy in proxies:
                   self.redis.add(proxy)

Getter類就是獲取器類,它定義了一個變量POOL_UPPER_THRESHOLD來表示代理池的最大數量,這個數量能夠靈活配置,而後定義了is_over_threshold()方法來判斷代理池是否已經達到了容量閾值。is_over_threshold()方法調用了RedisClient的count()方法來獲取代理的數量,而後進行判斷,若是數量達到閾值,則返回True,不然返回False。若是不想加這個限制,能夠將此方法永久返回True

接下來定義run()方法。該方法首先判斷了代理池是否達到閾值,而後在這裏就調用了Crawler類的__CrawlFunc__屬性,獲取到全部以crawl開頭的方法列表,依次經過get_proxies()方法調用,獲得各個方法抓取到的代理,而後再利用RedisClientadd()方法加入數據庫,這樣獲取模塊的工做就完成了。

3. 檢測模塊

咱們已經成功將各個網站的代理獲取下來了,如今就須要一個檢測模塊來對全部代理進行多輪檢測。代理檢測可用,分數就設置爲100,代理不可用,分數減1,這樣就能夠實時改變每一個代理的可用狀況。如要獲取有效代理只須要獲取分數高的代理便可。

因爲代理的數量很是多,爲了提升代理的檢測效率,咱們在這裏使用異步請求庫aiohttp來進行檢測。

requests做爲一個同步請求庫,咱們在發出一個請求以後,程序須要等待網頁加載完成以後才能繼續執行。也就是這個過程會阻塞等待響應,若是服務器響應很是慢,好比一個請求等待十幾秒,那麼咱們使用requests完成一個請求就會須要十幾秒的時間,程序也不會繼續往下執行,而在這十幾秒的時間裏程序其實徹底能夠去作其餘的事情,好比調度其餘的請求或者進行網頁解析等。

異步請求庫就解決了這個問題,它相似JavaScript中的回調,即在請求發出以後,程序能夠繼續執行去作其餘的事情,當響應到達時,程序再去處理這個響應。因而,程序就沒有被阻塞,能夠充分利用時間和資源,大大提升效率。

對於響應速度比較快的網站來講,requests同步請求和aiohttp異步請求的效果差距沒那麼大。可對於檢測代理來講,檢測一個代理通常須要十多秒甚至幾十秒的時間,這時候使用aiohttp異步請求庫的優點就大大致現出來了,效率可能會提升幾十倍不止。

因此,咱們的代理檢測使用異步請求庫aiohttp,實現示例以下所示:

VALID_STATUS_CODES = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100

class Tester(object):
   def __init__(self):
       self.redis = RedisClient()

   async def test_single_proxy(self, proxy):
       """
       測試單個代理
       :param proxy: 單個代理
       :return: None
       """

       conn = aiohttp.TCPConnector(verify_ssl=False)
       async with aiohttp.ClientSession(connector=conn) as session:
           try:
               if isinstance(proxy, bytes):
                   proxy = proxy.decode('utf-8')
               real_proxy = 'http://' + proxy
               print('正在測試', proxy)
               async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
                   if response.status in VALID_STATUS_CODES:
                       self.redis.max(proxy)
                       print('代理可用', proxy)
                   else:
                       self.redis.decrease(proxy)
                       print('請求響應碼不合法', proxy)
           except (ClientError, ClientConnectorError, TimeoutError, AttributeError):
               self.redis.decrease(proxy)
               print('代理請求失敗', proxy)

   def run(self):
       """
       測試主函數
       :return: None
       """

       print('測試器開始運行')
       try:
           proxies = self.redis.all()
           loop = asyncio.get_event_loop()
           # 批量測試
           for i in range(0, len(proxies), BATCH_TEST_SIZE):
               test_proxies = proxies[i:i + BATCH_TEST_SIZE]
               tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
               loop.run_until_complete(asyncio.wait(tasks))
               time.sleep(5)
       except Exception as e:
           print('測試器發生錯誤', e.args)

這裏定義了一個類Tester__init__()方法中創建了一個RedisClient對象,供該對象中其餘方法使用。接下來定義了一個test_single_proxy()方法,這個方法用來檢測單個代理的可用狀況,其參數就是被檢測的代理。注意,test_single_proxy()方法前面加了async關鍵詞,這表明這個方法是異步的。方法內部首先建立了aiohttp的ClientSession對象,此對象相似於requests的Session對象,能夠直接調用該對象的get()方法來訪問頁面。在這裏,代理的設置是經過proxy參數傳遞給get()方法,請求方法前面也須要加上async關鍵詞來標明其是異步請求,這也是aiohttp使用時的常見寫法。

測試的連接在這裏定義爲常量TEST_URL。若是針對某個網站有抓取需求,建議將TEST_URL設置爲目標網站的地址,由於在抓取的過程當中,代理自己多是可用的,可是該代理的IP已經被目標網站封掉了。例如,某些代理能夠正常訪問百度等頁面,可是對知乎來講可能就被封了,因此咱們能夠將TEST_URL設置爲知乎的某個頁面的連接,當請求失敗、代理被封時,分數天然會減下來,失效的代理就不會被取到了。

若是想作一個通用的代理池,則不須要專門設置TEST_URL,能夠將其設置爲一個不會封IP的網站,也能夠設置爲百度這類響應穩定的網站。

咱們還定義了VALID_STATUS_CODES變量,這個變量是一個列表形式,包含了正常的狀態碼,如能夠定義成[200]。固然某些目標網站可能會出現其餘的狀態碼,能夠自行配置。

程序在獲取Response後須要判斷響應的狀態,若是狀態碼在VALID_STATUS_CODES列表裏,則表明代理可用,能夠調用RedisClientmax()方法將代理分數設爲100,不然調用decrease()方法將代理分數減1,若是出現異常也一樣將代理分數減1。

另外,咱們設置了批量測試的最大值BATCH_TEST_SIZE爲100,也就是一批測試最多100個,這能夠避免代理池過大時一次性測試所有代理致使內存開銷過大的問題。

隨後,在run()方法裏面獲取了全部的代理列表,使用aiohttp分配任務,啓動運行,這樣就能夠進行異步檢測了。可參考aiohttp的官方示例:http://aiohttp.readthedocs.io/。

這樣,測試模塊的邏輯就完成了。

4. 接口模塊

經過上述三個模塊,咱們已經能夠作到代理的獲取、檢測和更新,數據庫就會以有序集合的形式存儲各個代理及其對應的分數,分數100表明可用,分數越小表明越不可用。

可是咱們怎樣方便地獲取可用代理呢?能夠用RedisClient類直接鏈接Redis,而後調用random()方法。這樣作沒問題,效率很高,可是會有幾個弊端。

  • 若是其餘人使用這個代理池,他須要知道Redis鏈接的用戶名和密碼信息,這樣很不安全。

  • 若是代理池須要部署在遠程服務器上運行,而遠程服務器的Redis只容許本地鏈接,那麼咱們就不能遠程直連Redis來獲取代理。

  • 若是爬蟲所在的主機沒有鏈接Redis模塊,或者爬蟲不是由Python語言編寫的,那麼咱們就沒法使用RedisClient來獲取代理。

  • 若是RedisClient類或者數據庫結構有更新,那麼爬蟲端必須同步這些更新,這樣很是麻煩。

綜上考慮,爲了使代理池能夠做爲一個獨立服務運行,咱們最好增長一個接口模塊,並以Web API的形式暴露可用代理。

這樣一來,獲取代理只須要請求接口便可,以上的幾個缺點弊端也能夠避免。

咱們使用一個比較輕量級的庫Flask來實現這個接口模塊,實現示例以下所示:

from flask import Flask, g
from db import RedisClient

__all__ = ['app']
app = Flask(__name__)

def get_conn():
   if not hasattr(g, 'redis'):
       g.redis = RedisClient()
   return g.redis

@app.route('/')
def index():
   return '<h2>Welcome to Proxy Pool System</h2>'

@app.route('/random')
def get_proxy():
   """
   獲取隨機可用代理
   :return: 隨機代理
   """

   conn = get_conn()
   return conn.random()

@app.route('/count')
def get_counts():
   """
   獲取代理池總量
   :return: 代理池總量
   """

   conn = get_conn()
   return str(conn.count())

if __name__ == '__main__':
   app.run()

在這裏,咱們聲明瞭一個Flask對象,定義了三個接口,分別是首頁、隨機代理頁、獲取數量頁。

運行以後,Flask會啓動一個Web服務,咱們只須要訪問對應的接口便可獲取到可用代理。

5. 調度模塊

調度模塊就是調用以上所定義的三個模塊,將這三個模塊經過多進程的形式運行起來,示例以下所示:

TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester

class Scheduler():
   def schedule_tester(self, cycle=TESTER_CYCLE):
       """
       定時測試代理
       """

       tester = Tester()
       while True:
           print('測試器開始運行')
           tester.run()
           time.sleep(cycle)

   def schedule_getter(self, cycle=GETTER_CYCLE):
       """
       定時獲取代理
       """

       getter = Getter()
       while True:
           print('開始抓取代理')
           getter.run()
           time.sleep(cycle)

   def schedule_api(self):
       """
       開啓API
       """

       app.run(API_HOST, API_PORT)

   def run(self):
       print('代理池開始運行')
       if TESTER_ENABLED:
           tester_process = Process(target=self.schedule_tester)
           tester_process.start()

       if GETTER_ENABLED:
           getter_process = Process(target=self.schedule_getter)
           getter_process.start()

       if API_ENABLED:
           api_process = Process(target=self.schedule_api)
           api_process.start()

三個常量TESTER_ENABLEDGETTER_ENABLEDAPI_ENABLED都是布爾類型,表示測試模塊、獲取模塊、接口模塊的開關,若是都爲True,則表明模塊開啓。

啓動入口是run()方法,這個方法分別判斷三個模塊的開關。若是開關開啓,啓動時程序就新建一個Process進程,設置好啓動目標,而後調用start()方法運行,這樣三個進程就能夠並行執行,互不干擾。

三個調度方法結構也很是清晰。好比,schedule_tester()方法用來調度測試模塊,首先聲明一個Tester對象,而後進入死循環不斷循環調用其run()方法,執行完一輪以後就休眠一段時間,休眠結束以後從新再執行。在這裏,休眠時間也定義爲一個常量,如20秒,即每隔20秒進行一次代理檢測。

最後,只須要調用Scheduler的run()方法便可啓動整個代理池。

以上內容即是整個代理池的架構和相應實現邏輯。

5、運行

接下來,咱們將代碼整合一下,將代理運行起來,運行以後的輸出結果以下圖所示。

圖片

以上是代理池的控制檯輸出,能夠看到,可用代理設置爲100,不可用代理分數減1。

咱們再打開瀏覽器,當前配置了運行在5555端口,因此打開http://127.0.0.1:5555,便可看到其首頁,以下圖所示。

圖片

再訪問:http://127.0.0.1:5555/random,便可獲取隨機可用代理,以下圖所示。

圖片

咱們只須要訪問此接口便可獲取一個隨機可用代理,這很是方便。

獲取代理的代碼以下所示:

import requests

PROXY_POOL_URL = 'http://localhost:5555/random'

def get_proxy():
   try:
       response = requests.get(PROXY_POOL_URL)
       if response.status_code == 200:
           return response.text
   except ConnectionError:
       return None

以後即是一個字符串類型的代理,此代理能夠按照上一節所示的方法設置,如requests的使用方法以下所示:

import requests

proxy = get_proxy()
proxies = {
   'http': 'http://' + proxy,
   'https': 'https://' + proxy,
}
try:
   response = requests.get('http://httpbin.org/get', proxies=proxies)
   print(response.text)
except requests.exceptions.ConnectionError as e:
   print('Error', e.args)

有了代理池以後,咱們再取出代理便可有效防止IP被封禁的狀況。

6、本節代碼

本節代碼地址爲:https://github.com/Python3WebSpider/ProxyPool。

7、結語

本節實現了一個比較高效的代理池,來獲取隨機可用的代理。接下來,咱們會利用代理池來實現數據的抓取。

相關文章
相關標籤/搜索