在上一篇文章《Golang實現簡單爬蟲框架(3)——簡單併發版》中咱們實現了一個最簡單併發爬蟲,調度器爲每個Request
建立一個goroutine
,每一個goroutine
往Worker
隊列中分發任務,發完就結束。全部的Worker
都在搶一個channel
中的任務。可是這樣作仍是有些許不足之處,好比控制力弱:全部的Worker在搶同一個channel
中的任務,咱們沒有辦法控制給哪個worker任務。html
其實咱們能夠本身作一個任務分發的機制,咱們來決定分發給哪個Workergit
注意:本次併發是在上一篇文章簡單併發實現的基礎上修改,因此沒有貼出所有代碼,只是貼出部分修改部分,要查看完整項目代碼,能夠查看上篇文章,或者從github下載項目源代碼查看github
在上一篇文章實現簡單併發的基礎上,咱們修改下Scheduler
的任務分發機制golang
Scheduler
接收到一個Request
後,不能直接發給Worker
,也不能爲每一個Request
建立一個goroutine
,因此這裏使用一個Request隊列Worker
實現一個更多的控制,能夠決定把任務分發給哪個Worker
,因此這裏咱們還須要一個Worker
隊列Request
和Worker
,咱們就能夠把選擇的Request發送給選擇的Worker
在scheduler目錄下建立queued.go文件架構
package scheduler
import "crawler/engine"
// 使用隊列來調度任務
type QueuedScheduler struct {
requestChan chan engine.Request // Request channel
// Worker channel, 其中每個Worker是一個 chan engine.Request 類型
workerChan chan chan engine.Request
}
// 提交請求任務到 requestChannel
func (s *QueuedScheduler) Submit(request engine.Request) {
s.requestChan <- request
}
func (s *QueuedScheduler) ConfigMasterWorkerChan(chan engine.Request) {
panic("implement me")
}
// 告訴外界有一個 worker 能夠接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
s.workerChan <- w
}
func (s *QueuedScheduler) Run() {
// 生成channel
s.workerChan = make(chan chan engine.Request)
s.requestChan = make(chan engine.Request)
go func() {
// 建立請求隊列和工做隊列
var requestQ []engine.Request
var workerQ []chan engine.Request
for {
var activeWorker chan engine.Request
var activeRequest engine.Request
// 當requestQ和workerQ同時有數據時
if len(requestQ) > 0 && len(workerQ) > 0 {
activeWorker = workerQ[0]
activeRequest = requestQ[0]
}
select {
case r := <-s.requestChan: // 當 requestChan 收到數據
requestQ = append(requestQ, r)
case w := <-s.workerChan: // 當 workerChan 收到數據
workerQ = append(workerQ, w)
case activeWorker <- activeRequest: // 當請求隊列和認讀隊列都不爲空時,給任務隊列分配任務
requestQ = requestQ[1:]
workerQ = workerQ[1:]
}
}
}()
}
複製代碼
修改後的concurrent.go文件以下併發
package engine
import (
"log"
)
// 併發引擎
type ConcurrendEngine struct {
Scheduler Scheduler
WorkerCount int
}
// 任務調度器
type Scheduler interface {
Submit(request Request) // 提交任務
ConfigMasterWorkerChan(chan Request)
WorkerReady(w chan Request)
Run()
}
func (e *ConcurrendEngine) Run(seeds ...Request) {
out := make(chan ParseResult)
e.Scheduler.Run()
// 建立 goruntine
for i := 0; i < e.WorkerCount; i++ {
createWorker(out, e.Scheduler)
}
// engine把請求任務提交給 Scheduler
for _, request := range seeds {
e.Scheduler.Submit(request)
}
itemCount := 0
for {
// 接受 Worker 的解析結果
result := <-out
for _, item := range result.Items {
log.Printf("Got item: #%d: %v\n", itemCount, item)
itemCount++
}
// 而後把 Worker 解析出的 Request 送給 Scheduler
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
}
}
func createWorker(out chan ParseResult, s Scheduler) {
// 爲每個Worker建立一個channel
in := make(chan Request)
go func() {
for {
s.WorkerReady(in) // 告訴調度器任務空閒
request := <-in
result, err := worker(request)
if err != nil {
continue
}
out <- result
}
}()
}
複製代碼
package main
import (
"crawler/engine"
"crawler/scheduler"
"crawler/zhenai/parser"
)
func main() {
e := engine.ConcurrendEngine{
Scheduler: &scheduler.QueuedScheduler{},// 這裏調用併發調度器
WorkerCount: 50,
}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
ParseFunc: parser.ParseCityList,
})
}
複製代碼
運行結果以下:app
在這篇文章中咱們使用隊列實現對併發任務的調度,從而實現了對Worker的控制。咱們如今併發有兩種實現方式,可是他們的調度方法是不一樣的,爲了代碼的統一,因此在下一篇文章中的內容有:框架
若是想獲取Google工程師深度講解go語言視頻資源的,能夠在評論區留下郵箱。函數
項目的源代碼已經託管到Github上,對於各個版本都有記錄,歡迎你們查看,記得給個star,在此先謝謝你們post
若是以爲博客不錯,勞煩大人給個贊,