原文:medium.com/smsjunk/han…c++
我在不一樣公司從事反爬蟲、反病毒、反惡意程序已經有15年了,我知道,因爲天天須要處理和應對的大量數據,這些系統最終會所以變得十分複雜。golang
目前我是smsjunk.com的CEO以及KnowBe4的首席架構師,兩家公司都是活躍與網絡安全行業。web
有趣的是在過去10年做爲一名軟件工程師,幾乎全部我參與的後端開發項目裏面都是用Ruby on Rails來完成的。但是你不要誤會,我熱愛Ruby on Rails而且我認爲它是一個很是出色的開發環境,但當你用ruby的思路在設計和開發系統一段時間之後,你每每會忘記,其實你還能夠利用多線程,並行化,高速執行以及更小的內存開銷來開發系統。我是一名c/c++,Delphi以及c#的開發人員已經不少年了,而後我開始慢慢意識到,使用合適的工具讓系統變得更加簡單明瞭纔是一件正確的事情。docker
編程界對於編程語言以及框架的爭論從未停歇,而我並不想參與到其中去。我相信效率高低,生產力大小以及代碼的可維護性很大一部分取決於你所設計的架構是否足夠簡單。編程
當咱們開發一個匿名遙測以及數據分析系統的時候,其中一個需求是可以處理和應付百萬數量級的POST請求,網絡請求處理器會接收一個POST過來JSON,這個JSON裏面會包含許多須要寫入到Amazon S3的數據集合,以便咱們的map-reduce系統能夠在後續來處理這些數據。json
通常狀況下咱們會考慮構建一個worker分層的結構,而且利用一些中間件,例如:c#
而後設立兩個不一樣的集羣,一個是給web客戶端,另外一個是給worker,而後咱們能夠將worker擴容到咱們處理業務時所須要的數量。後端
但在最開始的時候,咱們的團隊就意識到能夠用Go來實現全部這些,由於在討論期間咱們認爲這將會是一個很是高訪問量的系統。我利用Go來開發也已經有兩年了,用它來開發過一些系統,可是負載規模遠沒有這次的需求這麼大。安全
咱們先定義一些struct來規定咱們POST接收的請求體,以及定義一個上傳到S3 bucket的方法UploadToS3
ruby
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
複製代碼
最開始的時候咱們很是天真地實現一個POST的鉤子方法以下,只是簡單地將每一個請求體的上傳動做放到Go rutinues中讓他們並行執行:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
複製代碼
在中等規模的負載狀況下,這種方法對大部分人都是沒有問題的,但在應對更大規模的請求量時候,咱們很快就招架不住了。當咱們把這個版本的代碼部署到生產環境之後,咱們期待能有大量的請求進來但實際還不能達到百萬級別的數量級。咱們徹底低估了這個系統要處理的流量數。
但無論怎麼說上面的方法都是欠妥的。由於它沒有任何方法讓咱們去控制Go runtinues啓動的數量。因此當咱們的系統在面對每分鐘百萬級POST請求的時候很快就垮掉了。
咱們須要找到另外的方法。在一開始咱們就在討論如何讓咱們的請求處理程序的生命週期儘量地縮短以及上傳到S3的操做能在後臺或者異步運行。固然,在Ruby on Rails裏面你必須這麼作,不然你將會阻塞到全部其餘的網絡請求處理程序。不管您使用的是美洲獅,獨角獸仍是過路人(請不要參與JRuby討論)。而後咱們想到使用消息隊列這種比較常見的方法來處理來達到咱們的目的,例如Resque, Sidekiq, SQS等等,還有數不清的工具由於實在有太多方法來實現這個功能。
因此在第二次迭代的時候,咱們須要建立一個緩衝隊列,咱們會將任務放入隊列裏面而後再一個個地上傳到S3上,但因爲咱們但願達到可以控制這個隊列的最大容量的目的,而且咱們有足夠的RAM來容許咱們將請求體儲存到內存當中,因此咱們認爲直接使用了Go提供的channel,而後將咱們的請求直接入隊到channel中處理就能夠了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
複製代碼
咱們會從channel中獲取任務而且執行他們的上傳操做
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
複製代碼
但說句老實話,我並不知道這是在幹嗎。確定是由於那時已經太晚還有咱們已經喝了太多的紅牛。😌😌
這個改動並無讓咱們的困境獲得任何改善,咱們將併發任務放到了隊列中執行僅僅是看上去好像解決了問題。可是咱們的異步程序一次只會上傳一個請求體到S3上面,可是咱們的請求數此時遠遠大於咱們上傳到S3的數量,可想而知咱們的緩衝隊列很快就到達了他的極限爆滿了,而後它阻擋了其餘網絡請求的入隊操做。
至關於咱們僅僅迴避了問題,而且讓咱們的系統的崩潰時間進入了倒數。咱們這個缺陷的版本發佈之後,整個系統的延遲率在持續性地每分鐘在上漲。
咱們決定採用協同的方式來改進咱們的Go channel,經過創建一個帶有2個的channel處理系統,一個用於將請求體入隊,另外一個是負責控制worker
在JobQueue
中併發運行時的數量。
這個想法的核心是以一個相對穩定的頻率去並行上傳數據到S3,這樣的話既不會把咱們的服務器弄垮,也不會由於鏈接過多形成不少S3的鏈接錯誤。因此咱們開始着手於Job/Worker模式。這個對於熟悉Java,c#開發 來講並不陌生,你能夠理解爲這是Go利用channel來實現worker線程池的方法。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
複製代碼
接下來修改咱們網絡請求的鉤子函數,負責建立一個Job的結構體的實例而後將其放入JobQueue channel中等待worker來獲取執行。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
複製代碼
在咱們網絡服務初始化的時候建立一個Dispather
而且調用Run()
建立一個裝有必定數量worker的線程池,用來接收和處理來自JobQueue
的Job
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
複製代碼
下面是咱們Dispather
的實現
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
複製代碼
注意咱們限制了worker在線程池的最大數量。咱們的應用運行在一個docker化的Go環境中,部署在Amazon的Elasticbeanstalk上,而且儘可能遵循12要素原則來配置咱們的生產環境,在環境變量中獲取對應的參數值,這樣咱們就能夠控制worker的數量以及JobQueue
的最大容量經過直接修改對應的值而不須要從新去部署咱們的應用。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
複製代碼
當咱們將這個版本發佈到生產環境之後咱們的延遲率立刻有明顯的降低,咱們處理請求的能力有一個質的飛躍。
在一分鐘之後等咱們的負載均衡器徹底啓動起來之後,能夠看到ElasticBeanstalk上服務器接收的請求數將近一百萬次每分鐘。一般咱們早上都有幾個小時的流量高峯期,那時甚至會超過百萬請求次數每分鐘。並且當咱們發佈完新代碼之後服務器的數量就從100臺降低到並穩定在了20臺。
當給集羣加上合適的配置以及設置自動伸縮之後,甚至能夠降到僅僅用4臺c4.Large的EC2實例來處理平常業務。而且集羣會自動增長新的實例當CPU使用率持續5分鐘達到90%時。
簡潔化設計永遠是我所追求的東西。咱們能夠設計一個複雜的系統用不少的隊列,後臺運行worker,複雜的部署等等,但取而代之咱們決定利用Elasticbeanstalk強大的自動伸縮功能以及Go所提供開箱即用的併發特性。
總會有一個工具適合你的工做,在有的時候當你Ruby on Rails系統須要一個強大的網絡請求處理功能的時候,能夠試着考慮一下除了ruby生態圈之外的更增強大和簡潔的替代方案。
若是你能關注一下個人Twittwer而且分享給身邊的朋友的話,我會很是感謝的!個人Twitter是 twitter.com/mcastilho