這周姐姐入職了新公司,老闆想探探他的底,看了一眼他的簡歷,呦呵,精通kafka,這小姑娘有兩下子,既然這樣,那你寫一個消息隊列吧。由於要用go語言寫,這可給姐姐愁壞了。趕忙來求助我,我這麼堅貞不屈一人,在姐姐的軟磨硬泡下仍是答應他了,因此接下來我就手把手教姐姐怎麼寫一個消息隊列。下面咱們就來看一看我是怎麼寫的吧~~~。本代碼已上傳到個人github:git
有須要的小夥伴,可自行下載,順便給個小星星吧~~~github
姐姐真是把我愁壞了,本身寫的精通kafka
,居然不知道什麼是消息隊列,因而,一貫好脾氣的我開始給姐姐講一講什麼是消息隊列。golang
消息隊列,咱們通常稱它爲MQ(Message Queue)
,兩個單詞的結合,這兩個英文單詞想必你們都應該知道吧,其實最熟悉的仍是Queue
吧,即隊列。隊列是一種先進先出的數據結構,隊列的使用仍是比較廣泛的,可是已經有隊列了,怎麼還須要MQ
呢?面試
我:問你呢,姐姐,知道嗎?爲何還須要MQ
?姐姐:快點講,想捱打呀?編程
我:噗。。。 算我多嘴,哼~~~設計模式
欠欠的我開始了接下來的耐心講解......安全
舉一個簡單的例子,假設如今咱們要作一個系統,該登錄系統須要在用戶登錄成功後,發送封郵件到用戶郵箱進行提醒,需求仍是很簡單的,咱們先開看一看沒有MQ
,咱們該怎麼實現呢?畫一個時序圖來看一看:數據結構
看這個圖,郵件發送在請求登錄時進行,當密碼驗證成功後,就發送郵件,而後返回登錄成功。這樣是能夠的,可是他是有缺陷的。這讓咱們的登錄操做變得複雜了,每次請求登錄都須要進行郵件發送,若是這裏出現錯誤,整個登錄請求也出現了錯誤,致使登錄不成功;還有一個問題,原本咱們登錄請求調用接口僅僅須要100ms,由於中間要作一次發送郵件的等待,那麼調用一次登錄接口的時間就要增加,這就是問題所在,一封郵件他的優先級 不是很高的,用戶也不須要實時收到這封郵件,因此這時,就體現了消息隊列的重要性了,咱們用消息隊列進行改進一下。架構
這裏咱們將發送郵件請求放到Mq
中,這樣咱們就能提升用戶體驗的吞吐量,這個很重要,顧客就是上帝嘛,畢竟也沒有人喜歡用一個很慢很慢的app。併發
這裏只是舉了MQ
衆多應用中的其中一個,即異步應用,MQ
還在系統解藕、削峯/限流中有着重要應用,這兩個我就不具體講解了,原理都同樣,好好思考一下,大家都能懂得。
好啦,姐姐終於知道什麼是消息隊列了,可是如今仍是無法進行消息隊列開發的,由於還差一個知識點,即go語言中的channel
。這個很重要,咱們還須要靠這個來開發咱們的消息隊列呢。
因篇幅有限,這裏不詳細介紹channel
,只介紹基本使用方法。
channel
Goroutine 和 Channel 是 Go 語言併發編程的兩大基石。Goroutine 用於執行併發任務,Channel 用於 goroutine 之間的同步、通訊。Go提倡使用通訊的方法代替共享內存,當一個Goroutine須要和其餘Goroutine資源共享時,Channel就會在他們之間架起一座橋樑,並提供確保安全同步的機制。channel
本質上其實仍是一個隊列,遵循FIFO原則。具體規則以下:
建立通道須要用到關鍵字 make ,格式以下:
通道實例 := make(chan 數據類型)
Go語言中無緩衝的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。這種類型的通道要求發送 goroutine 和接收 goroutine 同時準備好,才能完成發送和接收操做。
無緩衝通道的定義方式以下:
通道實例 := make(chan 通道類型)
寫個例子來幫助你們理解一下吧:
package main import ( "sync" "time" ) func main() { c := make(chan string) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() c <- `Golang夢工廠` }() go func() { defer wg.Done() time.Sleep(time.Second * 1) println(`Message: `+ <-c) }() wg.Wait() }
Go語言中有緩衝的通道(buffered channel)是一種在被接收前能存儲一個或者多個值的通道。這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動做的條件也會不一樣。只有在通道中沒有要接收的值時,接收動做纔會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,發送動做纔會阻塞。
有緩衝通道的定義方式以下:
通道實例 := make(chan 通道類型, 緩衝大小)
來寫一個例子講解一下:
package main import ( "sync" "time" ) func main() { c := make(chan string, 2) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() c <- `Golang夢工廠` c <- `asong` }() go func() { defer wg.Done() time.Sleep(time.Second * 1) println(`公衆號: `+ <-c) println(`做者: `+ <-c) }() wg.Wait() }
好啦,通道的概念就介紹到這裏了,若是須要,下一篇我出一個channel
詳細講解的文章。
終於開始進入主題了,姐姐都聽的快要睡着了,我轟隆一嗓子,立馬精神,可是呢,asong也是捱了一頓小電炮,代價慘痛呀,嗚嗚嗚............
在開始編寫代碼編寫直接,我須要構思咱們的整個代碼架構,這纔是正確的編碼方式。咱們先來定義一個接口,把咱們須要實現的方法先列出來,後期對每個代碼進行實現就能夠了。所以能夠列出以下方法:
type Broker interface { publish(topic string, msg interface{}) error subscribe(topic string) (<-chan interface{}, error) unsubscribe(topic string, sub <-chan interface{}) error close() broadcast(msg interface{}, subscribers []chan interface{}) setConditions(capacity int) }
publish
:進行消息的推送,有兩個參數即topic
、msg
,分別是訂閱的主題、要傳遞的消息subscribe
:消息的訂閱,傳入訂閱的主題,便可完成訂閱,並返回對應的channel
通道用來接收數據unsubscribe
:取消訂閱,傳入訂閱的主題和對應的通道close
:這個的做用就是很明顯了,就是用來關閉消息隊列的broadCast
:這個屬於內部方法,做用是進行廣播,對推送的消息進行廣播,保證每個訂閱者均可以收到setConditions
:這裏是用來設置條件,條件就是消息隊列的容量,這樣咱們就能夠控制消息隊列的大小了細心的大家有沒有發現什麼問題,這些代碼我都定義的是內部方法,也就是包外不可用。爲何這麼作呢,由於這裏屬於代理要作的事情,咱們還須要在封裝一層,也就是客戶端能直接調用的方法,這樣才符合軟件架構。所以能夠寫出以下代碼:
package mq type Client struct { bro *BrokerImpl } func NewClient() *Client { return &Client{ bro: NewBroker(), } } func (c *Client)SetConditions(capacity int) { c.bro.setConditions(capacity) } func (c *Client)Publish(topic string, msg interface{}) error{ return c.bro.publish(topic,msg) } func (c *Client)Subscribe(topic string) (<-chan interface{}, error){ return c.bro.subscribe(topic) } func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error { return c.bro.unsubscribe(topic,sub) } func (c *Client)Close() { c.bro.close() } func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{ for val:= range sub{ if val != nil{ return val } } return nil }
上面只是準好了代碼結構,可是消息隊列實現的結構咱們尚未設計,如今咱們就來設計一下。
type BrokerImpl struct { exit chan bool capacity int topics map[string][]chan interface{} // key: topic value : queue sync.RWMutex // 同步鎖 }
exit
:也是一個通道,這個用來作關閉消息隊列用的capacity
:即用來設置消息隊列的容量topics
:這裏使用一個map結構,key便是topic
,其值則是一個切片,chan
類型,這裏這麼作的緣由是咱們一個topic能夠有多個訂閱者,因此一個訂閱者對應着一個通道sync.RWMutex
:讀寫鎖,這裏是爲了防止併發狀況下,數據的推送出現錯誤,因此採用加鎖的方式進行保證好啦,如今咱們已經準備的很充分啦,開始接下來方法填充之旅吧~~~
Publish
和broadcast
這裏兩個合在一塊兒講的緣由是braodcast
是屬於publish
裏的。這裏的思路很簡單,咱們只須要把傳入的數據進行廣播便可了,下面咱們來看代碼實現:
func (b *BrokerImpl) publish(topic string, pub interface{}) error { select { case <-b.exit: return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok { return nil } b.broadcast(pub, subscribers) return nil } func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) { count := len(subscribers) concurrency := 1 switch { case count > 1000: concurrency = 3 case count > 100: concurrency = 2 default: concurrency = 1 } pub := func(start int) { for j := start; j < count; j += concurrency { select { case subscribers[j] <- msg: case <-time.After(time.Millisecond * 5): case <-b.exit: return } } } for i := 0; i < concurrency; i++ { go pub(i) } }
publish
方法中沒有什麼好講的,這裏主要說一下broadcast
的實現:
這裏主要對數據進行廣播,因此數據推送出去就能夠了,不必一直等着他推送成功,因此這裏咱們咱們採用goroutine
。在推送的時候,當推送失敗時,咱們也不能一直等待呀,因此這裏咱們加了一個超時機制,超過5毫秒就中止推送,接着進行下面的推送。
可能大家會有疑惑,上面怎麼還有一個switch
選項呀,幹什麼用的呢?考慮這樣一個問題,當有大量的訂閱者時,,好比10000個,咱們一個for循環去作消息的推送,那推送一次就會耗費不少時間,而且不一樣的消費者之間也會產生延時,,因此採用這種方法進行分解能夠下降必定的時間。
subscribe
和 unsubScribe
咱們先來看代碼:
func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) { select { case <-b.exit: return nil, errors.New("broker closed") default: } ch := make(chan interface{}, b.capacity) b.Lock() b.topics[topic] = append(b.topics[topic], ch) b.Unlock() return ch, nil } func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error { select { case <-b.exit: return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok { return nil } // delete subscriber var newSubs []chan interface{} for _, subscriber := range subscribers { if subscriber == sub { continue } newSubs = append(newSubs, subscriber) } b.Lock() b.topics[topic] = newSubs b.Unlock() return nil }
這裏其實就很簡單了:
subscribe
:這裏的實現則是爲訂閱的主題建立一個channel
,而後將訂閱者加入到對應的topic
中就能夠了,而且返回一個接收channel
。unsubScribe
:這裏實現的思路就是將咱們剛纔添加的channel
刪除就能夠了。close
func (b *BrokerImpl) close() { select { case <-b.exit: return default: close(b.exit) b.Lock() b.topics = make(map[string][]chan interface{}) b.Unlock() } return }
這裏就是爲了關閉整個消息隊列,這句代碼b.topics = make(map[string][]chan interface{})
比較重要,這裏主要是爲了保證下一次使用該消息隊列不發生衝突。
setConditions
GetPayLoad
還差最後兩個方法,一個是設置咱們的消息隊列容量,另外一個是封裝一個方法來獲取咱們訂閱的消息:
func (b *BrokerImpl)setConditions(capacity int) { b.capacity = capacity } func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{ for val:= range sub{ if val != nil{ return val } } return nil }
好啦,代碼這麼快就被寫完了,接下來咱們進行測試一下吧。
正式測試以前,咱們仍是須要先進行一下單元測試,養成好的習慣,只有先自測了,纔能有底氣說個人代碼沒問題,要不直接跑程序,會出現不少bug
的。
這裏咱們測試方法以下:咱們向不一樣的topic
發送不一樣的信息,當訂閱者收到消息後,就行取消訂閱。
func TestClient(t *testing.T) { b := NewClient() b.SetConditions(100) var wg sync.WaitGroup for i := 0; i < 100; i++ { topic := fmt.Sprintf("Golang夢工廠%d", i) payload := fmt.Sprintf("asong%d", i) ch, err := b.Subscribe(topic) if err != nil { t.Fatal(err) } wg.Add(1) go func() { e := b.GetPayLoad(ch) if e != payload { t.Fatalf("%s expected %s but get %s", topic, payload, e) } if err := b.Unsubscribe(topic, ch); err != nil { t.Fatal(err) } wg.Done() }() if err := b.Publish(topic, payload); err != nil { t.Fatal(err) } } wg.Wait() }
測試經過,沒問題,接下來咱們在寫幾個方法測試一下
這裏分爲兩種方式測試
測試一:使用一個定時器,向一個主題定時推送消息.
// 一個topic 測試 func OnceTopic() { m := mq.NewClient() m.SetConditions(10) ch,err :=m.Subscribe(topic) if err != nil{ fmt.Println("subscribe failed") return } go OncePub(m) OnceSub(ch,m) defer m.Close() } // 定時推送 func OncePub(c *mq.Client) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <- t.C: err := c.Publish(topic,"asong真帥") if err != nil{ fmt.Println("pub message failed") } default: } } } // 接受訂閱消息 func OnceSub(m <-chan interface{},c *mq.Client) { for { val := c.GetPayLoad(m) fmt.Printf("get message is %sn",val) } }
測試二:使用一個定時器,定時向多個主題發送消息:
//多個topic測試 func ManyTopic() { m := mq.NewClient() defer m.Close() m.SetConditions(10) top := "" for i:=0;i<10;i++{ top = fmt.Sprintf("Golang夢工廠_%02d",i) go Sub(m,top) } ManyPub(m) } func ManyPub(c *mq.Client) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <- t.C: for i:= 0;i<10;i++{ //多個topic 推送不一樣的消息 top := fmt.Sprintf("Golang夢工廠_%02d",i) payload := fmt.Sprintf("asong真帥_%02d",i) err := c.Publish(top,payload) if err != nil{ fmt.Println("pub message failed") } } default: } } } func Sub(c *mq.Client,top string) { ch,err := c.Subscribe(top) if err != nil{ fmt.Printf("sub top:%s failedn",top) } for { val := c.GetPayLoad(ch) if val != nil{ fmt.Printf("%s get message is %sn",top,val) } } }
終於幫助姐姐解決了這個問題,姐姐開心死了,給我一頓親,啊不對,是一頓誇,誇的人家都很差意思了。
這一篇你學會了嗎?沒學會沒關係,趕快去把源代碼下載下來,好好通讀一下,很好理解的~~~。
其實這一篇是爲了接下來的kafka學習打基礎的,學好了這一篇,接下來學習的kafka就會容易不少啦~~~
github地址: https://github.com/asong2020/...若是能給一個小星星就行了~~~
結尾給你們發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,本身也收集了一本PDF,有須要的小夥能夠到自行下載。獲取方式:關注公衆號:[Golang夢工廠],後臺回覆:[微服務],便可獲取。
我翻譯了一份GIN中文文檔,會按期進行維護,有須要的小夥伴後臺回覆[gin]便可下載。
我是asong,一名普普統統的程序猿,讓我一塊兒慢慢變強吧。我本身建了一個golang
交流羣,有須要的小夥伴加我vx
,我拉你入羣。歡迎各位的關注,咱們下期見~~~
推薦往期文章: