A Simple Multithreaded Crawler Framework

This article was first published on Zhihu.

This article uses multithreading to implement a simple crawler framework, so that we only have to focus on parsing the pages rather than setting up threads, queues, and so on ourselves. It is invoked much like Scrapy, but many features are still missing, hence the name "simple crawler framework".

The framework provides a Spider class; with it, we only need to write the following code to run a multithreaded crawler:

class DouBan(Spider):

    def __init__(self):
        super(DouBan, self).__init__()
        self.start_url = 'https://movie.douban.com/top250'
        self.filename = 'douban.json'  # override the default value
        self.output_result = False  # don't print each scraped item
        self.thread_num = 10

    def start_requests(self):  # override the default method
        yield (self.start_url, self.parse_first)

    def parse_first(self, url):  # only need to yield (url, callback) pairs to crawl
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')

        movies = soup.find_all('div', class_ = 'info')[:5]
        for movie in movies:
            url = movie.find('div', class_ = 'hd').a['href']
            yield (url, self.parse_second)

        nextpage = soup.find('span', class_ = 'next').a
        if nextpage:
            nexturl = self.start_url + nextpage['href']
            yield (nexturl, self.parse_first)
        else:
            self.running = False  # signals that no more URLs will be added to the task queue

    def parse_second(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        mydict = {}
        title = soup.find('span', property = 'v:itemreviewed')
        mydict['title'] = title.text if title else None
        duration = soup.find('span', property = 'v:runtime')
        mydict['duration'] = duration.text if duration else None
        time = soup.find('span', property = 'v:initialReleaseDate')
        mydict['time'] = time.text if time else None
        yield mydict


if __name__ == '__main__':
    douban = DouBan()
    douban.run()

As you can see, this usage is very similar to Scrapy (another minimal example is sketched after the list below):

  • Inherit from the class and write only the parse functions (since this is a simple framework, you still have to issue the requests yourself inside them)
  • Use yield to return either data, or a new request together with its callback
  • Multithreading is handled automatically (Scrapy is asynchronous instead)
  • Running it is the same: just call run()
  • You can configure whether to save results to a file and so on, though extensibility (databases, etc.) has not been addressed
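
As an illustration, a hypothetical minimal spider might look like the sketch below. The target site (quotes.toscrape.com) and its markup are only assumptions for the example; the point is that only start_url and a parse method are strictly needed, because the base class's default start_requests yields (self.start_url, self.parse):

class QuotesSpider(Spider):

    def __init__(self):
        super(QuotesSpider, self).__init__()
        self.start_url = 'http://quotes.toscrape.com/'  # assumed demo site
        self.filename = 'quotes.json'

    def parse(self, url):  # used by the base class's default start_requests
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        for quote in soup.find_all('div', class_='quote'):  # assumed page structure
            yield {'text': quote.find('span', class_='text').text}
        self.running = False  # a single page, so let the worker threads finish


if __name__ == '__main__':
    QuotesSpider().run()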

Now let's look at how it is implemented.

We can compare the two versions below: one is the approach from the previous article, the other has been modified to abstract some functionality out so that it can be extended.

For the previous article's version, please follow the link and read the code there; below is the modified version.

import requests
import time
import threading
from queue import Queue, Empty
import json
from bs4 import BeautifulSoup

def run_time(func):
    def wrapper(*args, **kw):
        start = time.time()
        func(*args, **kw)
        end = time.time()
        print('running', end-start, 's')
    return wrapper


class Spider():

    def __init__(self):
        self.start_url = 'https://movie.douban.com/top250'
        self.qtasks = Queue()
        self.data = list()
        self.thread_num = 5
        self.running = True  # parse functions set this to False when no more URLs will be added

    def start_requests(self):
        yield (self.start_url, self.parse_first)

    def parse_first(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')

        movies = soup.find_all('div', class_ = 'info')[:5]
        for movie in movies:
            url = movie.find('div', class_ = 'hd').a['href']
            yield (url, self.parse_second)

        nextpage = soup.find('span', class_ = 'next').a
        if nextpage:
            nexturl = self.start_url + nextpage['href']
            yield (nexturl, self.parse_first)
        else:
            self.running = False


    def parse_second(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        mydict = {}
        title = soup.find('span', property = 'v:itemreviewed')
        mydict['title'] = title.text if title else None
        duration = soup.find('span', property = 'v:runtime')
        mydict['duration'] = duration.text if duration else None
        time = soup.find('span', property = 'v:initialReleaseDate')
        mydict['time'] = time.text if time else None
        yield mydict


    def start_req(self):  # producer: put the initial (url, callback) tasks onto the queue
        for task in self.start_requests():
            self.qtasks.put(task)

    def parses(self):  # worker loop: consume tasks until running is False and the queue is empty
        while self.running or not self.qtasks.empty():
            try:
                url, func = self.qtasks.get(timeout=3)
                print('crawling', url)
                for task in func(url):
                    if isinstance(task, tuple):
                        self.qtasks.put(task)
                    elif isinstance(task, dict):
                        self.data.append(task)
                    else:
                        raise TypeError('parse functions have to yield url-function tuple or data dict')
            except Empty:
                print('{}: Timeout occurred'.format(threading.current_thread().name))
        print(threading.current_thread().name, 'finished')


    @run_time
    def run(self, filename=False):
        ths = []

        th1 = threading.Thread(target=self.start_req)
        th1.start()
        ths.append(th1)

        for _ in range(self.thread_num):
            th = threading.Thread(target=self.parses)
            th.start()
            ths.append(th)

        for th in ths:
            th.join()

        if filename:
            s = json.dumps(self.data, ensure_ascii=False, indent=4)
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(s)

        print('Data crawling is finished.')

if __name__ == '__main__':
    Spider().run(filename='frame.json')

The main ideas behind this improvement are as follows:

  • When writing a parse function we want to, as in Scrapy, yield the URL to crawl together with the function that will parse it. So we use a queue of (URL, parse function) tuples; the crawler then simply keeps taking elements from this queue and parses each URL with its function. This consuming step is what runs in multiple threads
  • yield can return two kinds of values: a (URL, parse function) tuple, or a dict (the data we actually want). A type check routes each to its container: the tuple queue is constantly consumed and refilled, while the list of dicts only grows and is written out to a file at the end
  • queue.get is called with a timeout and the resulting exception is handled, which guarantees that every thread can terminate (see the stripped-down sketch after this list)
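
Stripped of the Spider class, the pattern behind these points looks roughly like the sketch below (the names are illustrative, and it is assumed that some parse callback sets running to False once it knows no more URLs will be added):

from queue import Queue, Empty

tasks = Queue()   # (url, callback) tuples waiting to be crawled
results = []      # data dicts collected by the workers
running = True    # a parse callback flips this to False when no more URLs will come

def worker():
    # keep working while new tasks may still arrive, or while old ones remain in the queue
    while running or not tasks.empty():
        try:
            url, func = tasks.get(timeout=3)  # the timeout lets the thread re-check the loop condition
        except Empty:
            continue
        for item in func(url):
            if isinstance(item, tuple):       # a new (url, callback) task
                tasks.put(item)
            elif isinstance(item, dict):      # a piece of scraped data
                results.append(item)

# run() then just starts several threads with target=worker and joins them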

There is nothing particularly special here and not much to explain; copy the code into a text file and compare the two versions yourself.

The framework is then obtained from this second version by splitting off the generic parts and letting the user define what is unique to each crawler. The full code is below (the code at the beginning of this article is the second half of this block).

import requests
import time
import threading
from queue import Queue, Empty
import json
from bs4 import BeautifulSoup

def run_time(func):
    def wrapper(*args, **kw):
        start = time.time()
        func(*args, **kw)
        end = time.time()
        print('running', end-start, 's')
    return wrapper


class Spider():

    def __init__(self):
        self.qtasks = Queue()
        self.data = list()
        self.thread_num = 5
        self.running = True
        self.filename = False
        self.output_result = True

    def start_requests(self):
        yield (self.start_url, self.parse)

    def start_req(self):
        for task in self.start_requests():
            self.qtasks.put(task)

    def parses(self):
        while self.running or not self.qtasks.empty():
            try:
                url, func = self.qtasks.get(timeout=3)
                print('crawling', url)
                for task in func(url):
                    if isinstance(task, tuple):
                        self.qtasks.put(task)
                    elif isinstance(task, dict):
                        if self.output_result:
                            print(task)
                        self.data.append(task)
                    else:
                        raise TypeError('parse functions have to yield url-function tuple or data dict')
            except Empty:
                print('{}: Timeout occurred'.format(threading.current_thread().name))
        print(threading.current_thread().name, 'finished')

    @run_time
    def run(self):
        ths = []

        th1 = threading.Thread(target=self.start_req)
        th1.start()
        ths.append(th1)

        for _ in range(self.thread_num):
            th = threading.Thread(target=self.parses)
            th.start()
            ths.append(th)

        for th in ths:
            th.join()

        if self.filename:
            s = json.dumps(self.data, ensure_ascii=False, indent=4)
            with open(self.filename, 'w', encoding='utf-8') as f:
                f.write(s)

        print('Data crawling is finished.')



class DouBan(Spider):

    def __init__(self):
        super(DouBan, self).__init__()
        self.start_url = 'https://movie.douban.com/top250'
        self.filename = 'douban.json'  # override the default value
        self.output_result = False  # don't print each scraped item
        self.thread_num = 10

    def start_requests(self):  # override the default method
        yield (self.start_url, self.parse_first)

    def parse_first(self, url):  # only need to yield (url, callback) pairs to crawl
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')

        movies = soup.find_all('div', class_ = 'info')[:5]
        for movie in movies:
            url = movie.find('div', class_ = 'hd').a['href']
            yield (url, self.parse_second)

        nextpage = soup.find('span', class_ = 'next').a
        if nextpage:
            nexturl = self.start_url + nextpage['href']
            yield (nexturl, self.parse_first)
        else:
            self.running = False  # signals that no more URLs will be added to the task queue

    def parse_second(self, url):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'lxml')
        mydict = {}
        title = soup.find('span', property = 'v:itemreviewed')
        mydict['title'] = title.text if title else None
        duration = soup.find('span', property = 'v:runtime')
        mydict['duration'] = duration.text if duration else None
        time = soup.find('span', property = 'v:initialReleaseDate')
        mydict['time'] = time.text if time else None
        yield mydict


if __name__ == '__main__':
    douban = DouBan()
    douban.run()

With this separation we only need to write the second half of the code, caring only about page parsing and not about the multithreading implementation.

Welcome to follow my Zhihu column.

Column home: Python programming

Column table of contents: Contents

Version notes: software and package versions
