多進程+協程方案處理高IO密集,提高爬取效率

# coding=utf-8
import gevent
from gevent import monkey
# monkey.patch_all()

gevent.monkey.patch_all(thread=False, socket=False, select=False)
# 協程gevent庫和多進程,進程池衝突,須要關閉thread
# 如不關閉, 代碼會卡至建立進程池處.

import requests

import time
# import sys
from requests.adapters import HTTPAdapter
from bs4 import BeautifulSoup
# import multiprocessing
from multiprocessing import Pool

# sys.setrecursionlimit(10000)
rs = requests.Session()
rs.mount('http://', HTTPAdapter(max_retries=30))
rs.mount('https://', HTTPAdapter(max_retries=30))
# 設置最高重連次數


# import threading

# 測試以後決定放棄多線程使用
# 在爬取數據上.
# 相比較多進程下多線程
# 多進程下協程更具備性能優點.
# class MyThread(threading.Thread):
#     """重寫多線程,使其可以返回值"""
#     def __init__(self, target=None, args=()):
#         super(MyThread, self).__init__()
#         self.func = target
#         self.args = args
#
#     def run(self):
#         self.result = self.func(*self.args)
#
#     def get_result(self):
#         try:
#             return self.result  # 若是子線程不使用join方法,此處可能會報沒有self.result的錯誤
#         except Exception:
#             return None


# lock = threading.Lock()


# 獲取小說內容
def extraction_chapter(id, chapter_url, threads_content):
    """獲取小說內容"""

    res = rs.get(chapter_url, timeout=(5, 7))
    # print(result)
    # res.encoding = "gbk"
    # print (res)
    soup = BeautifulSoup(res.text, 'lxml')
    # print(soup)
    # title = soup.select('div.txtbox > h1')[].text

    title = soup.select('#txtbox > h1')
    content = soup.select('#content')
    # con = title + content
    title_str = str(title[0])
    content_str = str(content[0])

    # print(content_str)

    title_re = title_str.replace('<h1>', '')
    title_re = title_re.replace('</h1>', '\n')
    content_re = content_str.replace('<div id="content">', '')
    content_re = content_re.replace('<p>', '\n\t')
    content_re = content_re.replace('</p>', '')
    content_re = content_re.replace('</div>', '')

    make_sign = "\n\n\t_____(ฅ>ω<*ฅ)喵嗚~~~_____\n\n\n"  # 小mark

    con = title_re + content_re + make_sign

    threads_content[id] = con
    # 此處經過字典輸入內容


# 獲取小說每章網址(已分進程)
def extraction(novel_url, ):
    # print("+")

    res = rs.get(novel_url, timeout=(3, 5))
    # 輸入小說總頁面

    # 獲取元素
    soup = BeautifulSoup(res.text, 'lxml')
    start_time = time.time()

    # 尋找書名
    novel_title = soup.select('#bookinfo-right>h1')
    novel_title = str(novel_title[0])
    novel_title = novel_title.replace('<h1>', '')
    novel_title = novel_title.replace('</h1>', '')
    print("開始:  >>>"+novel_title+"<<<  ")

    chapter_all = soup.select('#list>ul>li>a')
    # 獲取章節所在元素,a標籤

    # chapter = str(chapter[0].attrs["href"])
    # 獲取a標籤href屬性
    # print(type(chapter_all))
    file_name = novel_title + '.txt'

    with open(file_name, 'w', encoding='utf-8') as f:
        f.write('')

    # content_con = ""
    id = 0
    g_list = []
    threads_content = {}
    # 遍歷拼接每章網址
    for chapter in chapter_all:
        chapter = str(chapter.attrs["href"])
        # 獲取a標籤href屬性

        chapter_url = novel_url + chapter
        # 完成拼接
        # print("協程建立+")
        # charpter_con = extraction_chapter(chapter_url)
        # 調用子方法, 萃取每章內容.
        # 使用協程提升效率
        # charpter_con = gevent.spawn(extraction_chapter, chapter_url)
        # charpter_con.join()
        g = gevent.spawn(extraction_chapter, id, chapter_url, threads_content)
        id += 1
        g_list.append(g)

    # 等待全部協程任務完成
    gevent.joinall(g_list)

    # 遍歷全部線程,等待全部線程都完成任務
    # for t in threads:
    #     t.join()
    # print(content_con)

    # 遍歷線程字典, 導入內容
    # i = 0
    # value = ""
    # while i <= len(threads_content):
    #     value = value + threads_content[i]
    #     i += 1

    # con_content = ""

    threads_content_key = sorted(threads_content.keys())

    # 字典排序, 按照key值從小到大排列

    for i in threads_content_key:
        # lock.acquire()
        with open(file_name, 'a', encoding='utf-8') as f:
            f.write(threads_content[i])
        # lock.release()
        # con_content += threads_content[i]
        # 存儲爲字符串, 遍歷完以後一次寫入.[測試時間204]


    # threads_content.clear()
    # with open(file_name, 'a', encoding='utf-8') as f:
    #     f.write(con_content)
    #
    # del con_content
    # 清除

    end_time = time.time()
    elapsed = str( float('%.2f' % (end_time - start_time)) )

    with open('console.log', 'a', encoding='utf-8') as f:
        f.write("Spend:["+ elapsed + "s]\t\t<<"+novel_title+">>\n")

    print("Spend:["+ elapsed + "s]\t\t<<"+novel_title+">>")

# 完本頁面網址
def end_book(end_url):
    res = rs.get(end_url, timeout=(3, 5))  # 鏈接超時和讀取超時時間設置
    # 輸入小說總頁面

    # 獲取元素
    soup = BeautifulSoup(res.text, 'lxml')

    # 尋找書籍a元素
    novel_name = soup.select('.bookimg>a')

    # print("準備建立進程")
    # 定義進程池, 默認爲cpu核數

    # print("建立進程池")
    # 默認進程數量爲核心數量
    po = Pool(8)
    # 使用八進程
    # 使用協程後能效獲得控制, 可根據總爬取數量進行更改.

# ><><><測試><><><><
# 處理器:i5,3230M 四核, 內存8G
# 爬取內容爲同頁,21本,每本約300章,30.9MB. 網絡有浮動, 如下測試數據僅能做爲參考

# >>效率對比<<
# 4進程-協程,91s,118s,125s,131s,109s,100s     <112.3>   四核CPU佔用均約: 32%  內存最高佔用:71.5%
# 8進程-協程,74s,91s,89s,86s,89s,67s,80s,65s  <80.12>   四核CPU佔用均約: 45%  內存最高佔用:77.9%
# 12進程-協程,89s,96s,73s,81s,82s,78s,74s,69s <80.25>   四核CPU佔用均約: 67%  內存最高佔用:85.7%

# <根據本數決定進程數>
# 21進程-協程,82s,96s,89s,90s,85s             <88.4>    四核CPU佔用均約: 72%  內存最高佔用:93.7%

# <<>><><><>
    """
    總結:
    
    計算密集型項目, 就只需使用多進程(核心數),可以達到最大效率,可跑滿每顆核心.(核心數+1)可避免由於內存頁缺失致使的計算資源浪費,可能形成一拖多現象,應根據具體狀況調整.
    I/O 密集型項目, 則使用多進程,加線程或協程.(大部分爬蟲項目,協程比多線程更有效率.)
    
    
    在I/O密集型任務當中,多進程+協程的解決方案,應該適當變更進程數量.
    
        決定因素有:
        
            1.硬件性能.
                CPU:    CPU還沒有跑滿,則尚有提高空間,可適當增長進程(N*核心數,N<=3). 
                內存:    一旦寫滿未能及時釋放進程佔用,則崩潰, 應減小進程.
                    (硬盤寫入門檻在小項目中很難觸碰. 尤爲是爬蟲類,在使用協程時可不考慮)
            2.網絡.
                自身帶寬: 爬蟲項目中, 帶寬上限應爲最終門檻.獲取數據達到帶寬上限, 代碼可沒必要再進行優化.  遺憾的是此項目中, 抓取效率最高爲800+Kb/s,遠遠未達到目標.
                網頁載入: 爬蟲項目中最重要的限制, 頁面的載入速度越快,獲取數據越快,則進程應越少. 頁面載入越慢, 則進程應越多才可提高效率,減小一拖多成本.
            3.項目總量.
                項目體量過大的時候, 應當仔細計算I/O時間與計算時間
                    公式應爲:  (IO時間+計算時間)/(計算時間+進程數*調度消耗)
                    ***** 此公式另貼細表 *****
                    
                項目體量不大的時候, 就根據具體的項目數量決定進程數
                    此項目中, 由於分頁, 每頁的21本書進行多進程操做.因此進行了一下這種很是規測試.
                    雖然此處效率並非很理想, 可是這種因地制宜進程數一定有可取之處.
        
    """

    # print("準備建立進程+")

    for name in novel_name:
        # 獲取每一個元素的網址
        # print("進程建立")
        novel_url = name.attrs["href"]

        # print(novel_url)
        # extraction(novel_url)
        # 把 網址傳入方法.

        # 進程池方式進行,把進程放入進程池
        # p = multiprocessing.Process(target=extraction, args=(novel_url,))
        po.apply_async(extraction, (novel_url,))
        # p.start()
        # p_list.append(p)

    po.close()
    po.join()
    # 爲避免抓取中斷, 進程池設置, 本頁數據抓取完畢以後再抓取下一頁. 犧牲了一些性能, 可酌情更改


def book(index_url, start, end):
    num = start

    while num <= end:

        start_time = time.time()

        index = '/index_' + str(num) + '.html'

        if num == 1:
            index = "/"

        # 全本書索引頁面
        index_con = index_url + index

        print(index_con)  # 輸出網址

        # 調用全本方法, 並傳入參數
        end_book(index_con)

        end_time = time.time()
        # 傳入耗時參數
        elapsed = str(float('%.2f' % (end_time - start_time)))

        localtime_end = time.asctime(time.localtime(time.time()))

        with open('console.log', 'a', encoding='utf-8') as f:
            f.write(
                '\n' + '*' * 50 + '\n'+ index +"\t"+ '消耗時間=\t' + elapsed + "\n" + localtime_end + "\n"+ '*' * 50+'\n\n')

        num += 1


if __name__ == '__main__':
    # 輸入網址
    
    url = "https://www.xxxxx.com/quanben"  # 此處輸入小說總網址
    page_start = 1  # 開始頁數
    page_end = 96  # 結束頁數

    # 開始時間
    start_time = time.time()

    localtime = time.asctime(time.localtime(time.time()))
    with open('console.log', 'w', encoding='utf-8') as f:
        f.write('<=====Start=====>\n\n' + localtime + '\n\n'+'-'*50+'\n\n')

    book(url, page_start, page_end)

    # 結束時間
    end_time = time.time()

    # 耗時
    elapsed = str( float('%.2f' % (end_time - start_time)) )


    localtime_end = time.asctime(time.localtime(time.time()))
    with open('console.log', 'a', encoding='utf-8') as f:
        f.write('\n'+'-'*50+'\n'+'消耗時間=====' + elapsed + "\t\t"  + "\n\n"+ localtime_end+"\n\n<=====Start=====>")

    print('消耗時間:' + elapsed)

 

 

    總結:
    
    計算密集型項目, 就只需使用多進程(核心數),可以達到最大效率,可跑滿每顆核心.(核心數+1)可避免由於內存頁缺失致使的計算資源浪費,可能形成一拖多現象,應根據具體狀況調整.
    I/O 密集型項目, 則使用多進程,加線程或協程.(大部分爬蟲項目,協程比多線程更有效率.)
    
    
    在I/O密集型任務當中,多進程+協程的解決方案,應該適當變更進程數量.
    
        決定因素有:
        
            1.硬件性能.
                CPU:    CPU還沒有跑滿,則尚有提高空間,可適當增長進程(N*核心數,N<=3). 
                內存:    一旦寫滿未能及時釋放進程佔用,則崩潰, 應減小進程.
                    (硬盤寫入門檻在小項目中很難觸碰. 尤爲是爬蟲類,在使用協程時可不考慮)
            2.網絡.
                自身帶寬: 爬蟲項目中, 帶寬上限應爲最終門檻.獲取數據達到帶寬上限, 代碼可沒必要再進行優化.  遺憾的是此項目中, 抓取效率最高爲800+Kb/s,遠遠未達到目標.
                網頁載入: 爬蟲項目中最重要的限制, 頁面的載入速度越快,獲取數據越快,則進程應越少. 頁面載入越慢, 則進程應越多才可提高效率,減小一拖多成本.
            3.項目總量.
                項目體量過大的時候, 應當仔細計算I/O時間與計算時間
                    公式應爲:  (IO時間+計算時間)/(計算時間+進程數*調度消耗)
                    ***** 此公式另貼細表 *****
                    
                項目體量不大的時候, 就根據具體的項目數量決定進程數
                    此項目中, 由於分頁, 每頁的21本書進行多進程操做.因此進行了一下這種很是規測試.
                    雖然此處效率並非很理想, 可是這種因地制宜進程數一定有可取之處.




爲閨中密友系列加了個書目
 
相關文章
相關標籤/搜索