原文記錄: http://www.codedata.cn/hackne...前端
我在反廣告、殺病毒、檢木馬等行業的不一樣軟件公司裏已經工做 15 年以上了,很是瞭解這類系統軟件因天天處理海量數據而致使的複雜性。golang
目前我做爲 smsjunk.com 的 CEO 和 KnowBe4 的主架構師,在這兩個網絡安全領域的公司裏工做。 web
有趣的是,在過去的 10 年裏,做爲軟件工程師,我接觸到的 web 後端代碼大可能是用 Ruby on Rails 開發的。請不要誤會,我很喜歡 Ruby on Railds 框架,並且我認爲它是一套使人稱讚的框架,不過期間一長,你就會習慣於使用 ruby 語言的方式思考和設計系統,會忘記利用多線程,並行化,快速執行和小的內存消耗,軟件架構本能夠如此高效且簡單。不少年來,我也是一個 C/C++,Delphi 以及 C# 的使用者,並且我開始認識到使用正確的工具能讓事情變得更簡單。docker
我對互聯網上沒完沒了的語言框架之間的論戰並不感冒。由於我相信解決方案的效能及代碼可維護性主要倚仗於你的架構能作到多簡單。json
在實現某個遙測分析系統時,咱們遇到一個實際問題,要處理來自數百萬終端的 POST 請求。其中的 web 請求處理過程會接收到一個 JSON 文檔,它包含一個由許多荷載數據組成的集合,咱們要把它寫到 Amazon S3 存儲中,以後咱們的 map-reduce 系統就能夠對這些數據進行處理。swift
通常咱們會利用以下的組件去建立一個有後臺工做層的架構,如:後端
而且創建兩個不一樣的服務集羣,一個用做 web 前端接收數據,另外一個執行具體的工做,這樣咱們就能動態調整後臺處理工做的能力了。安全
不過從項目伊始,咱們的團隊就認爲應該用 Go 語言來實現這項工做,由於在討論過程當中咱們發現這多是一個流量巨大的系統。我已經使用 Go 語言快兩年了,並且咱們已經在工做中用它開發了一些系統,只是還沒遇到過負載如此大的系統。ruby
咱們從定義一些 web 的 POST 請求載荷數據結構開始,還有一個用於上傳到 S3 存儲的方法。服務器
type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) }
起初咱們實現了一個很是簡單的 POST 處理接口,嘗試用一個簡單的 goroutine 並行工做處理過程:
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
在普通負載的狀況下,這段代碼對於大多數人已經夠用了,不過很快就被證實了不適合大流量的情形。當咱們把第一個版本的代碼部署到生產環境後,才發現實際狀況遠遠超出咱們的預期,系統流量比以前預計的大許多,咱們低估了數據負載量。
上面的處理方式從幾個方面來看都有問題。咱們沒法辦法控制建立的 go routines 的數量。並且咱們每分鐘收到一百萬次的 POST 請求,代碼必然很快就崩潰。
咱們須要尋找別的出路。從一開始,咱們就在討論怎樣保證請求處理時間較短,而後在後臺進行工做處理。固然,在 Ruby on Rails 裏必須這樣作,不然你會阻塞掉全部的 web 處理進程,不管你是否使用了 puma,unicorn,passenger(咱們這裏就不討論 JRuby 了)。而後咱們可能會使用常見的解決方案,好比 Resque,Sidkiq,SQS,等等。有許多方法能夠完成這個任務。
因此第二次迭代採用了緩衝通道( buffered channel ),咱們能夠將一些工做先放入隊列,再將它們上傳至 S3,因爲咱們可以控制隊列的大小,並且有充足的內存可用,因此咱們覺得將任務緩衝到 channel 隊列中就能夠了。
var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... }
而後將任務從隊列中取出再進行處理,咱們使用了相似下面的代碼:
func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- 仍然很差使! } } }
老實說,我都不知道當時咱們在想些什麼。這必定是喝紅牛熬夜致使的結果。這個方案沒給咱們帶來任何好處,咱們只是將一個有問題的併發過程替換爲了一個緩衝隊列,它只是將問題推後了而已。咱們的同步處理過程每次只將一份載荷數據上傳到 S3,因爲接受到請求的速率遠大於單例程上傳到 S3 的能力,咱們的緩衝隊列很快就滿了,致使請求處理過程阻塞,沒法將更多的數據送入隊列。
咱們傻乎乎地忽略了問題,最終開始了系統的死亡倒計時。在部署了這個問題版本以後幾分鐘裏,系統的延遲以固定的速率不斷增長。
咱們決定使用 Go 通道的一種經常使用模式構建一個兩層的通道系統,一個通道用做任務隊列,另外一個來控制處理任務時的併發量。
這個辦法是想以一種可持續的速率、併發地上傳數據至 S3 存儲,這樣既不會把機器跑掛掉也不會產生 S3 的鏈接錯誤。所以咱們選擇使用了一種 Job/Worker 模式。若是你熟悉 Java,C# 等語言,能夠認爲這是使用通道以 Go 語言的方式實現了一個工做線程池。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) // Job represents the job to be run type Job struct { Payload Payload } // A buffered channel that we can send work requests on. var JobQueue chan Job // Worker represents the worker that executes the job type Worker struct { WorkerPool chan chan Job JobChannel chan Job quit chan bool } func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool)} } // Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // we have received a work request. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) } case <-w.quit: // we have received a signal to stop return } } }() } // Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() }
咱們修改了 web 請求處理過程,使用數據載荷建立了一個 Job
實例,而後將其送入 JobQueue
通道中供工做例程使用。
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) }
在 web 服務初始化的過程當中,咱們建立了一個 Dispatcher
實例,調用 Run()
方法建立了工做例程池,而且經過監聽 JobQueue
獲取工做任務。
dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()
下面的代碼是任務分派器的具體實現:
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } }
注意咱們提供了一個最大數量的參數,用於控制工做池中初始的例程數量。由於這個項目使用了 Amazon Elasticbeanstalk 以及 docker 中的 Go 環境,因此咱們努力遵循 12-factor 的方法,從環境變量中讀取配置值,便於在生產環境中進行系統配置。經過這種方式,咱們能夠控制工做例程的數量和工做隊列的長度,無需對集羣進行從新部署,咱們就能快速調整參數值。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") )
在部署這份代碼後,咱們發現系統的延遲馬上大幅降低,而咱們處理請求的能力獲得了巨大的提高。
在咱們的 Elastic Load Balancers 所有預熱完成幾分鐘後,能夠看到咱們的 ElasticBeanstalk 應用每分鐘能夠處理近一百萬的請求,經常會在流量早高峯的時候突破每分鐘一百萬。
咱們剛把新代碼部署上去,服務器數量就從 100 臺服務器大幅降低到大約 20 臺服務器。
在咱們調整集羣配置和自動縮放配置後,咱們能將服務器的使用數量下降到四個 EC2 c4.Large 實例,再將 Elastic Auto-Scaling 設置爲 CPU 使用率持續五分鐘超 90% 的時候,增長一個實例。
在個人認知中,「簡單化」纔是常勝祕訣。咱們本可能設計一個更復雜的系統,擁有許多隊列和後臺工做例程,部署也更復雜。可是咱們最終利用了 Elasticbeanstalk 的自動縮放能力和 Go 語言爲咱們帶來的高效簡單的併發解決方案。
並非天天都能發生這樣的事情:一個只有四臺機器集羣處理着每分鐘一百萬的 POST 請求,把數據寫入 Amazon S3 存儲中,並且這些機器可能比我如今的 MacBook Pro 性能還差。
每件工做總會有更合適的工具。當你的 Ruby on Rails 系統須要強大的請求處理能力時,不妨嘗試一下 ruby 生態圈外那些更加簡單有效的解決方案。