Golang搭建並行版爬蟲信息採集框架

1.簡介

Go語言在分佈式系統領域有着更高的開發效率,提供了海量並行的支持。本博文介紹的是採用Go語言搭建一個並行版爬蟲信息採集框架,博文中使用58同城中租房網頁作案例。相比較其餘爬蟲程序它的優勢是:git

  • 1.抓取信息速度很是快,由於是並行處理的,經過配置協程數量,能夠比普通的爬蟲信息採集程序快上上百倍。
  • 2.功能模塊化,每一個功能模塊各司其職,配置簡單。經過修改信息抓取規則,就能夠採集不一樣網站中的數據。

程序源代碼放到github上,連接地址是: https://github.com/GuoZhaoran/crawlergithub

2.項目架構

下面是項目總體架構的示意圖:
圖片描述正則表達式

2.1 Request(請求)

該爬蟲架構中Request請求能夠理解爲:抓取請求url的內容,例如抓取58同城北京市的租房信息時,請求的url是:https://bj.58.com/chuzu/
打開url會發現,網頁頁面中是房源列表信息,那麼接下來要作的工做就是抓取房源詳情信息和分頁後的下一頁房源列表信息。因而就會有新的請求Request,對應不一樣的url連接地址。算法

2.2 Worker(工做者)

咱們在拿到Request請求以後,抓取到網頁頁面內容,就須要有單獨的程序去解析頁面,提取相關信息,這就是worker所要作的工做。數據庫

2.3 Request隊列和Worker隊列

Go語言在構建並行處理程序中有着自然的優點,在該框架中處理Request請求和使用Worker提取相關信息也都是並行工做的。程序中會同時存在着不少個Request,也會有不少個Worker在處理不一樣Request頁面中的內容。因此分別須要一個Request隊列和Worker隊列來管理它們。編程

2.4 Scheduler(調度器)

調度器的職責是將Request分配給空閒的Worker來處理,實現任務調度。由於Request和Worker分別使用隊列進行管理,能夠經過調度器來控制程序的運行過程,例如:分配不一樣數量的Worker,將特定的Request分配給相應的Worker進行處理等。瀏覽器

3.功能模塊和代碼解析

下面咱們來看一下項目的目錄結構,瞭解一下爬蟲架構的功能模塊,再詳細對每個功能模塊的實現過程作介紹:
圖片描述安全

3.1 定義數據結構體

經過上面對項目架構介紹能夠看出,運行該爬蟲程序,須要的數據結構體很簡單,定義數據結構的程序文件是:engine/type.go數據結構

package engine

//請求數據結構
type Request struct {
    Url string    //請求url
    ParserFunc func([]byte) ParseResult    //內容解析函數
}

//通過內容解析函數解析後的返回數據結構體
type ParseResult struct {
    Requests []Request        //請求數據結構切片
    Items []interface{}       //抓取到的有用信息項
}

Request(請求)所要包含的信息是請求url和解析函數,不一樣的url所需的解析函數是不同的,好比咱們要提取的「58同城房源列表」和「房源詳情頁面」信息是不同的,所需解析函數也是不同的,接下來會對者者兩個頁面的解析函數進行介紹。
Worker對請求進行處理以後,返回的結果中可能有新的Request,好比從房源列表中提取出房源詳情頁面的連接。在房源詳情頁面中咱們會拿到詳情信息,這些詳情信息咱們經過Items進行輸出便可(企業中更通用的作法是將這些信息存儲到數據庫,用來作數據分析,這裏咱們只是對並行爬蟲框架實現思路作介紹)架構

3.2 採集器

採集器實現的功能是根據url提取網頁內容,使用Go語言處理很簡單,只須要封裝一個簡單的函數便可,下面是源代碼,不作過多介紹。(若是想要將採集器作的更通用一些,同城還須要對不一樣網站url的編碼作兼容處理),採集器相關的代碼實如今:fetcher/fetcher.go

//根據網頁連接獲取到網頁內容
func Fetch(url string) ([]byte, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("wrong status code: %d", resp.StatusCode)
    }

    bodyReader := bufio.NewReader(resp.Body)

    return ioutil.ReadAll(bodyReader)
}

3.3 解析器

解析器要作的工做是根據fetch拿到的網頁內容,從中提取出有用的信息。上邊咱們提到過Request結構體中,不一樣的Url須要不一樣的解析器,下面咱們就分別看一下房源列表解析器和房源詳情頁面解析器。房源列表解析器代碼實現代碼是:samecity/parser/city.go

package parser

import (
    "depthLearn/goCrawler/engine"
    "regexp"
    "strings"
)
const housesRe = `<a href="(//short.58.com/[^"]*?)"[^>]*>([^<]+)</a>`
const nextPage = `<a class="next" href="([^>]+)"><span>下一頁</span></a>`

func ParseCity(contents []byte) engine.ParseResult {
    re := regexp.MustCompile(housesRe)
    matches := re.FindAllSubmatch(contents, -1)

    result := engine.ParseResult{}
    for _, m := range matches {
        name := string(m[2])
        //格式化抓取的url
        fmtUrl := strings.Replace(string(m[1]), "/", "https:/", 1)
        result.Items = append(
            result.Items, "User "+string(m[2]))
        result.Requests = append(
            result.Requests, engine.Request{
                Url: fmtUrl,
                ParserFunc: func(c []byte) engine.ParseResult {
                    return ParseRoomMsg(c, name)
                },
            })
    }

    nextRe := regexp.MustCompile(nextPage)
    linkMatch := nextRe.FindStringSubmatch(string(contents))
    if len(linkMatch) >= 2 {
    result.Requests = append(
        result.Requests, engine.Request{
            Url:linkMatch[1],
            ParserFunc:ParseCity,
        },
    )}


    return result
}

從代碼中能夠看出,列表解析器所作的工做是提取房源詳情連接,和下一頁房源列表連接。如圖所示:
圖片描述
正則表達式定義到函數循環外部是由於提取連接所用的正則表達式都是同樣的,程序只須要定義一次,檢查正則表達式是否編譯經過(regexp.MustCompile)就能夠了。
經過瀏覽器工具查看源代碼咱們會發現咱們提取的連接並非標準的url形式,而是以下格式的字符串://legoclick.58.com/cpaclick?target=pZwY0jCfsvFJsWN3shPf......,咱們要作的就是把字符串前邊加上https://,這也很容易實現,使用Go語言標準庫函數strings.Replace就能夠實現。
另一個須要注意的地方就是,咱們提取到的房源列表url和房源詳情url所須要的解析函數(ParseFunc)是不同的,從代碼中能夠看出,房源列表url的解析函數是ParseCity,而房源詳情解析函數是ParseRoomMsg。咱們會發現。咱們經過解析房源列表url,會獲得新的房源列表url和房源詳情url,房源詳情url能夠經過解析函數直接拿到咱們想要的數據,而新的房源列表url須要進一步的解析,而後獲得一樣的內容,直到最後一頁,房源列表url解析後再也沒有新的房源列表url位置,數據就抓取完畢了,這種層層遞進的處理數據的方法在算法上叫作:深度優先遍歷算法,感興趣的同窗能夠查找資料學習一下。

3.4 信息模版

上面咱們提到了解析器,信息模版代碼實現文件是:/samecity/parser/profile.go,它所定以的僅僅是咱們要提取信息的一個模版struct。以下圖所示是一個房源詳情頁面,紅圈部分是咱們要提取的數據信息:
圖片描述
咱們再來對比一下profile.go信息模版中所定義的數據結構:

package model

//成員信息結構體
type Profile struct {
    Title     string       //標題
    Price     int          //價格
    LeaseStyle    string   //租賃方式
    HouseStyle    string   //房屋類型
    Community     string   //所在小區
    Address       string   //詳細地址
}

將信息模版單獨定義一個文件也是爲了可以使程序更加模塊化,模塊化帶來的好處是代碼易於維護,假如咱們想要抓取其餘網站的信息,就能夠經過修改解析器的規則,配置信息模版來使用。正如前邊提到的咱們的爬蟲框架比較通用。

3.5 調度器

「調度器」是整個框架中最核心的部分,它實現了將請求分配到worker的調度。爲了讓數據爬取工做可以順利進行,咱們將Worker和每個Request都使用隊列進行管理。咱們先來看一個調度器的接口和實現。
調度器的接口定義是這樣的:

type Scheduler interface {
    Submit(Request)
    ConfigureWorkerMasterChan(chan chan Request)
    WorkerReady(chan Request)
    Run()
}
  • Submit:顧名思義就是將接收到的請求提交給調度器,由調度器分配給空閒的Worker執行。
  • ConfigureWorkerMasterChan:爲每個Worker都分配一個channel,咱們知道Go語言的- channel是協程通訊最經常使用手段,這種基於CSP的通訊模型給咱們的併發編程帶來很大的遍歷。咱們的框架中調度器和Request,Worker之間都是使用channel進行信息傳遞。
  • WorkerReady:當有Worker能夠被分配任務時,向調度器發送的信號,將該Worker加入隊列。
  • Run是啓動程序的發動機,它所作的就是將任務的初始化工做作好,啓動程序。

下面咱們看一下這些方法的具體實現(/scheduler/queue.go)

package scheduler

import "depthLearn/goCrawler/engine"

//隊列調度器
type QueuedScheduler struct {
    requestChan chan engine.Request
    workerChan chan chan engine.Request
}

//將任務提交
func (s *QueuedScheduler) Submit(r engine.Request) {
    s.requestChan <- r
}

//當有worker能夠接收新的任務時
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workerChan <- w
}

//將request的channel送給調度器
func (s *QueuedScheduler) ConfigureWorkerMasterChan(c chan chan engine.Request) {
    s.workerChan = c
}

func (s *QueuedScheduler) Run(){
    s.workerChan = make(chan chan engine.Request)
    s.requestChan = make(chan engine.Request)
    go func() {
        //創建request隊列和worker隊列
        var requestQ  []engine.Request
        var workerQ   []chan engine.Request
        for {
            //查看是否既存在request又存在worker,取出做爲活動的request和worker
            var activeRequest engine.Request
            var activeWorker chan engine.Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeWorker = workerQ[0]
                activeRequest = requestQ[0]
            }
            select {
            //調度器中有請求時,將請求加入到請求隊列
            case r := <-s.requestChan:
                requestQ = append(requestQ, r)
            //調度器中有能夠接收任務的worker時,將請求加入到worker中
            case w := <-s.workerChan:
                workerQ = append(workerQ, w)
            //當同時有請求又有worker時,將請求分配給worker執行,從隊列中移除
            case activeWorker <- activeRequest:
                workerQ = workerQ[1:]
                requestQ = requestQ[1:]
            }
        }
    }()
}

咱們重點看一下Run方法,首先創建好兩個隊列(workerChan和requestChan),而後開啓一個協程掛起任務,當有request時,加入request隊列;當有worker時,加入worker隊列;當worker和request同時存在時,就將第一個request分配給第一個worker。這樣咱們就實現了調度器,worker和解析器並行工做了。
全部工做都作完以後,咱們就能夠經過ConcurrentEngine,實現程序了,ConcurrentEngine所作的工做就是配置worker數量,接收一個種子url,將調度器,採集器和worker都發動起來工做了,代碼的實現文件是:/engine/concurrent.go

package engine

import "fmt"

type ConcurrentEngine struct {
    Scheduler Scheduler
    WorkerCount int
}

type Scheduler interface {
    Submit(Request)
    ConfigureMasterWorkerChan(chan chan Request)
    WorkerReady(chan Request)
    Run()
}

func (e *ConcurrentEngine) Run(seeds ...Request) {
    out := make(chan ParseResult)
    e.Scheduler.Run()
    for i := 0; i < e.WorkerCount; i++ {
        createWorker(out, e.Scheduler)
    }

    for _, r := range seeds {
        e.Scheduler.Submit(r)
    }

    for {
        result := <- out
        for _, item := range result.Items {
            fmt.Printf("Got item: %v", item)
        }

        for _, request := range result.Requests {
            e.Scheduler.Submit(request)
        }
    }
}

func createWorker(out chan ParseResult, s Scheduler) {
    go func() {
        in := make(chan Request)
        for {
            s.WorkerReady(in)
            // tell scheduler i'm ready
            request := <- in
            result, err := worker(request)
            if err != nil {
                continue
            }
            out <- result
        }
    }()
}

配置worker數量,讓worker工做起來,createWorker就是當worker接收到Request以後開始工做,工做完成以後告訴調度器(經過WorkerReady方法)。worker的實現也很簡單,以下所示:

func  worker(r Request) (ParseResult, error){
    log.Printf("Fetching %s", r.Url)
    body, err := fetcher.Fetch(r.Url)
    if err != nil {
        log.Printf("Fetcher: error " + "fetching url %s: %v", r.Url, err)
        return ParseResult{}, err
    }

    return r.ParserFunc(body), nil
}

至此,全部的工做都準備好了,就能夠開始工做了,入口文件crawler.go:

package main

import (
    "depthLearn/ConcurrentCrawler/engine"
    "depthLearn/ConcurrentCrawler/scheduler"
    "depthLearn/ConcurrentCrawler/zhenai/parser"
)

func main() {
    e := engine.ConcurrentEngine{
        Scheduler: &scheduler.QueuedScheduler{},
        WorkerCount: 100,
    }
    e.Run(engine.Request{
        Url:       "http://www.samecity.com/zhenghun",
        ParserFunc: parser.ParseCityList,
    })
}

下面是命令行打印出來的效果圖:
圖片描述
能夠看到,咱們抓取到數據了。

4.拓展與總結

咱們的爬蟲程序功能還算完備,當時還有不少能夠改進優化的地方,我以爲最主要的有三點:

  • 程序中咱們抓取到的信息是經過文本命令行打印出來的,而在企業應用中咱們更多的將這些有價值的數據存儲到數據庫中。咱們的程序設計的很合理,在parseResult中的item中,包含了咱們抓取的全部信息,讀者可自行編寫數據存儲模塊來實現該功能。
  • 在程序的調度器中,咱們是經過取出request隊列中的第一個和worker中的第一個,將request分配給worker。由於咱們是經過隊列管理了,因此咱們能夠修改調度器的調度規則,從而實現更合理,高效的調度策略。
  • 咱們實現的只是最簡單的爬蟲,真實的場景中,有不少安全性作的都比較好的網站。都有QPS限制,用戶認證,IP過濾等多種防禦手段防止數據抓取。咱們也能夠將這種種狀況都考慮在內,把框架封裝的更通用,功能更完備。

整體來講咱們的並行爬蟲框架仍是挺不錯的,其中涉及到的模塊化編程,隊列管理,調度器等在工做中仍是值得借鑑的。固然,筆者水平有限,語言組織能力也不是太好,雖然參考了不少其餘資料,代碼中存在不少值得優化的地方,但願你們可以留言指正。謝謝你們!

相關文章
相關標籤/搜索