代理池——代理採集,測試,保存和接口使用

代理池的設置主要有四部

  1. 獲取代理
  2. 代理測試
  3. 數據存儲
  4. API接口

 

1.1先設置須要獲取的代理的網站和解析規則

config.pyhtml

# 全部網站得解析方式
 parse_list = [ { 'urls': ['http://www.66ip.cn/{}.html'.format(n) for n in range(1, 10)], # 66代理 'pattern': '//div[@id="main"]//div/div/table//tr[position()>1]', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[3]'}, }, { 'urls':['https://www.kuaidaili.com/free/inha/{}/'.format(n) for n in range(1, 10)], # 快代理 'pattern': '//div[@id="list"]/table/tbody/tr', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[5]'}, }, { 'urls':['http://www.xicidaili.com/nn/{}'.format(n) for n in range(1, 5)], # 西刺代理 'pattern': '//table[@id="ip_list"]/tr[position()>1]', 'position': {'ip': './td[2]', 'port': './td[3]', 'address': './td[4]/a/'} } ] DEFAULT_HEADERS = { 'User-Agent': 'ozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', }

 

1.2 設置數據解析的方法sql

Parser.py數據庫

# 解析方法
import re from lxml import etree class Parser_response(object): # 對傳遞過來的html代碼進行數據解析
 @staticmethod def parser_xpath(response, parser): '''根據parser規則對response進行解析''' etre = etree.HTML(response) items = etre.xpath(parser['pattern']) proxy_list = [] for item in items: try: # 解析數據
                ip = item.xpath(parser['position']['ip'])[0].text port = item.xpath(parser['position']['port'])[0].text address = item.xpath(parser['position']['address'])[0].text except Exception: continue
            else: proxy = {'ip': ip, 'port': port, 'address': address} # 設置成字典格式
                proxy_list.append(proxy) # 加入到列表中返回

        return proxy_list @staticmethod def parser_re(): pass

1.3 主程序中調用他們proxy_pool.pyflask

 
 
from Parser import Parser_response # 解析過程


'''
代理url的請求測試''' class Download_HTML(object): @staticmethod def download(url): try: print('正在請求{}.....'.format(url)) r = requests.get(url, headers=DEFAULT_HEADERS) # 對代理的網站url進行請求 r.encoding = 'utf-8' # 設置編碼格式 if not r.ok: # 請求不成功 raise ConnectionError else: return r.text # 成功返回網絡代碼 except Exception as e: print('請求失敗,異常: ', e) ....... def crawl(self, parser): '''對配置文件中的網站進行解析''' Download = Download_HTML() # 實例化代理網站 for url in parser['urls']: # 取出url列表,多頁 time.sleep(2) # 延時兩秒 response = Download.download(url) # 調用方法獲取網頁的html代碼 if not response: return None # 將html代碼和代碼解析規則傳給相應類的方法,獲取返回的代理信息列表(ip,port, address) proxy_list = Parser_response.parser_xpath(response, parser) for proxy in proxy_list: # 遍歷 while True: if self.queue1.full(): # 若是隊列滿了的話 time.sleep(1) else: self.queue1.put(proxy) # 沒滿就加入到隊列queue1中 break

 

2代理的檢測

def detect(queue1, queue2): #queue1是全部的代理數據,queue2是可用的代理的數據 '''對queue1中的代理進行檢測''' proxy_list = [] while True: if not queue1.empty(): # 若是隊列不爲空
            proxy = queue1.get() # 從隊列取出數據
            proxy_list.append(proxy) # 再將數據添加到列表中

            if len(proxy_list) > 9: # 若是列表中得數量到10時
                spanws = [] for proxy in proxy_list: # 遍歷列表
                    spanws.append(gevent.spawn(detect_baidu, proxy, queue2)) # 設置協程,傳入方法和變量(代理和一個表明可用隊列的隊列)
 gevent.joinall(spanws) # 開啓協程
                proxy_list = [] # 將列表清空

'''代理測試放入可用隊列'''
def detect_baidu(proxy, queue2=None): '''請求測試''' proxies = { 'http': 'http://{}:{}'.format(proxy['ip'], proxy['port']), 'https': 'https://{}:{}'.format(proxy['ip'], proxy['port']), } # 完善代理的數據
    result = None # 設置初始標誌None
    try: print(proxies) # 用代理進行網絡請求以測試代理是否可用
        result = requests.get(url='https://www.baidu.com', headers=DEFAULT_HEADERS, proxies=proxies, timeout=5) except Exception: print('代理不可用,已丟棄....') '''detect_baidu方法被調用了兩次,主要看調用時是否傳入了queue2這個參數, 若是加了queue2這個參數,表示調用判斷代理可用性決定加入到可用隊列queue2中; 沒有帶queue2這個參數的話,表示可用代理回測,返回的標誌決定了queue2中的代理是否刪除 '''
    # 判斷加入到可用隊列與否(主要看是否傳入了queue2)
    if result and queue2: print('代理{}可用,已添加到可用隊列....'.format(proxy)) queue2.put(proxy) # 判斷加入到數據庫中與否的前提(主要看是否傳入了queue2)
    elif result: return True

 

3 數據的存儲

3.1 數據庫的方法設置網絡

Mongo_DB.py併發

import pymongo class MongoHelper(object): def __init__(self): self.client = pymongo.MongoClient(host='127.0.0.1', port=27017) self.db = self.client['tanzhou_homework'] self.proxy = self.db['proxy'] def insert(self, data=None): #
        if data: self.proxy.save(data) def delete(self, data=None): #
        if data: self.proxy.remove(data) def update(self, data, conditions): #
        if data and conditions: self.proxy.update(data, {'$set': conditions}) def select(self, data=None): #
        if data: items = self.proxy.find(data) else: data={} items = self.proxy.find(data) results = [] for item in items: result = (item['ip'], item['port'], item['address']) results.append(result) return results if __name__ == '__main__': mongo = MongoHelper() 

 

 

3.2 主程序中調用proxy_pool.pyapp

from Mongo_DB import MongoHelper # 數據庫操做
 mongo = MongoHelper() def insert_sql(queue2): '''讀取queue2裏面得數據再添加到數據庫'''
    while True: proxy = queue2.get() if proxy: mongo.insert(proxy) # print('代理{}已經成功添加到數據庫'.format(proxy))

 

4 API接口

Service.pydom

from random import choice from flask import Flask from Mongo_DB import MongoHelper mongo = MongoHelper() app = Flask(__name__) @app.route('/') # 路由根目錄r
def index(): result = mongo.select() # 獲取IP列表
    text = choice(result) # 隨機選擇一個出來
    return '{}:{}'.format(text[0], text[1]) # 返回

def start_service(): app.run(debug=True, host='0.0.0.0', port=9999) if __name__ == '__main__': start_service() 

主程序中ide

from Service import start_service # 接口配置


if __name__ == '__main__': p4 = Process(target=start_service) # 接口

 

 


 

 

完整代碼以下:

config.py高併發

# 全部網站得解析方式
 parse_list = [ { 'urls': ['http://www.66ip.cn/{}.html'.format(n) for n in range(1, 10)], 'pattern': '//div[@id="main"]//div/div/table//tr[position()>1]', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[3]'}, }, { 'urls':['https://www.kuaidaili.com/free/inha/{}/'.format(n) for n in range(1, 10)], 'pattern': '//div[@id="list"]/table/tbody/tr', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[5]'}, }, { 'urls':['http://www.xicidaili.com/nn/{}'.format(n) for n in range(1, 5)], 'pattern': '//table[@id="ip_list"]/tr[position()>1]', 'position': {'ip': './td[2]', 'port': './td[3]', 'address': './td[4]/a/'} } ] DEFAULT_HEADERS = { 'User-Agent': 'ozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', }

Mongo_DB.py

import pymongo class MongoHelper(object): def __init__(self): self.client = pymongo.MongoClient(host='127.0.0.1', port=27017) self.db = self.client['tanzhou_homework'] self.proxy = self.db['proxy'] def insert(self, data=None): #
        if data: self.proxy.save(data) def delete(self, data=None): #
        if data: self.proxy.remove(data) def update(self, data, conditions): #
        if data and conditions: self.proxy.update(data, {'$set': conditions}) def select(self, data=None): #
        if data: items = self.proxy.find(data) else: data={} items = self.proxy.find(data) results = [] for item in items: result = (item['ip'], item['port'], item['address']) results.append(result) return results if __name__ == '__main__': mongo = MongoHelper() 

Parser.py

# 解析方法
import re from lxml import etree class Parser_response(object): # 對傳遞過來的html代碼進行數據解析
 @staticmethod def parser_xpath(response, parser): '''根據parser規則對response進行解析''' etre = etree.HTML(response) items = etre.xpath(parser['pattern']) proxy_list = [] for item in items: try: # 解析數據
                ip = item.xpath(parser['position']['ip'])[0].text port = item.xpath(parser['position']['port'])[0].text address = item.xpath(parser['position']['address'])[0].text except Exception: continue
            else: proxy = {'ip': ip, 'port': port, 'address': address} # 設置成字典格式
                proxy_list.append(proxy) # 加入到列表中返回

        return proxy_list @staticmethod def parser_re(): pass

Service.py

from random import choice from flask import Flask from Mongo_DB import MongoHelper mongo = MongoHelper() app = Flask(__name__) @app.route('/') # 路由根目錄r
def index(): result = mongo.select() # 獲取IP列表
    text = choice(result) # 隨機選擇一個出來
    return '{}:{}'.format(text[0], text[1]) # 返回

def start_service(): app.run(debug=True, host='0.0.0.0', port=9999) if __name__ == '__main__': start_service() 

proxy_pool.py

import time import requests from multiprocessing import Process, Queue import gevent from gevent import monkey;monkey.patch_all() from config import parse_list, DEFAULT_HEADERS # 代理網站及相應解析規則,請求頭
from Parser import Parser_response  # 解析過程
from Mongo_DB import MongoHelper # 數據庫操做
from Service import start_service # 接口配置
 mongo = MongoHelper() '''代理url的請求測試'''
class Download_HTML(object): @staticmethod def download(url): try: print('正在請求{}.....'.format(url)) r = requests.get(url, headers=DEFAULT_HEADERS) # 對代理的網站url進行請求
            r.encoding = 'utf-8' # 設置編碼格式
            if not r.ok: # 請求不成功
                raise ConnectionError else: return r.text # 成功返回網絡代碼
        except Exception as e: print('請求失敗,異常: ', e) '''爬取代理數據並添加到隊列中'''
class Proxy_crawl(object): porxies_set = set() # 空集合,查重用

    def __init__(self, queue1): self.queue1 = queue1 def open_spider(self): while True: proxy_list = mongo.select() # 獲取數據庫中得全部代理
            spawns = [] for proxy in proxy_list: # 協程,再進行可用隊列中回測代理可用性
 spawns.append(gevent.spawn(detece_from_db,proxy,self.porxies_set)) gevent.joinall(spawns) if len(self.porxies_set) < 5: # 當集合數量小於5的時候
                print('當前可用代理數量爲{},開始獲取代理...'.format(len(self.porxies_set))) for parser in parse_list: self.crawl(parser) # 繼續爬取
            else: print('當前可用代理數量大於閾值....開始休眠') time.sleep(300) def crawl(self, parser): '''對配置文件中的網站進行解析''' Download = Download_HTML() # 實例化代理網站
        for url in parser['urls']: # 取出url列表,多頁
            time.sleep(2) # 延時兩秒
            response = Download.download(url) # 調用方法獲取網頁的html代碼
            if not response: return None # 將html代碼和代碼解析規則傳給相應類的方法,獲取返回的代理信息列表(ip,port, address)
            proxy_list = Parser_response.parser_xpath(response, parser) for proxy in proxy_list: # 遍歷
                while True: if self.queue1.full(): # 若是隊列滿了的話
                        time.sleep(1) else: self.queue1.put(proxy) # 沒滿就加入到隊列queue1中
                        break

def detece_from_db(proxy, porxies_set): proxy = {'ip':proxy[0], 'port':proxy[1]} result = detect_baidu(proxy) # 對數據庫中的代理進行回測
    if not result: mongo.delete(proxy) # 不可用就刪除
        return None porxies_set.add('{}:{}'.format(proxy['ip'], proxy['port'])) # 可用就添加到set集合中去

'''設置協程實現高併發'''
def detect(queue1, queue2): '''對queue1中的代理進行檢測''' proxy_list = [] while True: if not queue1.empty(): # 若是隊列不爲空
            proxy = queue1.get() # 從隊列取出數據
            proxy_list.append(proxy) # 再將數據添加到列表中

            if len(proxy_list) > 9: # 若是列表中得數量到10時
                spanws = [] for proxy in proxy_list: # 遍歷列表
                    spanws.append(gevent.spawn(detect_baidu, proxy, queue2)) # 設置協程,傳入方法和變量(代理和一個表明可用隊列的隊列)
 gevent.joinall(spanws) # 開啓協程
                proxy_list = [] # 將列表清空

'''代理測試放入可用隊列'''
def detect_baidu(proxy, queue2=None): '''請求測試''' proxies = { 'http': 'http://{}:{}'.format(proxy['ip'], proxy['port']), 'https': 'https://{}:{}'.format(proxy['ip'], proxy['port']), } # 完善代理的數據
    result = None # 設置初始標誌None
    try: print(proxies) # 用代理進行網絡請求以測試代理是否可用
        result = requests.get(url='https://www.baidu.com', headers=DEFAULT_HEADERS, proxies=proxies, timeout=5) except Exception: print('代理不可用,已丟棄....') '''detect_baidu方法被調用了兩次,主要看調用時是否傳入了queue2這個參數, 若是加了queue2這個參數,表示調用判斷代理可用性決定加入到可用隊列queue2中; 沒有帶queue2這個參數的話,表示可用代理回測,返回的標誌決定了queue2中的代理是否刪除 '''
    # 判斷加入到可用隊列與否(主要看是否傳入了queue2)
    if result and queue2: print('代理{}可用,已添加到可用隊列....'.format(proxy)) queue2.put(proxy) # 判斷加入到數據庫中與否的前提(主要看是否傳入了queue2)
    elif result: return True '''爬取代理數據的前提調用'''
def start_crawl(queue1): p = Proxy_crawl(queue1) # 類的實例化
 p.open_spider() def insert_sql(queue2): '''讀取queue2裏面得數據再添加到數據庫'''
    while True: proxy = queue2.get() if proxy: mongo.insert(proxy) # print('代理{}已經成功添加到數據庫'.format(proxy))


if __name__ == '__main__': q1 = Queue() # 全部代理的隊列
    q2 = Queue() # 可用代理的隊列
    p1 = Process(target=detect, args=(q1, q2)) # 代理檢測
    p2 = Process(target=start_crawl, args=(q1, )) # 下載代理
    p3 = Process(target=insert_sql, args=(q2, )) # 存入數據
    p4 = Process(target=start_service) # 接口
 p2.start() # 開啓進程
 p1.start() p3.start() p4.start() 
相關文章
相關標籤/搜索