在上一篇文章《Golang實現簡單爬蟲框架(4)——隊列實現併發任務調度》中,咱們使用用隊列實現了任務調度,接下來首先對兩種併發方式作一個同構,使代碼統一。而後添加數據存儲模塊。html
注意:本次併發是在上一篇文章簡單併發實現的基礎上修改,因此沒有貼出所有代碼,只是貼出部分修改部分,要查看完整項目代碼,能夠查看上篇文章,或者從github下載項目源代碼查看git
經過分析咱們發現,兩種不一樣調度的區別是每一個worker
一個channel
仍是 全部worker
共用一個channel
,因此咱們在接口中定義一個函數WorkerChan()
,用來決定這件事,即worker
一個channel
仍是 全部worker
共用一個channel
。此時ConfigMasterWorkerChan
就再也不須要了。github
在項目文件concurrent.go中咱們定義一個任務調度器Scheduler,以下:golang
// 任務調度器
type Scheduler interface {
Submit(request Request) // 提交任務
ConfigMasterWorkerChan(chan Request)
WorkerReady(w chan Request)
Run()
}
複製代碼
可是在簡單併發中咱們只實現了Submit
和ConfigMasterWorkerChan
接口,而使用隊列調度中卻實現了接口的全部方法,全部咱們同構一下使concurrent.go
文件能夠適用於兩種不一樣的調度器。mongodb
由於在createworker
函數中要使用WorkerReady
函數,因此要傳入一個Scheduler
,可是這樣顯得比較重,咱們能夠利用接口組合,新建一個接口ReadyNotifier
,這樣在createworker
函數中傳入ReadyNotifier
便可。數據庫
修改後的任務調度以下:json
type Scheduler interface {
ReadyNotifier
Submit(request Request) // 提交任務
WorkerChan() chan Request
Run()
}
type ReadyNotifier interface {
WorkerReady(chan Request)
}
複製代碼
此時建立goroutine修改以下:session
// 建立 goroutine
for i := 0; i < e.WorkerCount; i++ {
//任務是每一個 worker 一個 channel 仍是 全部 worker 共用一個 channel 由WorkerChan 來決定
createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
}
複製代碼
修改後的concurrent.go文件以下:併發
package engine
import (
"log"
)
// 併發引擎
type ConcurrendEngine struct {
Scheduler Scheduler
WorkerCount int
}
// 任務調度器
type Scheduler interface {
ReadyNotifier
Submit(request Request) // 提交任務
WorkerChan() chan Request
Run()
}
type ReadyNotifier interface {
WorkerReady(chan Request)
}
func (e *ConcurrendEngine) Run(seeds ...Request) {
out := make(chan ParseResult)
e.Scheduler.Run()
// 建立 goruntine
for i := 0; i < e.WorkerCount; i++ {
// 任務是每一個 worker 一個 channel 仍是 全部 worker 共用一個 channel 由WorkerChan 來決定
createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
}
// engine把請求任務提交給 Scheduler
for _, request := range seeds {
e.Scheduler.Submit(request)
}
itemCount := 0
for {
// 接受 Worker 的解析結果
result := <-out
for _, item := range result.Items {
log.Printf("Got item: #%d: %v\n", itemCount, item)
itemCount++
}
// 而後把 Worker 解析出的 Request 送給 Scheduler
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
}
}
func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) {
go func() {
for {
ready.WorkerReady(in) // 告訴調度器任務空閒
request := <-in
result, err := worker(request)
if err != nil {
continue
}
out <- result
}
}()
}
複製代碼
scheduler/simple.goapp
package scheduler
import "crawler/engine"
type SimpleScheduler struct {
workerChan chan engine.Request
}
func (s *SimpleScheduler) WorkerChan() chan engine.Request {
// 此時全部 worker 共用同一個 channel,直接返回便可
return s.workerChan
}
func (s *SimpleScheduler) WorkerReady(w chan engine.Request) {
}
func (s *SimpleScheduler) Run() {
// 建立出 workchannel
s.workerChan = make(chan engine.Request)
}
func (s *SimpleScheduler) Submit(request engine.Request) {
// send request down to worker chan
go func() {
s.workerChan <- request
}()
}
複製代碼
scheduler/queued.go
添加WorkerChan()
的實現便可
package scheduler
import "crawler/engine"
// 使用隊列來調度任務
type QueuedScheduler struct {
requestChan chan engine.Request
workerChan chan chan engine.Request
}
func (s *QueuedScheduler) WorkerChan() chan engine.Request {
// 對於隊列實現來說,每一個 worker 共用一個 channel
return make(chan engine.Request)
}
// 提交請求任務到 requestChan
func (s *QueuedScheduler) Submit(request engine.Request) {
s.requestChan <- request
}
// 告訴外界有一個 worker 能夠接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
s.workerChan <- w
}
func (s *QueuedScheduler) Run() {
s.workerChan = make(chan chan engine.Request)
s.requestChan = make(chan engine.Request)
go func() {
// 建立請求隊列和工做隊列
var requestQ []engine.Request
var workerQ []chan engine.Request
for {
var activeWorker chan engine.Request
var activeRequest engine.Request
if len(requestQ) > 0 && len(workerQ) > 0 {
activeWorker = workerQ[0]
activeRequest = requestQ[0]
}
select {
case r := <-s.requestChan: // 當 requestChan 收到數據
requestQ = append(requestQ, r)
case w := <-s.workerChan: // 當 workerChan 收到數據
workerQ = append(workerQ, w)
case activeWorker <- activeRequest: // 當請求隊列和認讀隊列都不爲空時,給任務隊列分配任務
requestQ = requestQ[1:]
workerQ = workerQ[1:]
}
}
}()
}
複製代碼
通過上述同構,在main函數中如需切換不一樣調度器,只須要相應的配置便可。
package main
import (
"crawler/engine"
"crawler/scheduler"
"crawler/zhenai/parser"
)
func main() {
e := engine.ConcurrendEngine{
//Scheduler: &scheduler.QueuedScheduler{}, // 隊列實現調度器
Scheduler: &scheduler.SimpleScheduler{}, // 簡單併發調度
WorkerCount: 50,
}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
ParseFunc: parser.ParseCityList,
})
}
複製代碼
爬取到的數據不能僅僅在控制檯打印出來,因此咱們還要給爬蟲添加數據存儲模塊。咱們本次選擇使用mongodb來存儲咱們的數據。
官方網址:labix.org/mgo
首先咱們要安裝mgo,打開終端,輸入下面代碼完成安裝
go get gopkg.in/mgo.v2
複製代碼
mgo基本操做都很簡單,有數據庫操做經驗均可以很快上手。
首先,爬蟲引擎獲取到數據要把數據發送給數據存儲模塊,而數據的傳遞用要用到channel
,因此打開concurrent.go
文件,在引擎添加ItemChan
屬性,以下所示:
爬取到數據須要把數據發送到數據存儲模塊,
package engine
// 併發引擎
type ConcurrendEngine struct {
Scheduler Scheduler // 任務調度器
WorkerCount int // 併發任務數量
ItemChan chan Item // 數據保存 channel
}
// ...
for {
// 接受 Worker 的解析結果
result := <-out
for _, item := range result.Items {
// 當抓取一組數據後,進行保存
go func(item2 Item) {
e.ItemChan <- item2
}(item)
}
// ...
}
// ...
複製代碼
在engine/types.go
中定義Item類型:
package engine
// 請求結構
type Request struct {
Url string // 請求地址
ParseFunc func([]byte) ParseResult } // 解析結果結構 type ParseResult struct {
Requests []Request // 解析出的請求
Items []Item // 解析出的內容
}
// 解析出的用戶數據格式
type Item struct {
Url string // 我的信息Url地址
Type string // table
Id string // Id
Payload interface{} // 詳細信息
}
func NilParseFun([]byte) ParseResult {
return ParseResult{}
}
複製代碼
在根目錄下建立persist文件夾,而後建立itemsaver.go文件
// persist/itemsaver.go
package persist
import (
"context"
"crawler/engine"
"errors"
"gopkg.in/mgo.v2"
"gopkg.in/olivere/elastic.v5"
"log"
)
func ItemSaver(index string) (chan engine.Item, error) {
// mongodb connect
session, err := mgo.Dial("localhost:27017")
if err != nil {
panic(err)
}
out := make(chan engine.Item)
go func() {
itemCount := 0
for {
// 接收到發送的 item
item := <-out
log.Printf("Item Saver: got item #%d: %v\n",
itemCount, item)
itemCount++
// Save data in mongodb
err := mongo_save(session, index, item)
if err != nil {
// if have err, ignore it
log.Printf("Item Saver: error, saving item %v: %v",
item, err)
}
}
}()
return out, nil
}
// 使用 MongoDB 保存數據
func mongo_save(session *mgo.Session, dbName string, item engine.Item) error {
if item.Type == "" {
return errors.New("must supply Type")
}
c := session.DB(dbName).C(item.Type) // 選擇要操做的數據庫與集合
err := c.Insert(item) // 插入數據
if err != nil {
log.Fatal(err)
}
return nil
}
複製代碼
咱們把一條數據存入mongodb,而後再取出來,比對讀出的數據和寫入的數據是否相同
// persist/itemsaver_test.gp
package persist
import (
"crawler/engine"
"crawler/model"
"encoding/json"
"fmt"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"log"
"testing"
)
func TestMongoSave(t *testing.T) {
// mongodb connect
session, err := mgo.Dial("localhost:27017")
if err != nil {
panic(err)
}
expected := engine.Item{
Url: "http://album.zhenai.com/u/1946858930",
Type: "zhenai",
Id: "1946858930",
Payload: model.Profile{
Name: "為你垨候",
Gender: "女士",
Age: 40,
Height: 163,
Weight: 54,
Income: "5-8千",
Marriage: "未婚",
Address: "佛山順德區",
},
}
// 保存數據
err = mongo_save(session, "crawler", expected)
if err != nil {
panic(err)
}
c := session.DB("crawler").C("zhenai")
var result engine.Item
// 查詢數據
err = c.Find(bson.M{"id": "1946858930"}).One(&result)
// result 爲 Json 類型
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload)
}
複製代碼
咱們要在parse/profile.go
文件中組裝好須要保存到數據庫的數據格式
// ...
result := engine.ParseResult{
Items: []engine.Item{
{
Url: url,
Type: "zhenai",
Id: extractString([]byte(url), idUrlRe),
Payload: profile,
},
},
}
// ...
複製代碼
package main
import (
"crawler/engine"
"crawler/persist"
"crawler/scheduler"
"crawler/zhenai/parser"
)
func main() {
itemChan, err := persist.ItemSaver()
if err != nil {
panic(err)
}
e := engine.ConcurrendEngine{
//Scheduler: &scheduler.QueuedScheduler{},
Scheduler: &scheduler.SimpleScheduler{},
WorkerCount: 100,
ItemChan: itemChan,
}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
ParseFunc: parser.ParseCityList,
})
}
複製代碼
運行項目,打開mongodb可視化工具,能夠看到爬取了54410條數據
咱們首先把兩種併發方式作一個同構,使代碼統一,直接在main函數中使用不一樣的配置就能夠切換調度器,簡單方便。而後使用Mgo驅動操做數據,添加到mongodb中。內容有點多,不少代碼沒有完整的展現出來,但願你們能夠下載項目源代碼,回滾到對應提交記錄查看,效果會更好。 別無所求,只求隨手給個star
下篇博客中咱們會再當前博客的基礎上添加數據展現功能
若是想獲取Google工程師深度講解go語言視頻資源的,能夠在評論區留下郵箱。
若是以爲文章還能夠,勞煩大人隨手點個贊。。。