Python網絡爬蟲3 - 生產者消費者模型爬取某金融網站數據

博客首發於www.litreily.tophtml

應一位金融圈的朋友所託,幫忙寫個爬蟲,幫他爬取中國期貨行業協議網站中全部金融機構的從業人員信息。網站數據的獲取自己比較簡單,可是爲了學習一些新的爬蟲方法和技巧,即本文要講述的生產者消費者模型,我又學習了一下Python中隊列庫queue及線程庫Thread的使用方法。python

生產者消費者模型

生產者消費者模型很是簡單,相信大部分程序員都知道,就是一方做爲生產者不斷提供資源,另外一方做爲消費者不斷消費資源。簡單點說,就比如餐館的廚師和顧客,廚師做爲生產者不斷製做美味的食物,而顧客做爲消費者不斷食用廚師提供的食物。此外,生產者與消費者之間能夠是一對1、一對多、多對一和多對多的關係。git

那麼這個模型和爬蟲有什麼關係呢?其實,爬蟲能夠認爲是一個生產者,它不斷從網站爬取數據,爬取到的數據就是食物;而所得數據須要消費者進行數據清洗,把有用的數據吸取掉,把無用的數據丟棄。程序員

在實踐過程當中,爬蟲爬取和數據清洗分別對應一個Thread,兩個線程之間經過順序隊列queue傳遞數據,數據傳遞過程就比如餐館服務員從廚房把食物送到顧客餐桌上的過程。爬取線程負責爬取網站數據,並將原始數據存入隊列,清洗線程從隊列中按入隊順序讀取原始數據並提取出有效數據。github

以上即是對生產者消費者模型的簡單介紹了,下面針對本次爬取任務予以詳細說明。web

分析站點

http://www.cfachina.org/cfainfo/organbaseinfoServlet?all=personinfojson

home page

咱們要爬取的數據是主頁顯示的表格中全部期貨公司的從業人員信息,每一個公司對應一個機構編號(G01001~G01198)。從上圖能夠看到有主頁有分頁,共8頁。以G01001方正中期期貨公司爲例,點擊該公司名稱跳轉至對應網頁以下:數組

personinfo

從網址及網頁內容能夠提取出如下信息:app

  1. 網址
  • http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+&currentPage=1&pageSize=20&selectType=personinfo
    • organid: 機構編號,+G01001+ ~ +G01198+
    • currentPage: 該機構從業人員信息當前頁面編號
    • pageSize: 每一個頁面顯示的人員個數,默認20
    • selectType: 固定爲personinfo
  1. 機構名稱mechanism_name,在每頁表格上方能夠看到當前機構名稱
  2. 從業人員信息,即每頁的表格內容,也是咱們要爬取的對象
  3. 該機構從業人員信息總頁數page_cnt

咱們最終爬取的數據能夠按機構名稱存儲到對應的txt文件或excel文件中。ide

獲取機構名稱

get mechanism name

獲取到某機構的任意從業信息頁面後,使用BeautifulSoup可快速提取機構名稱。

mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
複製代碼

那麼有人可能會問,既然主頁表格都已經包含了全部機構的編號和名稱,爲什麼還要畫蛇添足的再獲取一次呢?這是由於,我壓根就不想爬主頁的那些表格,直接根據機構編號的遞增規律生成對應的網址便可,因此獲取機構名稱的任務就放在了爬取每一個機構首個信息頁面以後。

獲取機構信息對應的網頁數量

get count of page

每一個機構的數據量是不等的,幸虧每一個頁面都包含了當前頁面數及總頁面數。使用如下代碼便可獲取頁碼數。

url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
page_cnt = url_re.search(html).group(1)
複製代碼

從每一個機構首頁獲取頁碼數後,即可for循環修改網址參數中的currentPage,逐頁獲取機構信息。

獲取當前頁面從業人員信息

get personinfo

針對如上圖所示的一個特定信息頁時,人員信息被存放於一個表中,除了固定的表頭信息外,人員信息均被包含在一個帶有idtr標籤中,因此使用BeautifulSoup能夠很容易提取出頁面內全部人員信息。

soup.find_all('tr', id=True)
複製代碼

肯定爬取方案

通常的想法固然是逐頁爬取主頁信息,而後獲取每頁全部機構對應的網頁連接,進而繼續爬取每一個機構信息。

可是因爲該網站的機構信息網址具備明顯的規律,咱們根據每一個機構的編號即可直接獲得每一個機構每一個信息頁面的網址。因此具體爬取方案以下:

  1. 將全部機構編號網址存入隊列url_queue
  2. 新建生產者線程SpiderThread完成抓取任務
  • 循環從隊列url_queue中讀取一個編號,生成機構首頁網址,使用requests抓取之
  • 從抓取結果中獲取頁碼數量,若爲0,則返回該線程第1步
  • 循環爬取當前機構剩餘頁面
  • 將頁面信息存入隊列html_queue
  1. 新建消費者線程DatamineThread完成數據清洗任務
  • 循環從隊列html_queue中讀取一組頁面信息
  • 使用BeautifulSoup提取頁面中的從業人員信息
  • 將信息以二維數組形式存儲,最後交由數據存儲類Storage存入本地文件

代碼實現

生成者SpiderThread

爬蟲線程先從隊列獲取一個機構編號,生成機構首頁網址並進行爬取,接着判斷機構頁面數量是否爲0,如若不爲0則繼續獲取機構名稱,並根據頁面數循環爬取剩餘頁面,將原始html數據以以下dict格式存入隊列html_queue:

{
    'name': mechanismId_mechanismName,
    'num': currentPage,
    'content': html
}
複製代碼

爬蟲產生的數據隊列html_queue將由數據清洗線程進行處理,下面是爬蟲線程的主程序,整個線程代碼請看後面的源碼

def run(self):
    while True:
        mechanism_id = 'G0' + self.url_queue.get()

        # the first page's url
        url = self.__get_url(mechanism_id, 1)
        html = self.grab(url)

        page_cnt = self.url_re.search(html.text).group(1)
        if page_cnt == '0':
            self.url_queue.task_done()
            continue
        
        soup = BeautifulSoup(html.text, 'html.parser')
        mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
        print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

        # put data into html_queue
        self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
        for i in range(2, int(page_cnt) + 1):
            url = self.__get_url(mechanism_id, i)
            html = self.grab(url)
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
        
        self.url_queue.task_done()
複製代碼

消費者DatamineThread

數據清洗線程比較簡單,就是從生產者提供的數據隊列html_queue逐一提取html數據,而後從html數據中提取從業人員信息,以二維數組形式存儲,最後交由存儲模塊Storage完成數據存儲工做。

class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos
        
    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))

            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()
複製代碼

數據存儲Storage

我寫了兩類文件格式的存儲函數,write_txt, write_excel,分別對應txt,excel文件。實際存儲時由調用方肯定文件格式。

def save(self, data):
    {
        '.txt': self.write_txt,
        '.xls': self.write_excel
    }.get(self.filetype)(data)
複製代碼

存入txt文件

存入txt文件是比較簡單的,就是以附加(a)形式打開文件,寫入數據,關閉文件。其中,文件名稱由調用方提供。寫入數據時,每一個人員信息佔用一行,以製表符\t分隔。

def write_txt(self, data):
    '''Write data to txt file'''
    fid = open(self.path, 'a', encoding='utf-8')

    # insert the header of table
    if not os.path.getsize(self.path):
        fid.write('\t'.join(self.table_header) + '\n')
    
    for info in data:
        fid.write('\t'.join(info) + '\n')
    fid.close()
複製代碼

存入Excel文件

存入Excel文件仍是比較繁瑣的,因爲經驗很少,選用的是xlwt, xlrdxlutils庫。說實話,這3個庫真心不大好用,勉強完成任務而已。爲何這麼說,且看:

  1. 修改文件麻煩:xlwt只能寫,xlrd只能讀,須要xlutilscopy函數將xlrd讀取的數據複製到內存,再用xlwt修改
  2. 只支持.xls文件:.xlsx經讀寫也會變成.xls格式
  3. 表格樣式易變:只要從新寫入文件,表格樣式必然重置

因此後續我確定會再學學其它的excel庫,固然,當前解決方案暫時還用這三個。代碼以下:

def write_excel(self, data):
    '''write data to excel file'''
    if not os.path.exists(self.path):
        header_style = xlwt.easyxf('font:name 楷體, color-index black, bold on')
        wb = xlwt.Workbook(encoding='utf-8')
        ws = wb.add_sheet('Data')

        # insert the header of table
        for i in range(len(self.table_header)):
            ws.write(0, i, self.table_header[i], header_style)
    else:
        rb = open_workbook(self.path)
        wb = copy(rb)
        ws = wb.get_sheet(0)
    
    # write data
    offset = len(ws.rows)
    for i in range(0, len(data)):
        for j in range(0, len(data[0])):
            ws.write(offset + i, j, data[i][j])

    # When use xlutils.copy.copy function to copy data from exist .xls file,
    # it will loss the origin style, so we need overwrite the width of column,
    # maybe there some other good solution, but I have not found yet.
    for i in range(len(self.table_header)):
        ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

    # save to file
    while True:
        try:
            wb.save(self.path)
            break
        except PermissionError as e:
            print('{0} error: {1}'.format(self.path, e.strerror))
            time.sleep(5)
        finally:
            pass
複製代碼

說明:

  1. 一個文件對應一個機構的數據,須要屢次讀取和寫入,因此須要計算文件寫入時的行數偏移量offset,即當前文件已包含數據的行數
  2. 當被寫入文件被人爲打開時,會出現PermissionError異常,能夠在捕獲該異常而後提示錯誤信息,並定時等待直到文件被關閉。

main

主函數用於建立和啓動生產者線程和消費者線程,同時爲生產者線程提供機構編號隊列。

url_queue = queue.Queue()
html_queue = queue.Queue()

def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()
複製代碼

從主函數能夠看到,兩個隊列都調用了join函數,用於阻塞,直到對應隊列爲空爲止。要注意的是,隊列操做中,每一個出隊操做queue.get()須要對應一個queue.task_done()操做,不然會出現隊列數據已所有處理完,但主線程仍在執行的狀況。

至此,爬蟲的主要代碼便講解完了,下面是完整源碼。

源碼

#!/usr/bin/python3
# -*-coding:utf-8-*-

import queue
from threading import Thread

import requests

import re
from bs4 import BeautifulSoup

import os
import platform

import xlwt
from xlrd import open_workbook
from xlutils.copy import copy

import time

# url format ↓
# http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+&currentPage=1&pageSize=20&selectType=personinfo&all=undefined
# organid: +G01001+, +G01002+, +G01003+, ...
# currentPage: 1, 2, 3, ...
# pageSize: 20(default)
# 
# Algorithm design:
# 2 threads with 2 queues
# Thread-1, get first page url, then get page_num and mechanism_name from first page
# Thread-2, parse html file and get data from it, then output data to local file
# url_queue data -> 'url' # first url of each mechanism
# html_queue data -> {'name':'mechanism_name', 'html':data}

url_queue = queue.Queue()
html_queue = queue.Queue()


class SpiderThread(Thread):
    """Threaded Url Grab"""
    def __init__(self, url_queue, html_queue):
        Thread.__init__(self)
        self.url_queue = url_queue
        self.html_queue = html_queue
        self.page_size = 20
        self.url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}

    def __get_url(self, mechanism_id, current_page):
        return 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+%s+&currentPage=%d&pageSize=%d&selectType=personinfo&all=undefined' \
        % (mechanism_id, current_page, self.page_size)

    def grab(self, url):
        '''Grab html of url from web'''
        while True:
            try:
                html = requests.get(url, headers=self.headers, timeout=20)
                if html.status_code == 200:
                    break
            except requests.exceptions.ConnectionError as e:
                print(url + ' Connection error, try again...')
            except requests.exceptions.ReadTimeout as e:
                print(url + ' Read timeout, try again...')
            except Exception as e:
                print(str(e))
            finally:
                pass
        return html
    
    def run(self):
        '''Grab all htmls of mechanism one by one Steps: 1. grab first page of each mechanism from url_queue 2. get number of pages and mechanism name from first page 3. grab all html file of each mechanism 4. push all html to html_queue '''
        while True:
            mechanism_id = 'G0' + self.url_queue.get()

            # the first page's url
            url = self.__get_url(mechanism_id, 1)
            html = self.grab(url)

            page_cnt = self.url_re.search(html.text).group(1)
            if page_cnt == '0':
                self.url_queue.task_done()
                continue
            
            soup = BeautifulSoup(html.text, 'html.parser')
            mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
            print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

            # put data into html_queue
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
            for i in range(2, int(page_cnt) + 1):
                url = self.__get_url(mechanism_id, i)
                html = self.grab(url)
                self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
            
            self.url_queue.task_done()
    

class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos
        
    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))

            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()


class Storage():
    def __init__(self, filename, filetype):
        self.filetype = filetype
        self.filename = filename + filetype
        self.table_header = ('姓名', '性別', '從業資格號', '投資諮詢從業證書號', '任職部門', '職務', '任現職時間')
        self.path = self.__get_path()

    def __get_path(self):
        path = {
            'Windows': 'D:/litreily/Documents/python/cfachina',
            'Linux': '/mnt/d/litreily/Documents/python/cfachina'
        }.get(platform.system())

        if not os.path.isdir(path):
            os.makedirs(path)
        return '%s/%s' % (path, self.filename)
    
    def write_txt(self, data):
        '''Write data to txt file'''
        fid = open(self.path, 'a', encoding='utf-8')

        # insert the header of table
        if not os.path.getsize(self.path):
            fid.write('\t'.join(self.table_header) + '\n')
        
        for info in data:
            fid.write('\t'.join(info) + '\n')
        fid.close()
    
    def write_excel(self, data):
        '''write data to excel file'''
        if not os.path.exists(self.path):
            header_style = xlwt.easyxf('font:name 楷體, color-index black, bold on')
            wb = xlwt.Workbook(encoding='utf-8')
            ws = wb.add_sheet('Data')

            # insert the header of table
            for i in range(len(self.table_header)):
                ws.write(0, i, self.table_header[i], header_style)
        else:
            rb = open_workbook(self.path)
            wb = copy(rb)
            ws = wb.get_sheet(0)
        
        # write data
        offset = len(ws.rows)
        for i in range(0, len(data)):
            for j in range(0, len(data[0])):
                ws.write(offset + i, j, data[i][j])

        # When use xlutils.copy.copy function to copy data from exist .xls file,
        # it will loss the origin style, so we need overwrite the width of column,
        # maybe there some other good solution, but I have not found yet.
        for i in range(len(self.table_header)):
            ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

        # save to file
        while True:
            try:
                wb.save(self.path)
                break
            except PermissionError as e:
                print('{0} error: {1}'.format(self.path, e.strerror))
                time.sleep(5)
            finally:
                pass
    
    def save(self, data):
        '''Write data to local file. According filetype to choose function to save data, filetype can be '.txt' or '.xls', but '.txt' type is saved more faster then '.xls' type Args: data: a 2d-list array that need be save '''
        {
            '.txt': self.write_txt,
            '.xls': self.write_excel
        }.get(self.filetype)(data)


def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()


if __name__ == '__main__':
    main()
複製代碼

爬取測試

spider

save to txt

save to excel

寫在最後

  • 測試發現,寫入txt的速度明顯高於寫入excel的速度
  • 若是將頁面網址中的pageSize修改成1000或更大,則能夠一次性獲取某機構的全部從業人員信息,而不用逐頁爬取,效率能夠大大提升。
  • 該爬蟲已託管至github Python-demos
相關文章
相關標籤/搜索