websocket pub/sub 模型

publish和subscribe模式使用場景仍是不少的,記得之前面試的時候有被問到若是要你作一個版本自動升級的,你會怎麼作?當時正想在項目中引入etcd,而後考慮了下以爲也能夠用publish和subscribe的思路去解決這個問題。git

publish是gitlab產生的代碼merge到master消息,能夠考慮配置webhook來實現,固然用jenkins也沒問題,在構建完成以後,jenkins經過curl發送最終的構建結果。github

subscribe是各個節點服務器須要安裝的一個agent程序,這個agent程序watch本身關注的項目的更新,一旦發現更新以後,拉取最新版本的程序進行更新。web

publish能夠經過長連接也能夠經過http接口來實現,問題不大。subscribe若是要保證明時性,須要經過長連接來實現,若是對實時性要求不高,設定時間間隔輪訓調用http接口也徹底沒問題。面試

整個功能分紅三個模塊:json

  1. broker: 做爲pub和sub的中轉
  2. publisher:消息發佈者
  3. subscriber:消息訂閱者

簡單畫一個草圖: bash

pub/sub/broker

大體就這麼個思路。而後考慮了下,好像整個模型代碼寫起來也不是很複雜,而後想就動手寫起來了。服務器

首先是broker,broker要服務於publisher和subscriber,publisher能夠採用標準的htt接口,subscriber能夠採用websocket或者本身手寫一個長鏈接,也能夠用開源的一些長鏈接保活的庫,像smux是我常常會用到的一個tcp庫。websocket

websocket一般狀況不用處理編解碼的問題,可是還須要添加心跳保活,smux不須要處理心跳保活(庫自己提供有心跳機制),可是編解碼要本身另外處理。app

publisher能夠不用實現,提供接口規範就行。curl

subscriber實現一個長鏈接的客戶端,讀到消息就打印消息內容便可(不考慮版本更新這一過程設計)

首先開始設計broker和subscribe的通訊協議 proto.go

package proto

const (
	CMD_S2B_HEARTBEAT = iota
	CMD_B2S_HEARTBEAT
	CMD_B2S_MSG
)

type B2SBody S2BBody

type S2BBody struct {
	Cmd  int         `json:"cmd"`
	Data interface{} `json:"data"`
}

type S2BSubscribe struct {
	Topics []string `json:"topics"`
}

type S2BHeartbeat struct{}

type B2SHeartbeat struct{}


複製代碼

ok,而後能夠開始寫broker代碼了,broker和subscriber最終採用websocket來進行, broker和publisher採用http協議。

import (
	"encoding/json"
	"io/ioutil"
	"log"
	"net/http"
	"sync"

	"github.com/ICKelin/pubsub/proto"
	"github.com/gorilla/websocket"
)

type Broker struct {
	addr    string
	muEntry sync.RWMutex
	entry   map[string][]*subscriber
	done    chan struct{}
}

func NewBroker(pubaddr, subaddr string) *Broker {
	return &Broker{
		addr:  pubaddr,
		entry: make(map[string][]*subscriber),
		done:  make(chan struct{}),
	}
}

func (b *Broker) Run() {
	http.HandleFunc("/pub", b.onPublish)
	http.HandleFunc("/sub", b.onSubscriber)
	http.ListenAndServe(b.addr, nil)
}

type pubBody struct {
	Topic string      `json:"topic"`
	Msg   interface{} `json:"msg"`
}

func (b *Broker) onPublish(w http.ResponseWriter, r *http.Request) {
	bytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Println(err)
		return
	}

	body := &pubBody{}
	err = json.Unmarshal(bytes, &body)
	if err != nil {
		log.Println(err)
		return
	}

	topic := body.Topic
	msg := body.Msg

	log.Println("publish topic ", topic, msg)

	b.muEntry.RLock()
	subscribers := b.entry[topic]
	b.muEntry.RUnlock()

	if subscribers == nil {
		// drop message once no subscriber
		// TODO: store msg
		return
	}

	for _, s := range subscribers {
		s.push(msg)
	}
}

type subscribeMsg struct {
	Topics []string `json:"topics"`
}

func (b *Broker) onSubscriber(w http.ResponseWriter, r *http.Request) {
	upgrade, err := websocket.Upgrade(w, r, nil, 1024, 1024)
	if err != nil {
		log.Println(err)
		return
	}

	subMsg := proto.S2BSubscribe{}
	upgrade.ReadJSON(&subMsg)

	s := newSubscriber(subMsg.Topics, upgrade.RemoteAddr().String())

	b.muEntry.Lock()
	for _, topic := range s.topics {
		b.entry[topic] = append(b.entry[topic], s)
	}
	b.muEntry.Unlock()

	s.serveSubscriber(upgrade)
	upgrade.Close()

	b.muEntry.Lock()
	for _, topic := range s.topics {
		for i, s := range b.entry[topic] {
			if s.raddr == upgrade.RemoteAddr().String() {
				log.Println("remove subscriber: ", s.raddr, " from topic ", topic)

				if i == len(b.entry[topic])-1 {
					b.entry[topic] = b.entry[topic][:i]
				} else {
					b.entry[topic] = append(b.entry[topic][:i], b.entry[topic][i+1:]...)
				}
				break
			}
		}
	}
	b.muEntry.Unlock()
}

複製代碼

broker的subscriber處理

import (
	"log"
	"time"

	"github.com/ICKelin/pubsub/proto"
	"github.com/gorilla/websocket"
)

type subscriber struct {
	topics   []string
	raddr    string
	done     chan struct{}
	writebuf chan *proto.B2SBody
	// parent string
	// children []*subscriber
}

func newSubscriber(topics []string, raddr string) *subscriber {
	return &subscriber{
		topics:   topics,
		raddr:    raddr,
		done:     make(chan struct{}),
		writebuf: make(chan *proto.B2SBody),
	}
}

func (s *subscriber) serveSubscriber(conn *websocket.Conn) {
	go s.reader(conn)
	s.writer(conn)
}

func (s *subscriber) reader(conn *websocket.Conn) {
	defer close(s.done)
	for {
		var obj proto.S2BBody
		err := conn.ReadJSON(&obj)
		if err != nil {
			log.Println(err)
			break
		}

		switch obj.Cmd {
		case proto.CMD_S2B_HEARTBEAT:
			conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
			conn.WriteJSON(&proto.S2BBody{Cmd: proto.CMD_B2S_HEARTBEAT})
			conn.SetWriteDeadline(time.Time{})
		}
	}
}

func (s *subscriber) writer(conn *websocket.Conn) {
	for {
		select {
		case buf := <-s.writebuf:
			conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
			conn.WriteJSON(buf)
			conn.SetWriteDeadline(time.Time{})
			// drop msg once writejson fail

		case <-s.done:
			return
		}
	}
}

func (s *subscriber) push(data interface{}) {
	s.writebuf <- &proto.B2SBody{Cmd: proto.CMD_B2S_MSG, Data: data}
}

複製代碼

這兩段代碼還有幾個很是明顯的問題

  1. 若是沒有subscriber,那麼這條publish的消息會被丟棄
  2. 若是從broker到subscriber發送的消息超時,消息同樣會被丟棄
  3. 發送成功,並不表明對端成功接收到了,如何保證消息真的發佈成功

爲了解決上面三個問題,可能須要考慮數據持久化以及消息ack等機制,這個也是一大難題。

再來看看subscriber的實現,subscriber能夠很簡單實現,一個goroutine發心跳包,一個goroutine收數據包,收到的消息以後調用回調函數進行相應的處理。

package main

import (
	"log"
	"time"

	"github.com/ICKelin/pubsub/proto"
	"github.com/gorilla/websocket"
)

type subscriber struct {
	topics []string
	broker string
	cb     func(msg interface{})
}

func newSubscriber(topics []string, broker string, cb func(msg interface{})) *subscriber {
	return &subscriber{
		topics: topics,
		broker: broker,
		cb:     cb,
	}
}

func (s *subscriber) Run() error {
	conn, _, err := websocket.DefaultDialer.Dial(s.broker, nil)
	if err != nil {
		return err
	}

	defer conn.Close()
	subMsg := proto.S2BSubscribe{Topics: s.topics}

	err = conn.WriteJSON(&subMsg)
	if err != nil {
		return err
	}

	go func() {
		for {
			body := &proto.S2BBody{
				Cmd:  proto.CMD_S2B_HEARTBEAT,
				Data: &proto.S2BHeartbeat{},
			}
			conn.WriteJSON(body)
			time.Sleep(time.Second * 3)
		}
	}()

	for {
		var body = proto.S2BBody{}
		conn.ReadJSON(&body)

		switch body.Cmd {
		case proto.CMD_B2S_HEARTBEAT:
			log.Println("hb from broker")

		case proto.CMD_B2S_MSG:
			s.cb(body.Data)
		}
	}
}

func main() {
	s := newSubscriber([]string{"gtun", "https://www.notr.tech"}, "ws://127.0.0.1:10002/sub", func(msg interface{}) {
		log.Println(msg)
	})
	s.Run()
}

複製代碼
相關文章
相關標籤/搜索