手把手教姐姐寫消息隊列(golang-channel實現)

前言

這周姐姐入職了新公司,老闆想探探他的底,看了一眼他的簡歷,呦呵,精通kafka,這小姑娘有兩下子,既然這樣,那你寫一個消息隊列吧。由於要用go語言寫,這可給姐姐愁壞了。趕忙來求助我,我這麼堅貞不屈一人,在姐姐的軟磨硬泡下仍是答應他了,因此接下來我就手把手教姐姐怎麼寫一個消息隊列。下面咱們就來看一看我是怎麼寫的吧~~~。

本代碼已上傳到個人github:git

有須要的小夥伴,可自行下載,順便給個小星星吧~~~github

什麼是消息隊列

姐姐真是把我愁壞了,本身寫的精通kafka,居然不知道什麼是消息隊列,因而,一貫好脾氣的我開始給姐姐講一講什麼是消息隊列。golang

消息隊列,咱們通常稱它爲MQ(Message Queue),兩個單詞的結合,這兩個英文單詞想必你們都應該知道吧,其實最熟悉的仍是Queue吧,即隊列。隊列是一種先進先出的數據結構,隊列的使用仍是比較廣泛的,可是已經有隊列了,怎麼還須要MQ呢?面試

我:問你呢,姐姐,知道嗎?爲何還須要 MQ

姐姐:快點講,想捱打呀?編程

我:噗。。。 算我多嘴,哼~~~設計模式

欠欠的我開始了接下來的耐心講解......安全

舉一個簡單的例子,假設如今咱們要作一個系統,該登錄系統須要在用戶登錄成功後,發送封郵件到用戶郵箱進行提醒,需求仍是很簡單的,咱們先開看一看沒有MQ,咱們該怎麼實現呢?畫一個時序圖來看一看:數據結構

image

看這個圖,郵件發送在請求登錄時進行,當密碼驗證成功後,就發送郵件,而後返回登錄成功。這樣是能夠的,可是他是有缺陷的。這讓咱們的登錄操做變得複雜了,每次請求登錄都須要進行郵件發送,若是這裏出現錯誤,整個登錄請求也出現了錯誤,致使登錄不成功;還有一個問題,原本咱們登錄請求調用接口僅僅須要100ms,由於中間要作一次發送郵件的等待,那麼調用一次登錄接口的時間就要增加,這就是問題所在,一封郵件他的優先級 不是很高的,用戶也不須要實時收到這封郵件,因此這時,就體現了消息隊列的重要性了,咱們用消息隊列進行改進一下。架構

image

這裏咱們將發送郵件請求放到Mq中,這樣咱們就能提升用戶體驗的吞吐量,這個很重要,顧客就是上帝嘛,畢竟也沒有人喜歡用一個很慢很慢的app。併發

這裏只是舉了MQ衆多應用中的其中一個,即異步應用,MQ還在系統解藕、削峯/限流中有着重要應用,這兩個我就不具體講解了,原理都同樣,好好思考一下,大家都能懂得。

channel

好啦,姐姐終於知道什麼是消息隊列了,可是如今仍是無法進行消息隊列開發的,由於還差一個知識點,即go語言中的channel。這個很重要,咱們還須要靠這個來開發咱們的消息隊列呢。

因篇幅有限,這裏不詳細介紹channel,只介紹基本使用方法。

什麼是channel

Goroutine 和 Channel 是 Go 語言併發編程的兩大基石。Goroutine 用於執行併發任務,Channel 用於 goroutine 之間的同步、通訊。Go提倡使用通訊的方法代替共享內存,當一個Goroutine須要和其餘Goroutine資源共享時,Channel就會在他們之間架起一座橋樑,並提供確保安全同步的機制。channel本質上其實仍是一個隊列,遵循FIFO原則。具體規則以下:

  • 先從 Channel 讀取數據的 Goroutine 會先接收到數據;
  • 先向 Channel 發送數據的 Goroutine 會獲得先發送數據的權利;

建立通道

建立通道須要用到關鍵字 make ,格式以下:

通道實例 := make(chan 數據類型)
  • 數據類型:通道內傳輸的元素類型。
  • 通道實例:經過make建立的通道句柄。

無緩衝通道的使用

Go語言中無緩衝的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。這種類型的通道要求發送 goroutine 和接收 goroutine 同時準備好,才能完成發送和接收操做。

無緩衝通道的定義方式以下:

通道實例 := make(chan 通道類型)
  • 通道類型:和無緩衝通道用法一致,影響通道發送和接收的數據類型。
  • 緩衝大小:0
  • 通道實例:被建立出的通道實例。

寫個例子來幫助你們理解一下吧:

package main
import (
    "sync"
    "time"
)
func main() {
    c := make(chan string)
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        c <- `Golang夢工廠`
    }()
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        println(`Message: `+ <-c)
    }()
    wg.Wait()
}

帶緩衝的通道的使用

Go語言中有緩衝的通道(buffered channel)是一種在被接收前能存儲一個或者多個值的通道。這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動做的條件也會不一樣。只有在通道中沒有要接收的值時,接收動做纔會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,發送動做纔會阻塞。

有緩衝通道的定義方式以下:

通道實例 := make(chan 通道類型, 緩衝大小)
  • 通道類型:和無緩衝通道用法一致,影響通道發送和接收的數據類型。
  • 緩衝大小:決定通道最多能夠保存的元素數量。
  • 通道實例:被建立出的通道實例。

來寫一個例子講解一下:

package main
import (
    "sync"
    "time"
)
func main() {
    c := make(chan string, 2)
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        c <- `Golang夢工廠`
        c <- `asong`
    }()
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        println(`公衆號: `+ <-c)
        println(`做者: `+ <-c)
    }()
    wg.Wait()
}

好啦,通道的概念就介紹到這裏了,若是須要,下一篇我出一個channel詳細講解的文章。

消息隊列編碼實現

準備篇

終於開始進入主題了,姐姐都聽的快要睡着了,我轟隆一嗓子,立馬精神,可是呢,asong也是捱了一頓小電炮,代價慘痛呀,嗚嗚嗚............

在開始編寫代碼編寫直接,我須要構思咱們的整個代碼架構,這纔是正確的編碼方式。咱們先來定義一個接口,把咱們須要實現的方法先列出來,後期對每個代碼進行實現就能夠了。所以能夠列出以下方法:

type Broker interface {
 publish(topic string, msg interface{}) error
 subscribe(topic string) (<-chan interface{}, error)
 unsubscribe(topic string, sub <-chan interface{}) error
 close()
 broadcast(msg interface{}, subscribers []chan interface{})
 setConditions(capacity int)
}
  • publish:進行消息的推送,有兩個參數即topicmsg,分別是訂閱的主題、要傳遞的消息
  • subscribe:消息的訂閱,傳入訂閱的主題,便可完成訂閱,並返回對應的channel通道用來接收數據
  • unsubscribe:取消訂閱,傳入訂閱的主題和對應的通道
  • close:這個的做用就是很明顯了,就是用來關閉消息隊列的
  • broadCast:這個屬於內部方法,做用是進行廣播,對推送的消息進行廣播,保證每個訂閱者均可以收到
  • setConditions:這裏是用來設置條件,條件就是消息隊列的容量,這樣咱們就能夠控制消息隊列的大小了

細心的大家有沒有發現什麼問題,這些代碼我都定義的是內部方法,也就是包外不可用。爲何這麼作呢,由於這裏屬於代理要作的事情,咱們還須要在封裝一層,也就是客戶端能直接調用的方法,這樣才符合軟件架構。所以能夠寫出以下代碼:

package mq
type Client struct {
 bro *BrokerImpl
}
func NewClient() *Client {
 return &Client{
  bro: NewBroker(),
 }
}
func (c *Client)SetConditions(capacity int)  {
 c.bro.setConditions(capacity)
}
func (c *Client)Publish(topic string, msg interface{}) error{
 return c.bro.publish(topic,msg)
}
func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
 return c.bro.subscribe(topic)
}
func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
 return c.bro.unsubscribe(topic,sub)
}
func (c *Client)Close()  {
  c.bro.close()
}
func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}

上面只是準好了代碼結構,可是消息隊列實現的結構咱們尚未設計,如今咱們就來設計一下。

type BrokerImpl struct {
 exit chan bool
 capacity int
 topics map[string][]chan interface{} // key: topic  value : queue
 sync.RWMutex // 同步鎖
}
  • exit:也是一個通道,這個用來作關閉消息隊列用的
  • capacity:即用來設置消息隊列的容量
  • topics:這裏使用一個map結構,key便是topic,其值則是一個切片,chan類型,這裏這麼作的緣由是咱們一個topic能夠有多個訂閱者,因此一個訂閱者對應着一個通道
  • sync.RWMutex:讀寫鎖,這裏是爲了防止併發狀況下,數據的推送出現錯誤,因此採用加鎖的方式進行保證

好啦,如今咱們已經準備的很充分啦,開始接下來方法填充之旅吧~~~

Publishbroadcast

這裏兩個合在一塊兒講的緣由是braodcast是屬於publish裏的。這裏的思路很簡單,咱們只須要把傳入的數據進行廣播便可了,下面咱們來看代碼實現:

func (b *BrokerImpl) publish(topic string, pub interface{}) error {
 select {
 case <-b.exit:
  return errors.New("broker closed")
 default:
 }
 b.RLock()
 subscribers, ok := b.topics[topic]
 b.RUnlock()
 if !ok {
  return nil
 }
 b.broadcast(pub, subscribers)
 return nil
}
func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
 count := len(subscribers)
 concurrency := 1
 switch {
 case count > 1000:
  concurrency = 3
 case count > 100:
  concurrency = 2
 default:
  concurrency = 1
 }
 pub := func(start int) {
  for j := start; j < count; j += concurrency {
   select {
   case subscribers[j] <- msg:
   case <-time.After(time.Millisecond * 5):
   case <-b.exit:
    return
   }
  }
 }
 for i := 0; i < concurrency; i++ {
  go pub(i)
 }
}

publish方法中沒有什麼好講的,這裏主要說一下broadcast的實現:

這裏主要對數據進行廣播,因此數據推送出去就能夠了,不必一直等着他推送成功,因此這裏咱們咱們採用goroutine。在推送的時候,當推送失敗時,咱們也不能一直等待呀,因此這裏咱們加了一個超時機制,超過5毫秒就中止推送,接着進行下面的推送。

可能大家會有疑惑,上面怎麼還有一個switch選項呀,幹什麼用的呢?考慮這樣一個問題,當有大量的訂閱者時,,好比10000個,咱們一個for循環去作消息的推送,那推送一次就會耗費不少時間,而且不一樣的消費者之間也會產生延時,,因此採用這種方法進行分解能夠下降必定的時間。

subscribeunsubScribe

咱們先來看代碼:

func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
 select {
 case <-b.exit:
  return nil, errors.New("broker closed")
 default:
 }
 ch := make(chan interface{}, b.capacity)
 b.Lock()
 b.topics[topic] = append(b.topics[topic], ch)
 b.Unlock()
 return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
 select {
 case <-b.exit:
  return errors.New("broker closed")
 default:
 }
 b.RLock()
 subscribers, ok := b.topics[topic]
 b.RUnlock()
 if !ok {
  return nil
 }
 // delete subscriber
 var newSubs []chan interface{}
 for _, subscriber := range subscribers {
  if subscriber == sub {
   continue
  }
  newSubs = append(newSubs, subscriber)
 }
 b.Lock()
 b.topics[topic] = newSubs
 b.Unlock()
 return nil
}

這裏其實就很簡單了:

  • subscribe:這裏的實現則是爲訂閱的主題建立一個channel,而後將訂閱者加入到對應的topic中就能夠了,而且返回一個接收channel
  • unsubScribe:這裏實現的思路就是將咱們剛纔添加的channel刪除就能夠了。

close

func (b *BrokerImpl) close()  {
 select {
 case <-b.exit:
  return
 default:
  close(b.exit)
  b.Lock()
  b.topics = make(map[string][]chan interface{})
  b.Unlock()
 }
 return
}

這裏就是爲了關閉整個消息隊列,這句代碼b.topics = make(map[string][]chan interface{})比較重要,這裏主要是爲了保證下一次使用該消息隊列不發生衝突。

setConditions GetPayLoad

還差最後兩個方法,一個是設置咱們的消息隊列容量,另外一個是封裝一個方法來獲取咱們訂閱的消息:

func (b *BrokerImpl)setConditions(capacity int)  {
 b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}

測試

好啦,代碼這麼快就被寫完了,接下來咱們進行測試一下吧。

單元測試

正式測試以前,咱們仍是須要先進行一下單元測試,養成好的習慣,只有先自測了,纔能有底氣說個人代碼沒問題,要不直接跑程序,會出現不少bug的。

這裏咱們測試方法以下:咱們向不一樣的topic發送不一樣的信息,當訂閱者收到消息後,就行取消訂閱。

func TestClient(t *testing.T) {
 b := NewClient()
 b.SetConditions(100)
 var wg sync.WaitGroup
 for i := 0; i < 100; i++ {
  topic := fmt.Sprintf("Golang夢工廠%d", i)
  payload := fmt.Sprintf("asong%d", i)
  ch, err := b.Subscribe(topic)
  if err != nil {
   t.Fatal(err)
  }
  wg.Add(1)
  go func() {
   e := b.GetPayLoad(ch)
   if e != payload {
    t.Fatalf("%s expected %s but get %s", topic, payload, e)
   }
   if err := b.Unsubscribe(topic, ch); err != nil {
    t.Fatal(err)
   }
   wg.Done()
  }()
  if err := b.Publish(topic, payload); err != nil {
   t.Fatal(err)
  }
 }
 wg.Wait()
}

測試經過,沒問題,接下來咱們在寫幾個方法測試一下

測試

這裏分爲兩種方式測試

測試一:使用一個定時器,向一個主題定時推送消息.

// 一個topic 測試
func OnceTopic()  {
 m := mq.NewClient()
 m.SetConditions(10)
 ch,err :=m.Subscribe(topic)
 if err != nil{
  fmt.Println("subscribe failed")
  return
 }
 go OncePub(m)
 OnceSub(ch,m)
 defer m.Close()
}
// 定時推送
func OncePub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 for  {
  select {
  case <- t.C:
   err := c.Publish(topic,"asong真帥")
   if err != nil{
    fmt.Println("pub message failed")
   }
  default:
  }
 }
}
// 接受訂閱消息
func OnceSub(m <-chan interface{},c *mq.Client)  {
 for  {
  val := c.GetPayLoad(m)
  fmt.Printf("get message is %sn",val)
 }
}

測試二:使用一個定時器,定時向多個主題發送消息:

//多個topic測試
func ManyTopic()  {
 m := mq.NewClient()
 defer m.Close()
 m.SetConditions(10)
 top := ""
 for i:=0;i<10;i++{
  top = fmt.Sprintf("Golang夢工廠_%02d",i)
  go Sub(m,top)
 }
 ManyPub(m)
}
func ManyPub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 for  {
  select {
  case <- t.C:
   for i:= 0;i<10;i++{
    //多個topic 推送不一樣的消息
    top := fmt.Sprintf("Golang夢工廠_%02d",i)
    payload := fmt.Sprintf("asong真帥_%02d",i)
    err := c.Publish(top,payload)
    if err != nil{
     fmt.Println("pub message failed")
    }
   }
  default:
  }
 }
}
func Sub(c *mq.Client,top string)  {
 ch,err := c.Subscribe(top)
 if err != nil{
  fmt.Printf("sub top:%s failedn",top)
 }
 for  {
  val := c.GetPayLoad(ch)
  if val != nil{
   fmt.Printf("%s get message is %sn",top,val)
  }
 }
}

總結

終於幫助姐姐解決了這個問題,姐姐開心死了,給我一頓親,啊不對,是一頓誇,誇的人家都很差意思了。

這一篇你學會了嗎?沒學會沒關係,趕快去把源代碼下載下來,好好通讀一下,很好理解的~~~。

其實這一篇是爲了接下來的kafka學習打基礎的,學好了這一篇,接下來學習的kafka就會容易不少啦~~~

github地址: https://github.com/asong2020/...

若是能給一個小星星就行了~~~

結尾給你們發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,本身也收集了一本PDF,有須要的小夥能夠到自行下載。獲取方式:關注公衆號:[Golang夢工廠],後臺回覆:[微服務],便可獲取。

我翻譯了一份GIN中文文檔,會按期進行維護,有須要的小夥伴後臺回覆[gin]便可下載。

我是asong,一名普普統統的程序猿,讓我一塊兒慢慢變強吧。我本身建了一個golang交流羣,有須要的小夥伴加我vx,我拉你入羣。歡迎各位的關注,咱們下期見~~~

推薦往期文章:

相關文章
相關標籤/搜索