This post was first published at www.litreily.top
At the request of a friend in the finance industry, I wrote a crawler to scrape the personnel information of every financial institution listed on the China Futures Association website. Fetching the site's data is itself fairly simple, but in order to learn some new crawling methods and techniques, namely the producer-consumer model described in this post, I also studied how to use Python's queue module queue and its threading class Thread.
The producer-consumer model is very simple, and most programmers know it: one side acts as the producer and keeps supplying resources, while the other side acts as the consumer and keeps consuming them. Put simply, it is like the cook and the customers in a restaurant: the cook, as the producer, keeps preparing delicious food, while the customers, as the consumers, keep eating what the cook provides. The relationship between producers and consumers can be one-to-one, one-to-many, many-to-one, or many-to-many.
So what does this model have to do with crawling? A crawler can be regarded as a producer: it keeps scraping data from a website, and the scraped data is the food; the data then needs a consumer to clean it, absorbing the useful parts and discarding the useless ones.
In practice, scraping and data cleaning each run in their own Thread, and the two threads pass data through a FIFO queue (queue.Queue); the hand-off is like a waiter carrying food from the kitchen to the customer's table. The scraping thread fetches the site's data and puts the raw pages into the queue, while the cleaning thread reads the raw data from the queue in the order it was enqueued and extracts the useful fields.
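To make the pattern concrete before turning to the crawler, here is a minimal, self-contained sketch (not code from the project) of one producer and one consumer connected by a queue.Queue, using the cook/customer analogy above:

import queue
from threading import Thread

food_queue = queue.Queue()

def cook():
    # producer: keep making dishes and put them on the queue
    for i in range(5):
        food_queue.put('dish-%d' % i)
    food_queue.put(None)  # sentinel telling the consumer there is no more food

def customer():
    # consumer: keep taking dishes off the queue until the sentinel arrives
    while True:
        dish = food_queue.get()
        if dish is None:
            break
        print('eating', dish)

producer = Thread(target=cook)
consumer = Thread(target=customer)
producer.start()
consumer.start()
producer.join()
consumer.join()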
That is all there is to the producer-consumer model; the rest of this post describes how it is applied to this particular scraping task.
http://www.cfachina.org/cfainfo/organbaseinfoServlet?all=personinfo
The data we want to scrape is the personnel information of all the futures companies shown in the table on the home page; each company has an institution ID (G01001 ~ G01198). The home page listing is paginated, 8 pages in total. Take 方正中期期貨公司 (institution ID G01001) as an example: clicking the company name jumps to that company's personnel pages.
From the URL and the page content, the following information can be extracted:
- organid: institution ID, +G01001+ ~ +G01198+
- currentPage: the current page number of the institution's personnel listing
- pageSize: the number of people shown per page, 20 by default
- selectType: fixed to personinfo
- mechanism_name: the institution name, shown above the table on every page
- page_cnt: the total number of pages of personnel data for the institution
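As an aside, a per-page URL can be assembled from these parameters roughly as follows; this little build_url helper is only an illustration, mirroring the __get_url method in the source code later in the post:

BASE = 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet'

def build_url(organ_id, current_page, page_size=20):
    # note: the site wraps organid in '+' signs, e.g. +G01001+
    return ('%s?organid=+%s+&currentPage=%d&pageSize=%d'
            '&selectType=personinfo&all=undefined'
            % (BASE, organ_id, current_page, page_size))

print(build_url('G01001', 1))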
The scraped data is ultimately saved per institution, into a txt or excel file named after the institution.
Once any personnel information page of an institution has been fetched, BeautifulSoup can quickly extract the institution name:
mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
One might ask: since the home page tables already contain every institution's ID and name, why fetch the name again? The reason is that I did not want to crawl those home page tables at all; the institution URLs can be generated directly from the incremental IDs, so obtaining the institution name is deferred until the first information page of each institution has been fetched.
The amount of data varies from institution to institution, but fortunately every page contains both the current page number and the total page count. The page count can be obtained with the following code:
url_re = re.compile(r'#currentPage.*\+.*\+\'(\d+)\'')
page_cnt = url_re.search(html).group(1)
With the page count taken from each institution's first page, a for loop can then step the currentPage parameter in the URL and fetch the institution's information page by page.
On a given personnel information page, the records live in a table: apart from the fixed header, each person is wrapped in a tr tag that carries an id attribute, so BeautifulSoup can easily pull out every personnel row on the page:
soup.find_all('tr', id=True)
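For illustration, the matched rows can be flattened into lists of cell text like this, which is essentially what the cleaning thread's __datamine method does later on:

rows = []
for tr in soup.find_all('tr', id=True):   # one <tr id="..."> per person
    rows.append([td.get_text() for td in tr.find_all('td')])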
The obvious approach is of course to crawl the home page listing page by page, collect the links of all the institutions on each page, and then crawl every institution in turn.
But because this site's institution URLs follow a clear pattern, the URL of every information page of every institution can be generated directly from the institution ID. The crawl is therefore organized as follows:
- Put all institution IDs into the queue url_queue.
- The crawl thread SpiderThread handles scraping: it reads an ID from url_queue, builds the institution's first-page URL, fetches it with requests, and puts the raw pages into html_queue.
- The cleaning thread DatamineThread handles data cleaning: it reads one bundle of page data from html_queue, extracts the personnel information with BeautifulSoup, and passes the result to the storage module Storage to be written to a local file.
SpiderThread
The spider thread first takes an institution ID from the queue, builds the institution's first-page URL and fetches it, then checks whether the page count is 0. If it is not, it extracts the institution name and loops over the remaining pages according to the page count, putting the raw html into html_queue as a dict of the following form:
{
'name': mechanismId_mechanismName,
'num': currentPage,
'content': html
}
The queue html_queue filled by the spider is consumed by the data-cleaning thread. Below is the main routine of the spider thread; see the complete source at the end of the post for the full class.
def run(self):
    while True:
        mechanism_id = 'G0' + self.url_queue.get()

        # the first page's url
        url = self.__get_url(mechanism_id, 1)
        html = self.grab(url)
        page_cnt = self.url_re.search(html.text).group(1)
        if page_cnt == '0':
            self.url_queue.task_done()
            continue

        soup = BeautifulSoup(html.text, 'html.parser')
        mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
        print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

        # put data into html_queue
        self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
        for i in range(2, int(page_cnt) + 1):
            url = self.__get_url(mechanism_id, i)
            html = self.grab(url)
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
        self.url_queue.task_done()
DatamineThread
The data-cleaning thread is quite simple: it takes the html data items one by one from the queue html_queue supplied by the producer, extracts the personnel information from each html page, stores it as a 2-D list, and finally hands it over to the storage module Storage to write to disk.
class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos

    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))
            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()
Storage
I wrote storage functions for two file formats, write_txt and write_excel, for txt and excel files respectively. Which format is actually used is decided by the caller:
def save(self, data):
    {
        '.txt': self.write_txt,
        '.xls': self.write_excel
    }.get(self.filetype)(data)
Writing to a txt file is simple: open the file in append ('a') mode, write the data, and close it. The file name is supplied by the caller. Each person's record occupies one line, with fields separated by tabs (\t).
def write_txt(self, data):
    '''Write data to txt file'''
    fid = open(self.path, 'a', encoding='utf-8')

    # insert the header of table
    if not os.path.getsize(self.path):
        fid.write('\t'.join(self.table_header) + '\n')

    for info in data:
        fid.write('\t'.join(info) + '\n')
    fid.close()
Writing to an Excel file is rather more tedious. Since I did not have much experience, I chose the xlwt, xlrd and xlutils libraries. Frankly, these three libraries are not pleasant to use; they barely get the job done. Why do I say so? Consider:
- xlwt can only write and xlrd can only read; to modify an .xls file you need xlutils's copy function to copy what xlrd has read into memory, and then edit it with xlwt.
- Only the .xls format is really workable: even an .xlsx file ends up in .xls format after this read-and-write cycle.
So I will definitely look at other excel libraries later on; for now, though, the solution still uses these three. The code is as follows:
def write_excel(self, data):
    '''write data to excel file'''
    if not os.path.exists(self.path):
        header_style = xlwt.easyxf('font:name 楷體, color-index black, bold on')
        wb = xlwt.Workbook(encoding='utf-8')
        ws = wb.add_sheet('Data')
        # insert the header of table
        for i in range(len(self.table_header)):
            ws.write(0, i, self.table_header[i], header_style)
    else:
        rb = open_workbook(self.path)
        wb = copy(rb)
        ws = wb.get_sheet(0)

    # write data
    offset = len(ws.rows)
    for i in range(0, len(data)):
        for j in range(0, len(data[0])):
            ws.write(offset + i, j, data[i][j])

    # When xlutils.copy.copy copies data from an existing .xls file it loses
    # the original styles, so the column widths are overwritten here; there
    # may be a better solution, but I have not found one yet.
    for i in range(len(self.table_header)):
        ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

    # save to file
    while True:
        try:
            wb.save(self.path)
            break
        except PermissionError as e:
            print('{0} error: {1}'.format(self.path, e.strerror))
            time.sleep(5)
        finally:
            pass
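As an aside, and purely as a hedged sketch of one of the "other excel libraries" mentioned above rather than part of the original project: with openpyxl (which targets .xlsx files) the create-or-append logic could look roughly like this:

from openpyxl import Workbook, load_workbook
import os

def write_xlsx(path, table_header, data):
    # open the existing workbook, or create a new one with a header row
    if os.path.exists(path):
        wb = load_workbook(path)
        ws = wb.active
    else:
        wb = Workbook()
        ws = wb.active
        ws.append(list(table_header))
    for row in data:
        ws.append(row)  # appends after the last used row, so no offset bookkeeping
    wb.save(path)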
Notes:
- Before appending to the Excel file, the code needs the offset, i.e. the number of rows the file already contains, so that new rows are written after the existing ones.
- If the file is open elsewhere when saving, a PermissionError is raised; the code catches it, prints an error message, and waits a few seconds at a time until the file has been closed.
The main function creates and starts the producer thread and the consumer thread, and feeds the queue of institution IDs to the producer.
url_queue = queue.Queue()
html_queue = queue.Queue()

def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()
As the main function shows, join is called on both queues; it blocks until every item put into the queue has been processed. Note that in queue operations each queue.get() must be paired with a queue.task_done(); otherwise you end up in a situation where all the queued data has actually been processed but the main thread is still blocked waiting.
That covers the main parts of the crawler; the complete source code follows.
#!/usr/bin/python3
# -*-coding:utf-8-*-

import queue
from threading import Thread
import requests
import re
from bs4 import BeautifulSoup
import os
import platform
import xlwt
from xlrd import open_workbook
from xlutils.copy import copy
import time

# url format ↓
# http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+&currentPage=1&pageSize=20&selectType=personinfo&all=undefined
# organid: +G01001+, +G01002+, +G01003+, ...
# currentPage: 1, 2, 3, ...
# pageSize: 20(default)
#
# Algorithm design:
# 2 threads with 2 queues
# Thread-1, get first page url, then get page_num and mechanism_name from first page
# Thread-2, parse html file and get data from it, then output data to local file
# url_queue data  -> mechanism id suffix, '1001' ~ '1198'
# html_queue data -> {'name': mechanism_name, 'num': page_number, 'content': html}

url_queue = queue.Queue()
html_queue = queue.Queue()


class SpiderThread(Thread):
    """Threaded Url Grab"""
    def __init__(self, url_queue, html_queue):
        Thread.__init__(self)
        self.url_queue = url_queue
        self.html_queue = html_queue
        self.page_size = 20
        self.url_re = re.compile(r'#currentPage.*\+.*\+\'(\d+)\'')
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}

    def __get_url(self, mechanism_id, current_page):
        return 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+%s+&currentPage=%d&pageSize=%d&selectType=personinfo&all=undefined' \
            % (mechanism_id, current_page, self.page_size)

    def grab(self, url):
        '''Grab html of url from web'''
        while True:
            try:
                html = requests.get(url, headers=self.headers, timeout=20)
                if html.status_code == 200:
                    break
            except requests.exceptions.ConnectionError as e:
                print(url + ' Connection error, try again...')
            except requests.exceptions.ReadTimeout as e:
                print(url + ' Read timeout, try again...')
            except Exception as e:
                print(str(e))
            finally:
                pass
        return html

    def run(self):
        '''Grab all htmls of mechanism one by one

        Steps:
            1. grab first page of each mechanism from url_queue
            2. get number of pages and mechanism name from first page
            3. grab all html file of each mechanism
            4. push all html to html_queue
        '''
        while True:
            mechanism_id = 'G0' + self.url_queue.get()

            # the first page's url
            url = self.__get_url(mechanism_id, 1)
            html = self.grab(url)
            page_cnt = self.url_re.search(html.text).group(1)
            if page_cnt == '0':
                self.url_queue.task_done()
                continue

            soup = BeautifulSoup(html.text, 'html.parser')
            mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
            print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))

            # put data into html_queue
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
            for i in range(2, int(page_cnt) + 1):
                url = self.__get_url(mechanism_id, i)
                html = self.grab(url)
                self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
            self.url_queue.task_done()


class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos

    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))
            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()


class Storage():
    def __init__(self, filename, filetype):
        self.filetype = filetype
        self.filename = filename + filetype
        self.table_header = ('姓名', '性別', '從業資格號', '投資諮詢從業證書號', '任職部門', '職務', '任現職時間')
        self.path = self.__get_path()

    def __get_path(self):
        path = {
            'Windows': 'D:/litreily/Documents/python/cfachina',
            'Linux': '/mnt/d/litreily/Documents/python/cfachina'
        }.get(platform.system())
        if not os.path.isdir(path):
            os.makedirs(path)
        return '%s/%s' % (path, self.filename)

    def write_txt(self, data):
        '''Write data to txt file'''
        fid = open(self.path, 'a', encoding='utf-8')

        # insert the header of table
        if not os.path.getsize(self.path):
            fid.write('\t'.join(self.table_header) + '\n')

        for info in data:
            fid.write('\t'.join(info) + '\n')
        fid.close()

    def write_excel(self, data):
        '''write data to excel file'''
        if not os.path.exists(self.path):
            header_style = xlwt.easyxf('font:name 楷體, color-index black, bold on')
            wb = xlwt.Workbook(encoding='utf-8')
            ws = wb.add_sheet('Data')
            # insert the header of table
            for i in range(len(self.table_header)):
                ws.write(0, i, self.table_header[i], header_style)
        else:
            rb = open_workbook(self.path)
            wb = copy(rb)
            ws = wb.get_sheet(0)

        # write data
        offset = len(ws.rows)
        for i in range(0, len(data)):
            for j in range(0, len(data[0])):
                ws.write(offset + i, j, data[i][j])

        # When xlutils.copy.copy copies data from an existing .xls file it loses
        # the original styles, so the column widths are overwritten here; there
        # may be a better solution, but I have not found one yet.
        for i in range(len(self.table_header)):
            ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]

        # save to file
        while True:
            try:
                wb.save(self.path)
                break
            except PermissionError as e:
                print('{0} error: {1}'.format(self.path, e.strerror))
                time.sleep(5)
            finally:
                pass

    def save(self, data):
        '''Write data to local file.

        According to filetype, choose the function used to save the data;
        filetype can be '.txt' or '.xls', and the '.txt' type is written
        faster than the '.xls' type.

        Args:
            data: a 2d-list array that needs to be saved
        '''
        {
            '.txt': self.write_txt,
            '.xls': self.write_excel
        }.get(self.filetype)(data)


def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()


if __name__ == '__main__':
    main()
A few final notes:
- Writing to txt is noticeably faster than writing to excel.
- If pageSize is set to 1000 or more, all of an institution's personnel records can be fetched in a single request instead of page by page, which greatly improves efficiency.
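As a hedged sketch of that last optimization (assuming the server accepts a large pageSize, which the post suggests but which is not verified here), the per-institution crawl would collapse into a single request; grab_all_in_one is a hypothetical helper, not part of the original code:

import requests

def grab_all_in_one(mechanism_id, page_size=1000):
    # hypothetical helper: fetch all records of one institution in a single page
    url = ('http://www.cfachina.org/cfainfo/organbaseinfoOneServlet'
           '?organid=+%s+&currentPage=1&pageSize=%d'
           '&selectType=personinfo&all=undefined' % (mechanism_id, page_size))
    return requests.get(url, timeout=20)

html = grab_all_in_one('G01001')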