Go 每日一庫之 watermill

簡介

在上一篇文章Go 每日一庫之 message-bus中,咱們介紹了一款小巧、實現簡單的異步通訊庫。做爲學習,message-bus確實不錯。可是在實際使用上,message-bus的功能就有點捉襟見肘了。例如,message-bus將消息發送到訂閱者管道以後就無論了,這樣若是訂閱者處理壓力較大,會在管道中堆積太多消息,一旦訂閱者異常退出,這些消息將會所有丟失!另外,message-bus不負責保存消息,若是訂閱者後啓動,以前發佈的消息,這個訂閱者是沒法收到的。這些問題,咱們將要介紹的watermill都能解決!git

watermill是 Go 語言的一個異步消息解決方案,它支持消息重傳、保存消息,後啓動的訂閱者也能收到前面發佈的消息。watermill內置了多種訂閱-發佈實現,包括Kafka/RabbitMQ,甚至還支持HTTP/MySQL binlog。固然也能夠編寫本身的訂閱-發佈實現。此外,它還提供了監控、限流等中間件。github

快速使用

watermill內置了不少訂閱-發佈實現,最簡單、直接的要屬GoChannel。咱們就以這個實現爲例介紹watermill的特性。golang

安裝:windows

$ go get github.com/ThreeDotsLabs/watermill
複製代碼

使用:服務器

package main

import (
  "context"
  "log"
  "time"

  "github.com/ThreeDotsLabs/watermill"
  "github.com/ThreeDotsLabs/watermill/message"
  "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
  pubSub := gochannel.NewGoChannel(
    gochannel.Config{},
    watermill.NewStdLogger(false, false),
  )

  messages, err := pubSub.Subscribe(context.Background(), "example.topic")
  if err != nil {
    panic(err)
  }

  go process(messages)

  publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
  for {
    msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

    if err := publisher.Publish("example.topic", msg); err != nil {
      panic(err)
    }

    time.Sleep(time.Second)
  }
}

func process(messages <-chan *message.Message) {
  for msg := range messages {
    log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
    msg.Ack()
  }
}
複製代碼

首先,咱們建立一個GoChannel對象,它是一個消息管理器。能夠調用其Subscribe訂閱某個主題(topic)的消息,調用其Publish()以某個主題發佈消息。Subscribe()方法會返回一個<-chan *message.Message,一旦該主題有消息發佈,GoChannel就會將消息發送到該管道中。訂閱者只需監聽此管道,接收消息進行處理。在上面的例子中,咱們啓動了一個消息處理的goroutine,持續從管道中讀取消息,而後打印輸出。主goroutine在一個死循環中每隔 1s 發佈一次消息。微信

message.Message這個結構是watermill庫的核心,每一個消息都會封裝到該結構中發送。Message保存的是原始的字節流([]byte),因此能夠將 JSON/protobuf/XML 等等格式的序列化結果保存到Message中。併發

有兩點注意:dom

  • 收到的每一個消息都須要調用MessageAck() 方法確認,不然GoChannel會重發當前消息;
  • Message有一個UUID字段,建議設置爲惟一的,方便定位問題。watermill提供方法NewUUID()生成惟一 id。

下面看示例運行:異步

路由

上面的發佈和訂閱實現是很是底層的模式。在實際應用中,咱們一般想要監控、重試、統計等一些功能。並且上面的例子中,每一個消息處理結束須要手動調用Ack()方法,消息管理器纔會下發後面一條信息,很容易遺忘。還有些時候,咱們有這樣的需求,處理完某個消息後,從新發布另一些消息。函數

這些功能都是比較通用的,爲此watermill提供了路由(Router)功能。直接拿來官網的圖:

路由其實管理多個訂閱者,每一個訂閱者在一個獨立的goroutine中運行,彼此互不干擾。訂閱者收到消息後,交由註冊時指定的處理函數(HandlerFunc)。路由還能夠設置插件(plugin)和中間件(middleware),插件是定製路由的行爲,而中間件是定製處理器的行爲。處理器處理消息後會返回若干消息,這些消息會被路由從新發布到(另外一個)管理器中。

var (
  logger = watermill.NewStdLogger(false, false)
)

func main() {
  router, err := message.NewRouter(message.RouterConfig{}, logger)
  if err != nil {
    panic(err)
  }

  pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
  go publishMessages(pubSub)

  router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)

  router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
  router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

  ctx := context.Background()
  if err := router.Run(ctx); err != nil {
    panic(err)
  }
}

func publishMessages(publisher message.Publisher) {
  for {
    msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
    if err := publisher.Publish("in_topic", msg); err != nil {
      panic(err)
    }

    time.Sleep(time.Second)
  }
}

func printMessages(msg *message.Message) error {
  fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
  return nil
}

type myHandler struct {
}

func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
  log.Println("myHandler received message", msg.UUID)

  msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
  return message.Messages{msg}, nil
}
複製代碼

首先,咱們建立一個路由:

router, err := message.NewRouter(message.RouterConfig{}, logger)
複製代碼

而後爲路由註冊處理器。註冊的處理器有兩種類型,一種是:

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
複製代碼

這個方法原型爲:

func (r *Router) AddHandler( handlerName string, subscribeTopic string, subscriber Subscriber, publishTopic string, publisher Publisher, handlerFunc HandlerFunc, ) *Handler 複製代碼

該方法的做用是建立一個名爲handlerName的處理器,監聽subscriber中主題爲subscribeTopic的消息,收到消息後調用handlerFunc處理,將返回的消息以主題publishTopic發佈到publisher中。

另一種處理器是下面這種形式:

router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)
複製代碼

從名字咱們也能夠看出,這種形式的處理器只處理接收到的消息,不發佈新消息。

最後,咱們調用router.Run()運行這個路由。

其中,建立GoChannel發佈消息和上面的沒什麼不一樣。

使用路由還有個好處,處理器返回時,若無錯誤,路由會自動調用消息的Ack()方法;若發生錯誤,路由會調用消息的Nack()方法通知管理器重發這條消息。

上面只是路由的最基本用法,路由的強大之處在於中間件。

中間件

watermill中內置了幾個比較經常使用的中間件:

  • IgnoreErrors:能夠忽略指定的錯誤;
  • Throttle:限流,限制單位時間內處理的消息數量;
  • Poison:將處理失敗的消息以另外一個主題發佈;
  • Retry:重試,處理失敗能夠重試;
  • Timeout:超時,若是消息處理時間超過給定的時間,直接失敗。
  • InstantAck:直接調用消息的Ack()方法,無論後續成功仍是失敗;
  • RandomFail:隨機拋出錯誤,測試時使用;
  • Duplicator:調用兩次處理函數,兩次返回的消息都從新發布出去,double~
  • Correlation:處理函數生成的消息都統一設置成原始消息中的correlation id,方便追蹤消息來源;
  • Recoverer:捕獲處理函數中的panic,包裝成錯誤返回。

中間件的使用也是比較簡單和直接的:調用router.AddMiddleware()。例如,咱們想要把處理返回的消息 double 一下:

router.AddMiddleware(middleware.Duplicator)
複製代碼

想重試?能夠:

router.AddMiddleware(middleware.Retry{
  MaxRetries:      3,
  InitialInterval: time.Millisecond * 100,
  Logger:          logger,
}.Middleware)
複製代碼

上面設置最大重試次數爲 3,重試初始時間間隔爲 100ms。

通常狀況下,生產環境須要保證穩定性,某個處理異常不能影響後續的消息處理。故設置Recoverer是比較好的選擇:

router.AddMiddleware(middleware.Recoverer)
複製代碼

也能夠實現本身的中間件:

func MyMiddleware(h message.HandlerFunc) message.HandlerFunc {
  return func(message *message.Message) ([]*message.Message, error) {
    fields := watermill.LogFields{"name": m.Name}
    logger.Info("myMiddleware before", fields)
    ms, err := h(message)
    logger.Info("myMiddleware after", fields)
    return ms, err
  }
}
複製代碼

中間件有兩種實現方式,若是不須要參數或依賴,那麼直接實現爲函數便可,像上面這樣。若是須要有參數,那麼能夠實現爲一個結構:

type myMiddleware struct {
  Name string
}

func (m myMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
  return func(message *message.Message) ([]*message.Message, error) {
    fields := watermill.LogFields{"name": m.Name}
    logger.Info("myMiddleware before", fields)
    ms, err := h(message)
    logger.Info("myMiddleware after", fields)
    return ms, err
  }
}
複製代碼

這兩種中間件的添加方式有所不一樣,第一種直接添加:

router.AddMiddleware(MyMiddleware)
複製代碼

第二種要構造一個對象,而後將其Middleware方法傳入,在該方法中能夠訪問MyMiddleware對象的字段:

router.AddMiddleware(MyMiddleware{Name:"dj"}.Middleware)
複製代碼

設置

若是運行上面程序,你極可能會看到這樣一條日誌:

No subscribers to send message
複製代碼

由於發佈消息是在另外一個goroutine,咱們沒有控制什麼時候發佈,可能發佈消息時,咱們還未訂閱。咱們觀察後面的處理日誌,對比 uuid 發現這條消息直接被丟棄了。watermill提供了一個選項,能夠將消息都保存下來,訂閱某個主題時將該主題以前的消息也發送給它:

pubSub := gochannel.NewGoChannel(
  gochannel.Config{
    Persistent: true,
  }, logger)
複製代碼

建立GoChannel時將ConfigPersistent字段設置爲true便可。此時運行,咱們仔細觀察一下,出現No subscribers to send message信息的消息後續確實被處理了。

RabbitMQ

除了GoChannelwatermill還內置了其餘的發佈-訂閱實現。這些實現除了發佈-訂閱器建立的方式不一樣,其餘與咱們以前介紹的基本同樣。這裏咱們簡單介紹一下RabbitMQ,其餘的可自行研究。

使用RabbitMQ須要先運行RabbitMQ程序,RabbitMQ採用Erlang開發。咱們以前不少文章也介紹過 windows 上的軟件安裝神器choco。使用choco安裝RabbitMQ

$ choco install rabbitmq
複製代碼

啓動RabbitMQ服務器:

$ rabbitmq-server.bat
複製代碼

watermillRabbitMQ的支持使用獨立庫的形式,須要另行安裝:

$ go get -u github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp
複製代碼

發佈訂閱:

var amqpURI = "amqp://localhost:5672/"

func main() {
  amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

  subscriber, err := amqp.NewSubscriber(
    amqpConfig,
    watermill.NewStdLogger(false, false),
  )
  if err != nil {
    panic(err)
  }

  messages, err := subscriber.Subscribe(context.Background(), "example.topic")
  if err != nil {
    panic(err)
  }

  go process(messages)

  publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
  if err != nil {
    panic(err)
  }

  publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
  for {
    msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

    if err := publisher.Publish("example.topic", msg); err != nil {
      panic(err)
    }

    time.Sleep(time.Second)
  }
}

func process(messages <-chan *message.Message) {
  for msg := range messages {
    log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
    msg.Ack()
  }
}
複製代碼

若是有自定義發佈-訂閱實現的需求,能夠參考RabbitMQ的實現:github.com/ThreeDotsLa…

總結

watermill提供豐富的功能,且預留了擴展點,可自行擴展。另外,源碼中處理goroutine建立和通訊、多種併發模式的應用都是值得一看的。官方 GitHub 上還有一個事件驅動示例:github.com/ThreeDotsLa…

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. watermill 官方文檔:watermill.io/
  2. Go 每日一庫 GitHub:github.com/darjun/go-d…

個人博客

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

本文由博客一文多發平臺 OpenWrite 發佈!

相關文章
相關標籤/搜索