publish和subscribe模式使用場景仍是不少的,記得之前面試的時候有被問到若是要你作一個版本自動升級的,你會怎麼作?當時正想在項目中引入etcd,而後考慮了下以爲也能夠用publish和subscribe的思路去解決這個問題。git
publish是gitlab產生的代碼merge到master消息,能夠考慮配置webhook來實現,固然用jenkins也沒問題,在構建完成以後,jenkins經過curl發送最終的構建結果。github
subscribe是各個節點服務器須要安裝的一個agent程序,這個agent程序watch本身關注的項目的更新,一旦發現更新以後,拉取最新版本的程序進行更新。web
publish能夠經過長連接也能夠經過http接口來實現,問題不大。subscribe若是要保證明時性,須要經過長連接來實現,若是對實時性要求不高,設定時間間隔輪訓調用http接口也徹底沒問題。面試
整個功能分紅三個模塊:json
簡單畫一個草圖: bash
大體就這麼個思路。而後考慮了下,好像整個模型代碼寫起來也不是很複雜,而後想就動手寫起來了。服務器
首先是broker,broker要服務於publisher和subscriber,publisher能夠採用標準的htt接口,subscriber能夠採用websocket或者本身手寫一個長鏈接,也能夠用開源的一些長鏈接保活的庫,像smux是我常常會用到的一個tcp庫。websocket
websocket一般狀況不用處理編解碼的問題,可是還須要添加心跳保活,smux不須要處理心跳保活(庫自己提供有心跳機制),可是編解碼要本身另外處理。app
publisher能夠不用實現,提供接口規範就行。curl
subscriber實現一個長鏈接的客戶端,讀到消息就打印消息內容便可(不考慮版本更新這一過程設計)
首先開始設計broker和subscribe的通訊協議 proto.go
package proto
const (
CMD_S2B_HEARTBEAT = iota
CMD_B2S_HEARTBEAT
CMD_B2S_MSG
)
type B2SBody S2BBody
type S2BBody struct {
Cmd int `json:"cmd"`
Data interface{} `json:"data"`
}
type S2BSubscribe struct {
Topics []string `json:"topics"`
}
type S2BHeartbeat struct{}
type B2SHeartbeat struct{}
複製代碼
ok,而後能夠開始寫broker代碼了,broker和subscriber最終採用websocket來進行, broker和publisher採用http協議。
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"sync"
"github.com/ICKelin/pubsub/proto"
"github.com/gorilla/websocket"
)
type Broker struct {
addr string
muEntry sync.RWMutex
entry map[string][]*subscriber
done chan struct{}
}
func NewBroker(pubaddr, subaddr string) *Broker {
return &Broker{
addr: pubaddr,
entry: make(map[string][]*subscriber),
done: make(chan struct{}),
}
}
func (b *Broker) Run() {
http.HandleFunc("/pub", b.onPublish)
http.HandleFunc("/sub", b.onSubscriber)
http.ListenAndServe(b.addr, nil)
}
type pubBody struct {
Topic string `json:"topic"`
Msg interface{} `json:"msg"`
}
func (b *Broker) onPublish(w http.ResponseWriter, r *http.Request) {
bytes, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
return
}
body := &pubBody{}
err = json.Unmarshal(bytes, &body)
if err != nil {
log.Println(err)
return
}
topic := body.Topic
msg := body.Msg
log.Println("publish topic ", topic, msg)
b.muEntry.RLock()
subscribers := b.entry[topic]
b.muEntry.RUnlock()
if subscribers == nil {
// drop message once no subscriber
// TODO: store msg
return
}
for _, s := range subscribers {
s.push(msg)
}
}
type subscribeMsg struct {
Topics []string `json:"topics"`
}
func (b *Broker) onSubscriber(w http.ResponseWriter, r *http.Request) {
upgrade, err := websocket.Upgrade(w, r, nil, 1024, 1024)
if err != nil {
log.Println(err)
return
}
subMsg := proto.S2BSubscribe{}
upgrade.ReadJSON(&subMsg)
s := newSubscriber(subMsg.Topics, upgrade.RemoteAddr().String())
b.muEntry.Lock()
for _, topic := range s.topics {
b.entry[topic] = append(b.entry[topic], s)
}
b.muEntry.Unlock()
s.serveSubscriber(upgrade)
upgrade.Close()
b.muEntry.Lock()
for _, topic := range s.topics {
for i, s := range b.entry[topic] {
if s.raddr == upgrade.RemoteAddr().String() {
log.Println("remove subscriber: ", s.raddr, " from topic ", topic)
if i == len(b.entry[topic])-1 {
b.entry[topic] = b.entry[topic][:i]
} else {
b.entry[topic] = append(b.entry[topic][:i], b.entry[topic][i+1:]...)
}
break
}
}
}
b.muEntry.Unlock()
}
複製代碼
broker的subscriber處理
import (
"log"
"time"
"github.com/ICKelin/pubsub/proto"
"github.com/gorilla/websocket"
)
type subscriber struct {
topics []string
raddr string
done chan struct{}
writebuf chan *proto.B2SBody
// parent string
// children []*subscriber
}
func newSubscriber(topics []string, raddr string) *subscriber {
return &subscriber{
topics: topics,
raddr: raddr,
done: make(chan struct{}),
writebuf: make(chan *proto.B2SBody),
}
}
func (s *subscriber) serveSubscriber(conn *websocket.Conn) {
go s.reader(conn)
s.writer(conn)
}
func (s *subscriber) reader(conn *websocket.Conn) {
defer close(s.done)
for {
var obj proto.S2BBody
err := conn.ReadJSON(&obj)
if err != nil {
log.Println(err)
break
}
switch obj.Cmd {
case proto.CMD_S2B_HEARTBEAT:
conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
conn.WriteJSON(&proto.S2BBody{Cmd: proto.CMD_B2S_HEARTBEAT})
conn.SetWriteDeadline(time.Time{})
}
}
}
func (s *subscriber) writer(conn *websocket.Conn) {
for {
select {
case buf := <-s.writebuf:
conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
conn.WriteJSON(buf)
conn.SetWriteDeadline(time.Time{})
// drop msg once writejson fail
case <-s.done:
return
}
}
}
func (s *subscriber) push(data interface{}) {
s.writebuf <- &proto.B2SBody{Cmd: proto.CMD_B2S_MSG, Data: data}
}
複製代碼
這兩段代碼還有幾個很是明顯的問題
爲了解決上面三個問題,可能須要考慮數據持久化以及消息ack等機制,這個也是一大難題。
再來看看subscriber的實現,subscriber能夠很簡單實現,一個goroutine發心跳包,一個goroutine收數據包,收到的消息以後調用回調函數進行相應的處理。
package main
import (
"log"
"time"
"github.com/ICKelin/pubsub/proto"
"github.com/gorilla/websocket"
)
type subscriber struct {
topics []string
broker string
cb func(msg interface{})
}
func newSubscriber(topics []string, broker string, cb func(msg interface{})) *subscriber {
return &subscriber{
topics: topics,
broker: broker,
cb: cb,
}
}
func (s *subscriber) Run() error {
conn, _, err := websocket.DefaultDialer.Dial(s.broker, nil)
if err != nil {
return err
}
defer conn.Close()
subMsg := proto.S2BSubscribe{Topics: s.topics}
err = conn.WriteJSON(&subMsg)
if err != nil {
return err
}
go func() {
for {
body := &proto.S2BBody{
Cmd: proto.CMD_S2B_HEARTBEAT,
Data: &proto.S2BHeartbeat{},
}
conn.WriteJSON(body)
time.Sleep(time.Second * 3)
}
}()
for {
var body = proto.S2BBody{}
conn.ReadJSON(&body)
switch body.Cmd {
case proto.CMD_B2S_HEARTBEAT:
log.Println("hb from broker")
case proto.CMD_B2S_MSG:
s.cb(body.Data)
}
}
}
func main() {
s := newSubscriber([]string{"gtun", "https://www.notr.tech"}, "ws://127.0.0.1:10002/sub", func(msg interface{}) {
log.Println(msg)
})
s.Run()
}
複製代碼