Golang實現簡單爬蟲框架(5)——項目重構與數據存儲

前言

在上一篇文章《Golang實現簡單爬蟲框架(4)——隊列實現併發任務調度》中,咱們使用用隊列實現了任務調度,接下來首先對兩種併發方式作一個同構,使代碼統一。而後添加數據存儲模塊。html

注意:本次併發是在上一篇文章簡單併發實現的基礎上修改,因此沒有貼出所有代碼,只是貼出部分修改部分,要查看完整項目代碼,能夠查看上篇文章,或者從github下載項目源代碼查看git

一、項目重構

(1)併發引擎

經過分析咱們發現,兩種不一樣調度的區別是每一個worker一個channel仍是 全部worker共用一個channel,因此咱們在接口中定義一個函數WorkerChan(),用來決定這件事,即worker一個channel仍是 全部worker共用一個channel。此時ConfigMasterWorkerChan就再也不須要了。github

在項目文件concurrent.go中咱們定義一個任務調度器Scheduler,以下:golang

// 任務調度器
type Scheduler interface {
	Submit(request Request) // 提交任務
	ConfigMasterWorkerChan(chan Request)
	WorkerReady(w chan Request)
	Run()
}
複製代碼

可是在簡單併發中咱們只實現了SubmitConfigMasterWorkerChan接口,而使用隊列調度中卻實現了接口的全部方法,全部咱們同構一下使concurrent.go文件能夠適用於兩種不一樣的調度器。mongodb

由於在createworker函數中要使用WorkerReady函數,因此要傳入一個Scheduler,可是這樣顯得比較重,咱們能夠利用接口組合,新建一個接口ReadyNotifier,這樣在createworker函數中傳入ReadyNotifier便可。數據庫

修改後的任務調度以下:json

type Scheduler interface {
	ReadyNotifier
	Submit(request Request) // 提交任務
	WorkerChan() chan Request
	Run()
}
type ReadyNotifier interface {
	WorkerReady(chan Request)
}
複製代碼

此時建立goroutine修改以下:session

// 建立 goroutine
for i := 0; i < e.WorkerCount; i++ {
    //任務是每一個 worker 一個 channel 仍是 全部 worker 共用一個 channel 由WorkerChan 來決定
	createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
}
複製代碼

修改後的concurrent.go文件以下:併發

package engine

import (
	"log"
)

// 併發引擎
type ConcurrendEngine struct {
	Scheduler   Scheduler
	WorkerCount int
}

// 任務調度器
type Scheduler interface {
	ReadyNotifier
	Submit(request Request) // 提交任務
	WorkerChan() chan Request
	Run()
}
type ReadyNotifier interface {
	WorkerReady(chan Request)
}

func (e *ConcurrendEngine) Run(seeds ...Request) {

	out := make(chan ParseResult)
	e.Scheduler.Run()

	// 建立 goruntine
	for i := 0; i < e.WorkerCount; i++ {
		// 任務是每一個 worker 一個 channel 仍是 全部 worker 共用一個 channel 由WorkerChan 來決定
		createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
	}

	// engine把請求任務提交給 Scheduler
	for _, request := range seeds {
		e.Scheduler.Submit(request)
	}

	itemCount := 0
	for {
		// 接受 Worker 的解析結果
		result := <-out
		for _, item := range result.Items {
			log.Printf("Got item: #%d: %v\n", itemCount, item)
			itemCount++
		}

		// 而後把 Worker 解析出的 Request 送給 Scheduler
		for _, request := range result.Requests {
			e.Scheduler.Submit(request)
		}
	}
}

func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) {
	go func() {
		for {
			ready.WorkerReady(in) // 告訴調度器任務空閒
			request := <-in
			result, err := worker(request)
			if err != nil {
				continue
			}
			out <- result
		}
	}()
}
複製代碼

(2)簡單併發調度器

scheduler/simple.goapp

package scheduler

import "crawler/engine"

type SimpleScheduler struct {
	workerChan chan engine.Request
}

func (s *SimpleScheduler) WorkerChan() chan engine.Request {
	// 此時全部 worker 共用同一個 channel,直接返回便可
	return s.workerChan
}

func (s *SimpleScheduler) WorkerReady(w chan engine.Request) {

}

func (s *SimpleScheduler) Run() {
    // 建立出 workchannel
	s.workerChan = make(chan engine.Request)
}

func (s *SimpleScheduler) Submit(request engine.Request) {
	// send request down to worker chan
	go func() {
		s.workerChan <- request
	}()
}
複製代碼

(3)隊列實現調度器

scheduler/queued.go

添加WorkerChan()的實現便可

package scheduler

import "crawler/engine"

// 使用隊列來調度任務

type QueuedScheduler struct {
	requestChan chan engine.Request
	workerChan  chan chan engine.Request
}

func (s *QueuedScheduler) WorkerChan() chan engine.Request {
	// 對於隊列實現來說,每一個 worker 共用一個 channel
	return make(chan engine.Request)
}

// 提交請求任務到 requestChan
func (s *QueuedScheduler) Submit(request engine.Request) {
	s.requestChan <- request
}

// 告訴外界有一個 worker 能夠接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
	s.workerChan <- w
}

func (s *QueuedScheduler) Run() {
	s.workerChan = make(chan chan engine.Request)
	s.requestChan = make(chan engine.Request)
	go func() {
		// 建立請求隊列和工做隊列
		var requestQ []engine.Request
		var workerQ []chan engine.Request
		for {
			var activeWorker chan engine.Request
			var activeRequest engine.Request

			if len(requestQ) > 0 && len(workerQ) > 0 {
				activeWorker = workerQ[0]
				activeRequest = requestQ[0]
			}

			select {
			case r := <-s.requestChan: // 當 requestChan 收到數據
				requestQ = append(requestQ, r)
			case w := <-s.workerChan: // 當 workerChan 收到數據
				workerQ = append(workerQ, w)
			case activeWorker <- activeRequest: // 當請求隊列和認讀隊列都不爲空時,給任務隊列分配任務
				requestQ = requestQ[1:]
				workerQ = workerQ[1:]
			}
		}
	}()
}
複製代碼

(4)main函數

通過上述同構,在main函數中如需切換不一樣調度器,只須要相應的配置便可。

package main

import (
	"crawler/engine"
	"crawler/scheduler"
	"crawler/zhenai/parser"
)

func main() {
	e := engine.ConcurrendEngine{
		//Scheduler: &scheduler.QueuedScheduler{}, // 隊列實現調度器
		Scheduler:   &scheduler.SimpleScheduler{},	// 簡單併發調度
		WorkerCount: 50,
	}
	e.Run(engine.Request{
		Url:       "http://www.zhenai.com/zhenghun",
		ParseFunc: parser.ParseCityList,
	})
}
複製代碼

二、數據存儲

(1)Mgo的介紹安裝

爬取到的數據不能僅僅在控制檯打印出來,因此咱們還要給爬蟲添加數據存儲模塊。咱們本次選擇使用mongodb來存儲咱們的數據。

mgo(音mango)是MongoDBGo語言驅動,它用基於Go語法的簡單API實現了豐富的特性,並通過良好測試。

官方網址:labix.org/mgo

文檔:API docs for mgo

首先咱們要安裝mgo,打開終端,輸入下面代碼完成安裝

go get gopkg.in/mgo.v2
複製代碼

mgo基本操做都很簡單,有數據庫操做經驗均可以很快上手。

(2)爬蟲引擎與數據格式

首先,爬蟲引擎獲取到數據要把數據發送給數據存儲模塊,而數據的傳遞用要用到channel,因此打開concurrent.go文件,在引擎添加ItemChan屬性,以下所示:

爬取到數據須要把數據發送到數據存儲模塊,

package engine
// 併發引擎
type ConcurrendEngine struct {
	Scheduler   Scheduler // 任務調度器
	WorkerCount int       // 併發任務數量
	ItemChan    chan Item // 數據保存 channel
}

// ...
for {
    // 接受 Worker 的解析結果
    result := <-out
    for _, item := range result.Items {
        // 當抓取一組數據後,進行保存
        go func(item2 Item) {
			e.ItemChan <- item2
		}(item)
    }
	// ...
}
// ...
複製代碼

engine/types.go中定義Item類型:

package engine

// 請求結構
type Request struct {
	Url       string // 請求地址
	ParseFunc func([]byte) ParseResult } // 解析結果結構 type ParseResult struct {
	Requests []Request // 解析出的請求
	Items    []Item    // 解析出的內容
}

// 解析出的用戶數據格式
type Item struct {
	Url     string      // 我的信息Url地址
	Type    string      // table
	Id      string      // Id
	Payload interface{} // 詳細信息
}

func NilParseFun([]byte) ParseResult {
	return ParseResult{}
}
複製代碼

(3)存儲模塊的實現

在根目錄下建立persist文件夾,而後建立itemsaver.go文件

// persist/itemsaver.go
package persist

import (
	"context"
	"crawler/engine"
	"errors"
	"gopkg.in/mgo.v2"
	"gopkg.in/olivere/elastic.v5"
	"log"
)

func ItemSaver(index string) (chan engine.Item, error) {
	// mongodb connect
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		panic(err)
	}

	out := make(chan engine.Item)
	go func() {
		itemCount := 0
		for {
			// 接收到發送的 item
			item := <-out
			log.Printf("Item Saver: got item #%d: %v\n",
				itemCount, item)
			itemCount++

			// Save data in mongodb
			err := mongo_save(session, index, item)

			if err != nil {
				// if have err, ignore it
				log.Printf("Item Saver: error, saving item %v: %v",
					item, err)
			}
		}
	}()
	return out, nil
}

// 使用 MongoDB 保存數據
func mongo_save(session *mgo.Session, dbName string, item engine.Item) error {
	if item.Type == "" {
		return errors.New("must supply Type")
	}
	c := session.DB(dbName).C(item.Type)	// 選擇要操做的數據庫與集合
	err := c.Insert(item)		// 插入數據
	if err != nil {
		log.Fatal(err)
	}
	return nil
}
複製代碼

(4)存儲測試文件

咱們把一條數據存入mongodb,而後再取出來,比對讀出的數據和寫入的數據是否相同

// persist/itemsaver_test.gp
package persist

import (
	"crawler/engine"
	"crawler/model"
	"encoding/json"
	"fmt"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
	"log"
	"testing"
)
func TestMongoSave(t *testing.T) {
	// mongodb connect
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		panic(err)
	}

	expected := engine.Item{
		Url:  "http://album.zhenai.com/u/1946858930",
		Type: "zhenai",
		Id:   "1946858930",
		Payload: model.Profile{
			Name:     "為你垨候",
			Gender:   "女士",
			Age:      40,
			Height:   163,
			Weight:   54,
			Income:   "5-8千",
			Marriage: "未婚",
			Address:  "佛山順德區",
		},
	}
	// 保存數據
	err = mongo_save(session, "crawler", expected)
	if err != nil {
		panic(err)
	}

	c := session.DB("crawler").C("zhenai")

	var result engine.Item
    // 查詢數據
	err = c.Find(bson.M{"id": "1946858930"}).One(&result)
	// result 爲 Json 類型
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload)
}
複製代碼

(5)parser模塊

咱們要在parse/profile.go文件中組裝好須要保存到數據庫的數據格式

// ...
result := engine.ParseResult{
    Items: []engine.Item{
        {
            Url:     url,
            Type:    "zhenai",
            Id:      extractString([]byte(url), idUrlRe),
            Payload: profile,
        },
    },
}
// ...
複製代碼

(6)main函數

package main

import (
	"crawler/engine"
	"crawler/persist"
	"crawler/scheduler"
	"crawler/zhenai/parser"
)

func main() {
	itemChan, err := persist.ItemSaver()
	if err != nil {
		panic(err)
	}

	e := engine.ConcurrendEngine{
		//Scheduler: &scheduler.QueuedScheduler{},
		Scheduler:   &scheduler.SimpleScheduler{},
		WorkerCount: 100,
		ItemChan:    itemChan,
	}
	e.Run(engine.Request{
		Url:       "http://www.zhenai.com/zhenghun",
		ParseFunc: parser.ParseCityList,
	})
}
複製代碼

運行項目,打開mongodb可視化工具,能夠看到爬取了54410條數據

三、總結

咱們首先把兩種併發方式作一個同構,使代碼統一,直接在main函數中使用不一樣的配置就能夠切換調度器,簡單方便。而後使用Mgo驅動操做數據,添加到mongodb中。內容有點多,不少代碼沒有完整的展現出來,但願你們能夠下載項目源代碼,回滾到對應提交記錄查看,效果會更好。 別無所求,只求隨手給個star

下篇博客中咱們會再當前博客的基礎上添加數據展現功能

若是想獲取Google工程師深度講解go語言視頻資源的,能夠在評論區留下郵箱。

若是以爲文章還能夠,勞煩大人隨手點個贊。。。

相關文章
相關標籤/搜索