Golang實現簡單爬蟲框架(3)——簡單併發版

在上篇文章Golang實現簡單爬蟲框架(2)——單任務版爬蟲中咱們實現了一個簡單的單任務版爬蟲,對於單任務版爬蟲,每次都要請求頁面,而後解析數據,而後才能請求下一個頁面。整個過程當中,獲取網頁數據速度比較慢,那麼咱們就把獲取數據模塊作成併發執行。在項目的基礎上,實現多任務併發版爬蟲。html

項目github地址:github.com/NovemberCho… 回滾到相應記錄食用,效果更佳。git

一、項目架構

首先咱們把但任務版爬蟲架構中的Fetcher模塊和Parser模塊合併成一個Worker模塊,而後併發執行Worker模塊github

而後獲得併發版的架構圖:golang

  • 在併發版爬蟲中,會同時執行多個Worker,每一個Worker任務接受一個Request請求,而後請求頁面解析數據,輸出解析出的RequestsItembash

  • 由於又不少RequestWorker,因此還須要Scheduler模塊,負責對請求任務的調度處理架構

  • Engine模塊接受Worker發送的RequestsItems,當前咱們先把Items打印出,把解析出的Request發送給調度器併發

  • 其中EngineScheduler是一個goroutineWorker包含多個goroutine,各個模塊之間都是用channel進行鏈接框架

    先放上重構後的項目文件結構:函數

二、Worker實現

咱們從engine.go中提取下面功能做爲Worker模塊,同時把engine.go 改名爲simple.go。修改後的simple.go文件請自行調整,或者去github項目源代碼回滾查看。post

engine/worker.go

package engine

import (
	"crawler/fetcher"
	"log"
)

// 輸入 Request, 返回 ParseResult
func worker(request Request) (ParseResult, error) {
	log.Printf("Fetching %s\n", request.Url)
	content, err := fetcher.Fetch(request.Url)
	if err != nil {
		log.Printf("Fetch error, Url: %s %v\n", request.Url, err)
		return ParseResult{}, err
	}
	return request.ParseFunc(content), nil
}
複製代碼

對於每個Worker接受一個請求,而後返回解析出的內容

三、併發引擎Concurrent實現

請你們根據架構圖來看,效果會更好。

package engine

import "log"

// 併發引擎
type ConcurrendEngine struct {
   Scheduler   Scheduler	// 任務調度器
   WorkerCount int			// 任務併發數量
}

// 任務調度器
type Scheduler interface {
   Submit(request Request) // 提交任務
   ConfigMasterWorkerChan(chan Request)	// 配置初始請求任務
}

func (e *ConcurrendEngine) Run(seeds ...Request) {

   in := make(chan Request)			// scheduler的輸入
   out := make(chan ParseResult)	// worker的輸出
   e.Scheduler.ConfigMasterWorkerChan(in)	// 把初始請求提交給scheduler

   // 建立 goruntine
   for i := 0; i < e.WorkerCount; i++ {
      createWorker(in, out)
   }

   // engine把請求任務提交給 Scheduler
   for _, request := range seeds {
      e.Scheduler.Submit(request)
   }

   itemCount := 0
   for {
      // 接受 Worker 的解析結果
      result := <-out
      for _, item := range result.Items {
         log.Printf("Got item: #%d: %v\n", itemCount, item)
         itemCount++
      }

      // 而後把 Worker 解析出的 Request 送給 Scheduler
      for _, request := range result.Requests {
         e.Scheduler.Submit(request)
      }
   }
}

// 建立任務,調用worker,分發goroutine
func createWorker(in chan Request, out chan ParseResult) {
   go func() {
      for {
         request := <-in
         result, err := worker(request)
         if err != nil {
            continue
         }
         out <- result
      }
   }()
}
複製代碼

四、任務調度器Scheduler實現

scheduler/scheduler.go

package scheduler

import "crawler/engine"

type SimpleScheduler struct {
	workerChan chan engine.Request
}

func (s *SimpleScheduler) Submit(request engine.Request) {
	// 爲每個 Request 建立 goroutine
	go func() {
		s.workerChan <- request
	}()
}

// 把初始請求發送給 Scheduler
func (s *SimpleScheduler) ConfigMasterWorkerChan(in chan engine.Request) {
	s.workerChan = in
}

複製代碼

五、main函數

package main

import (
	"crawler/engine"
	"crawler/scheduler"
	"crawler/zhenai/parser"
)

func main() {
	e := engine.ConcurrendEngine{	// 配置爬蟲引擎
		Scheduler:   &scheduler.SimpleScheduler{},
		WorkerCount: 50,
	}
	e.Run(engine.Request{		// 配置爬蟲目標信息
		Url:       "http://www.zhenai.com/zhenghun",
		ParseFunc: parser.ParseCityList,
	})
}
複製代碼

六、小結

本次博客咱們實現一個最簡單的併發版爬蟲,調度器源源不斷的接受任務,一旦有一個worker空閒,就給其分配任務。這樣子有一個缺點,就是咱們不知道咱們分發出那麼多worker的工做狀況,對worker的控制力比較弱,因此在下次博客中會用隊列來實現任務調度。

若是想獲取Google工程師深度講解go語言視頻資源的,能夠在評論區留下郵箱。

項目的源代碼已經託管到Github上,對於各個版本都有記錄,歡迎你們查看,記得給個star,在此先謝謝你們了

相關文章
相關標籤/搜索