原文連接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/golang
直入本文要描述的問題:網站流量上來了,高併發負載是不可避免滴問題了,當服務端須要處理大量耗時的任務時,咱們通常都會考慮將耗時任務異步處理。那麼若是使用Go如何實現?redis
傳統上,咱們會考慮使用如下方法建立工做者層架構:json
golang的異步處理之攜程:go func()能夠帶來了很大的方便,雖然協程相對於線程佔用的系統資源更少,但這並不表明咱們能夠無休止的建立協程。緩存
不停建立協程也有壓垮系統的風險。然而絕大多數的時候,咱們不能簡單粗暴的建立協程來處理異步任務,緣由是不可控。下面咱們引用原做者的demo,一個執行耗時任務的handler。安全
代碼咱們只用看大體的實現流程原理,實現細節暫且不論。架構
package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" "time" ) type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) } func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
對於適量的負載,這個方案應該沒有問題。可是負載增長之後這個方法就不能很好地工做。當咱們把這個版本部署到生產環境中後,若是咱們遇到了比預期大一個數量級的請求量。併發
那麼這個方法就有些不盡如人意了。它沒法控制建立goroutine的數量。由於咱們每分鐘收到了一百萬個POST請求,上面的代碼很快就奔潰了。app
這就是咱們遇到的第一個問題,簡單粗暴起協程處理耗時任務致使的系統不可控性。咱們天然而然就會想,怎麼能讓系統更可控呢?異步
建立帶緩衝的channel。這樣咱們能夠把工做任務放到隊列裏而後再上傳到S3。由於能夠控制隊列的長度而且有充足的內存,我以爲把工做任務緩存在channel隊列裏應該沒有問題。函數
因此一個很天然的思路那就是:創建任務隊列。golang提供了線程安全的任務隊列實現方式:帶緩衝的channal。可是這樣只是延後了請求的爆發。
做者意識到,要解決這一問題,必須控制協程的數量。如何控制協程的數量?Job/Worker模式!這裏我將做者的代碼修改了一下,單文件可執行,以記錄並理解這一模式。
package main import ( "fmt" "reflect" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待執行的工做 type Job struct { Payload Payload } //任務channal var JobQueue chan Job //執行任務的工做者單元 type Worker struct { WorkerPool chan chan Job //工做者池--每一個元素是一個工做者的私有任務channal JobChannel chan Job //每一個工做者單元包含一個任務管道 用於獲取任務 quit chan bool //退出信號 no int //編號 } //建立一個新工做者單元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("建立一個新工做者單元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //循環 監放任務和結束信號 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任務 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信號 return } } }() } // 中止信號 func (w Worker) Stop() { go func() { w.quit <- true }() } //調度中心 type Dispatcher struct { //工做者池 WorkerPool chan chan Job //工做者數量 MaxWorkers int } //建立調度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //工做者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空閒worker (任務多的時候會阻塞這裏) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 20; i++ { // 新建一個任務 payLoad := Payload{Num: 1} work := Job{Payload: payLoad} // 任務放入任務隊列channal JobQueue <- work fmt.Println("JobQueue <- work") time.Sleep(1 * time.Second) } } /* 一個任務的執行過程以下 JobQueue <- work 新任務入隊 job := <-JobQueue: 調度中心收到任務 jobChannel := <-d.WorkerPool 從工做者池取到一個工做者 jobChannel <- job 任務給到工做者 job := <-w.JobChannel 工做者取出任務 {{1}} 執行任務 w.WorkerPool <- w.JobChannel 工做者在放回工做者池 */
這樣,咱們已經可以主動的控制worker的數量。這時候,我問哈你們,咱們徹底解決問題了麼?若是有大量的任務同時涌入,會發生什麼樣的結果。程序會阻塞等待可用的worker
jobChannel := <-d.WorkerPool
下面是咱們的dispatcher實現代碼:
//調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空閒worker (任務多的時候會阻塞這裏) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } }
這裏咱們提供了建立worker的最大數目做爲參數,並把這些worker加入到worker池裏。不要忘記,這個調度方法也是在不斷的建立協程等待空閒的worker。咱們再改一下代碼以下:
package main import ( "fmt" "reflect" "runtime" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待執行的工做 type Job struct { Payload Payload } //任務channal var JobQueue chan Job //執行任務的工做者單元 type Worker struct { WorkerPool chan chan Job //工做者池--每一個元素是一個工做者的私有任務channal JobChannel chan Job //每一個工做者單元包含一個任務管道 用於獲取任務 quit chan bool //退出信號 no int //編號 } //建立一個新工做者單元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("建立一個新工做者單元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //循環 監放任務和結束信號 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任務 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信號 return } } }() } // 中止信號 func (w Worker) Stop() { go func() { w.quit <- true }() } //調度中心 type Dispatcher struct { //工做者池 WorkerPool chan chan Job //工做者數量 MaxWorkers int } //建立調度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //工做者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //調度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { fmt.Println("等待空閒worker (任務多的時候會阻塞這裏") //等待空閒worker (任務多的時候會阻塞這裏) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 將任務放到上述woker的私有任務channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 100; i++ { // 新建一個任務 payLoad := Payload{Num: i} work := Job{Payload: payLoad} // 任務放入任務隊列channal JobQueue <- work fmt.Println("JobQueue <- work", i) fmt.Println("當前協程數:", runtime.NumGoroutine()) time.Sleep(100 * time.Millisecond) } }
執行結果以下:
這裏咱們發現,咱們依然沒能控制住協程數量,咱們只是控制住了worker的數量。這種狀況下,若是worker數量設置的合理且異步任務耗時不是特別長的狀況下通常沒有問題。可是出於安全的考慮,我要把這個阻塞的協程數作一個控制,若是達到限制時候拒絕服務以保護系統該怎麼處理?
咱們能夠控制併發執行(包括等待執行)的任務數。咱們加入任務使用以下判斷。用一個帶緩衝的Channel控制併發執行的任務數。
當任務異步處理完成的時候執行<- DispatchNumControl
釋放控制便可。用這種方法,
咱們能夠根據壓測結果設置合適的併發數從而保證系統可以儘量的發揮本身的能力,同時保證不會由於任務量太大而崩潰(由於達到極限的時候,系統會告訴調用方:牛仔我很忙)。
好比定義一個limit函數讀取是否存在發送的任務隊列:
//用於控制併發處理的協程數 var DispatchNumControl = make(chan bool, 10000) func Limit(work Job) bool { select { case <-time.After(time.Millisecond * 100): fmt.println("牛仔我很忙") return false case DispatchNumControl <- true: // 任務放入任務隊列channal jobChannel <- work return true } }
咱們本能夠經過大量的隊列,後臺workers,複雜的調度來設計一套複雜的系統,協程是個好的設計,但任何東西都不能過分使用。
咱們作系統設計的時候,必定也要時刻想着控制:要對本身設計的系統有着足夠的控制力。另外綜合上面的實現。爲何 dispatch 這裏要用 協程 呢?阻塞徹底沒問題? 歡迎廣大博友拍磚留言。。。。