goim 文章系列(共5篇):php
- goim 架構與定製
- 從goim定製, 淺談 golang 的 interface 解耦合與gRPC
- goim中的 bilibili/discovery (eureka)基本概念及應用
- goim 的 data flow 數據流
- goim的業務集成(分享會小結與QA)
有個 slack 頻道, 很多朋友在交流 goim , 歡迎加入slack #goimjava
繼上一篇文章 goim 架構與定製 , 再談 goim 的定製擴展, 這一次談一彈 goim 從 kafka 轉到 natsnode
github 上的 issue 在這裏github.com/Terry-Mao/g…python
簡要說明一下 golang 的 interface: 在 吳德寶AllenWu 文章Golang interface接口深刻理解 中這樣寫到:git
爲何要用接口呢?在Gopher China 上的分享中,有大神給出了下面的理由:github
writing generic algorithm (相似泛型編程)golang
hiding implementation detail (隱藏具體實現)redis
providing interception points (提供攔截點-----> 也可稱叫提供 HOOKS , 一個插入其餘業務邏輯的鉤子)sql
在QQ羣"golang中國" 中, 有關於 de-couple 解耦合的話題中, 閃俠這樣說到:docker
這裏, 就來看看 interface 如何實現 goim 從 kafka 轉到 NATS
看圖, 不說話, 哈哈
上圖中,
那咱們的目標很簡單了, 換了!!! ----------> 等等.......能保留原有 kafka 實現不? 在必要時, 可使用開關項, 切換 nats 或 kafka ??
固然......能夠!
下面就比較簡單, 看碼
先看源代碼( 注意下面代碼中的註釋)
代碼在 github.com/Terry-Mao/g… 大約第33行
// PushMids push a message by mid.
func (l *Logic) PushMids(c context.Context, op int32, mids []int64, msg []byte) (err error) {
keyServers, _, err := l.dao.KeysByMids(c, mids)
if err != nil {
return
}
keys := make(map[string][]string)
for key, server := range keyServers {
if key == "" || server == "" {
log.Warningf("push key:%s server:%s is empty", key, server)
continue
}
keys[server] = append(keys[server], key)
}
for server, keys := range keys {
//
// 主要向 kafka 發送消息, 是下面這一行
// l.dao.PushMsg(c, op, server, keys, msg)
// 方法名是 PushMsg
//
if err = l.dao.PushMsg(c, op, server, keys, msg); err != nil {
return
}
}
return
}
複製代碼
再看一下 dao 是什麼:
代碼在 github.com/Terry-Mao/g… 大約第20行
// Logic struct
type Logic struct {
c *conf.Config
dis *naming.Discovery
//
//
// 下面這個 dao.Dao 提供了 PushMsg 方法
// 帶個星, 這是個引用
//
//
dao *dao.Dao
// online
totalIPs int64
totalConns int64
roomCount map[string]int32
// load balancer
nodes []*naming.Instance
loadBalancer *LoadBalancer
regions map[string]string // province -> region
}
複製代碼
最後, 重點來了, 查到 dao 源頭實現
下面是咱們須要擴展的地方, 在 github.com/Terry-Mao/g…中 dao, 這名稱很 java (DAO-------> Data Access Objects 數據存取對象), 這裏也說明了 bilibili 們在代碼紡織上, 挺規範
代碼在 github.com/Terry-Mao/g… 大約第10行開始
// Dao dao.
type Dao struct {
c *conf.Config
//
// ******************************************************************
// 下面這個 kafkaPub 很清楚, 是 kafka 的同步發佈者 kafka.SyncProducer
//
// 這個是咱們要換成 interface 的地方
//
// ******************************************************************
//
kafkaPub kafka.SyncProducer
redis *redis.Pool
redisExpire int32
}
// New new a dao and return.
func New(c *conf.Config) *Dao {
d := &Dao{
c: c,
//
// ******************************************************************
// 下面這個 newKafkaPub(c.Kafka) 便是初始化 kafka
// 也就是鏈接上 kafka
// 下面, 咱們先改寫一下這個函數, 變通一下代碼形式
//
// ******************************************************************
//
kafkaPub: newKafkaPub(c.Kafka),
redis: newRedis(c.Redis),
redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),
}
return d
}
// 這是鏈接 kafka 的初化函數( function )
//
func newKafkaPub(c *conf.Kafka) kafka.SyncProducer {
kc := kafka.NewConfig()
kc.Producer.RequiredAcks = kafka.WaitForAll // Wait for all in-sync replicas to ack the message
kc.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
kc.Producer.Return.Successes = true
pub, err := kafka.NewSyncProducer(c.Brokers, kc)
if err != nil {
panic(err)
}
return pub
}
複製代碼
這裏, 先小改一下 func New(c *conf.Config) *Dao 這個函數 改爲以下代碼形式
// New new a dao and return.
func New(c *conf.Config) *Dao {
d := &Dao{
c: c,
//
//
// 注意, 下面這行被移出去
// kafkaPub: newKafkaPub(c.Kafka),
//
//
redis: newRedis(c.Redis),
redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),
}
//
// 變成這樣了, 功能沒變化
//
d.kafkaPub = newKafkaPub(c.Kafka)
return d
}
複製代碼
仍是看源代碼
代碼在 github.com/Terry-Mao/g… 大約第13行開始
// PushMsg push a message to databus.
func (d *Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) {
pushMsg := &pb.PushMsg{
Type: pb.PushMsg_PUSH,
Operation: op,
Server: server,
Keys: keys,
Msg: msg,
}
b, err := proto.Marshal(pushMsg)
if err != nil {
return
}
//
// ********************************
//
// 實際發佈消息, 就是下面這個幾行語句
// 1. 組織一下須要發送的信息, 以 kafka 的發佈接口要求的形式
// 2. 嘗試發佈信息, 處理髮布信息可能的錯誤
//
// 重點注意下面這幾行, 後面會改掉
// 重點注意下面這幾行, 後面會改掉
// 重點注意下面這幾行, 後面會改掉
//
// ********************************
//
m := &sarama.ProducerMessage{
Key: sarama.StringEncoder(keys[0]),
Topic: d.c.Kafka.Topic,
Value: sarama.ByteEncoder(b),
}
if _, _, err = d.kafkaPub.SendMessage(m); err != nil {
log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
}
return
}
// BroadcastRoomMsg push a message to databus.
func (d *Dao) BroadcastRoomMsg(c context.Context, op int32, room string, msg []byte) (err error) {
pushMsg := &pb.PushMsg{
Type: pb.PushMsg_ROOM,
Operation: op,
Room: room,
Msg: msg,
}
b, err := proto.Marshal(pushMsg)
if err != nil {
return
}
m := &sarama.ProducerMessage{
Key: sarama.StringEncoder(room),
Topic: d.c.Kafka.Topic,
Value: sarama.ByteEncoder(b),
}
//
// ********************************
// 實際發佈消息, 就是下面這個語句
// ********************************
//
if _, _, err = d.kafkaPub.SendMessage(m); err != nil {
log.Errorf("PushMsg.send(broadcast_room pushMsg:%v) error(%v)", pushMsg, err)
}
return
}
複製代碼
先上代碼, 代碼會說話( golang 簡單就在這裏, 代碼會說話 ) , 後加說明
// PushMsg interface for kafka / nats
// ******************** 這裏是新加的 interface 定義 *****************
type PushMsg interface {
PublishMessage(topic, ackInbox string, key string, msg []byte) error // ****** 這裏小改了個方法名!!! 注意
Close() error
}
// Dao dao.
type Dao struct {
c *conf.Config
push PushMsg // ******************** 看這裏 *****************
redis *redis.Pool
redisExpire int32
}
// New new a dao and return.
func New(c *conf.Config) *Dao {
d := &Dao{
c: c,
redis: newRedis(c.Redis),
redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),
}
if c.UseNats { // ******************** 在配置中加一個 bool 布爾值的開關項 *****************
d.push = NewNats(c) // ******************** 這裏支持 nats *****************
} else {
d.push = NewKafka(c) //// ******************** 這裏是原來的 kafka *****************
}
return d
}
複製代碼
kafka 實現 interface 接口的代碼
// Dao dao.
type kafkaDao struct {
c *conf.Config
push kafka.SyncProducer
}
// New new a dao and return.
func NewKafka(c *conf.Config) *kafkaDao {
d := &kafkaDao{
c: c,
push: newKafkaPub(c.Kafka),
}
return d
}
// PublishMessage push message to kafka
func (d *kafkaDao) PublishMessage(topic, ackInbox string, key string, value []byte) error {
m := &kafka.ProducerMessage{
Key: sarama.StringEncoder(key),
Topic: d.c.Kafka.Topic,
Value: sarama.ByteEncoder(value),
}
_, _, err := d.push.SendMessage(m)
return err
}
複製代碼
nats 對 interface 的實現
// natsDao dao for nats
type natsDao struct {
c *conf.Config
push *nats.Conn
}
// New new a dao and return.
func NewNats(c *conf.Config) *natsDao {
conn, err := newNatsClient(c.Nats.Brokers, c.Nats.Topic, c.Nats.TopicID)
if err != nil {
return nil
}
d := &natsDao{
c: c,
push: conn,
}
return d
}
// PublishMessage push message to nats
func (d *natsDao) PublishMessage(topic, ackInbox string, key string, value []byte) error {
if d.push == nil {
return errors.New("nats error")
}
msg := &nats.Msg{Subject: topic, Reply: ackInbox, Data: value}
return d.push.PublishMsg(msg)
}
複製代碼
最後, 調用 interface 的變動
// PushMsg push a message to databus.
func (d *Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) {
pushMsg := &pb.PushMsg{
Type: pb.PushMsg_PUSH,
Operation: op,
Server: server,
Keys: keys,
Msg: msg,
}
b, err := proto.Marshal(pushMsg)
if err != nil {
return
}
//
// ********************************
//
// 實際發佈消息, 就是下面這個幾行語句
// 1. 組織一下須要發送的信息, 以 kafka 的發佈接口要求的形式
// 2. 嘗試發佈信息, 處理髮布信息可能的錯誤
//
// 重點注意下面這幾行, 實際更改
// 重點注意下面這幾行, 實際更改
// 重點注意下面這幾行, 實際更改
//
// ********************************
if err = d.push.PublishMessage(d.c.Kafka.Topic, d.c.Nats.AckInbox, keys[0], b); err != nil {
log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
}
return
}
複製代碼
OK, 修改完成
簡明來講, interface 接口定義一下名稱, 再定義接口中要實現的方法 method ( 方法集合 )
// PushMsg interface for kafka / nats
// ******************** 這裏是新加的 interface 定義 *****************
type PushMsg interface {
PublishMessage(topic, ackInbox string, key string, msg []byte) error // ****** 這裏小改了個方法名!!! 注意
Close() error
}
// Dao dao.
type Dao struct {
c *conf.Config
push PushMsg // ******************** 看這裏 *****************
redis *redis.Pool
redisExpire int32
}
複製代碼
上面 定義了 PushMsg 這個interface , 這是一個 方法( method)集合
- topic 這是 kafka 或 nats 裏的主題, 也就是 pub/sub 發佈/訂閱的頻道
- ackInbox 這是 publish 發佈的 confirm 確認頻道
- key 消息體( payload ) 的鍵
- msg 這是消息體 payload
這就是一個接口定義, 方法名/ 輸入/ 輸出, 至於方法的具體實現, 交由下面的實體去實現( 能夠看 kafka / nats 中分別對應的 PublishMessage 的方法實現)
很清楚, 方法是由具體實現來完成, 下面就是實例化方法
是用哪個具體實現呢, 就看實例化哪個了, interface 最終落地, 就在這裏
if c.UseNats { // ******************** 在配置中加一個 bool 布爾值的開關項 *****************
d.push = NewNats(c) // ******************** 這裏支持 nats *****************
} else {
d.push = NewKafka(c) //// ******************** 這裏是原來的 kafka *****************
}
複製代碼
而在 func (d *Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) 中, 則簡單調用 interface 定義的方法
與其餘方法 method 或函數 function 是同樣的, 沒什麼特別的
// ********************************
if err = d.push.PublishMessage(d.c.Kafka.Topic, d.c.Nats.AckInbox, keys[0], b); err != nil {
log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
}
複製代碼
再一次回看,
在 吳德寶AllenWu 文章Golang interface接口深刻理解 中這樣寫到:
爲何要用接口呢?在Gopher China 上的分享中,有大神給出了下面的理由:
writing generic algorithm (相似泛型編程)
hiding implementation detail (隱藏具體實現)
providing interception points (提供攔截點-----> 也可稱叫提供 HOOKS , 一個插入其餘業務邏輯的鉤子)
interface 確是隱藏了具體實現, 能讓咱們很容易的把 goim 對 kafka 的依賴, 切換到 nats , 而且經過一個開關項, 來肯定使用哪個具體實現
擴展一下, 這個 interface 也能夠實現從 kafka 切換到 rabbitMQ / activeMQ / redis (pub/sub) .... 只要簡單實現 PushMsg 這個 interface 就好啦
另有 goim 在 job 網元上的 subscribe 訂閱接口, 支持 interface 代碼是一路子方法, 直接看源碼吧, 有交流討論再另寫.
注: job 代碼中, 我把某個方法( method ) 拆解成了函數( function ), 有興趣的朋友能夠查一下, 有些小區別,但效果同樣.
goim 源代碼在github.com/Terry-Mao/g…
我寫的代碼在github.com/tsingson/go…
下面是 2019/04/23 補充內容:
經網上交流, 另外一位朋友 weisd 改寫的 goim, 支持 nsq 的 interface, 代碼組織得比我好啊:
代碼在這裏 github.com/weisd/goim
gRPC , 就是 google 的 RPC ( Remote Procedure Call) , 看一下 gRPC 以 go 實現的 interface 定義
protobuf 是 gRPC 中默認的 接口定義, 就像 愛立信 ICE ( 開源版本是 zeroICE ) 的 slice , apache 的 thrift
在 goim 中, 網元間用 gRPC 通信, 再看圖
看圖上的 grpc 標示, 注意, 圖上標示箭頭不徹底準確:grpc 同時支持
- 普通 Client / Server 調用(北向)接口
- Client 向 Server 的流式(北向)流式接口
- Server 向 Cinet 調用(南向)流式接口
- 以及 Server / Client 雙向流式接口
網上文章不少, 不一一展開了. 咱們重點關注一下, golang 中對 gRPC 的實現, 也就是 golang 如何把 protobuf 定義的接口, 定義爲 golang 中的 interface , 以及如何具體實現 interface .
看碼, 看碼, 看碼:
syntax = "proto3";
package goim.comet;
option go_package = "grpc";
//......
//
// ************************
// 這裏定義 input 輸入
message PushMsgReq {
repeated string keys = 1;
int32 protoOp = 3;
Proto proto = 2;
}
//
// ************************
// 這裏定義 output 輸出
message PushMsgReply {}
//.........
service Comet {
// ..........
//PushMsg push by key or mid
//
// ************************
// 這裏定義接口, 這個接口能夠由
// golang / java / rust / js / python / php ...實現
//
// 這是解耦合的極致啊!!!!!!!!!!!!!!!!
//
// ************************
//
rpc PushMsg(PushMsgReq) returns (PushMsgReply);
// Broadcast send to every enrity
// ...........
}
複製代碼
注意, 下面的源碼是 protobuf 自動生成的, 不須要編輯更改, 註釋是方便溝通額外加的
// Server API for Comet service
// ************************
// 這裏定義接口, golang 實現服務器端
// ************************
type CometServer interface {
...
// PushMsg push by key or mid
//
// ************************
// 這裏定義接口, golang 的接口中的方法
// ************************
//
PushMsg(context.Context, *PushMsgReq) (*PushMsgReply, error)
...
}
複製代碼
最後, 具體實例化代碼實現, 在
代碼會說話兒, 這裏就不展現了.
謝謝朋友們看到最後, 寫碼掙錢的朋友都是有一說一, 這裏聲明一下:
代碼中把 kafka 寫成可用 nats 替換, 只是技術上的學習與嘗試, 並非建議或推薦使用 nats:
因此, case by case , 具體業務場景具體分析, 商用項目的選型, 是一個慎重而嚴謹的事兒
請自行評估風險/成本
.
.
感謝 www.bilibili.com & 毛劍 及衆多開源社區的朋友們
歡迎交流與批評..... .
有朋友問了些不太相關問題, 公開加一下:
發一張老圖兒(幾年前的項目了), omnigraffle 畫的, 這軟件挺好用( 只有 mac 版本 )
網名 tsingson (三明智, 江湖人稱3爺)
原 ustarcom IPTV/OTT 事業部播控產品線技術架構溼/解決方案工程溼角色(8年), 自由職業者,
喜歡音樂(口琴,是第三/四/五屆廣東國際口琴嘉年華的主策劃人之一), 攝影與越野,
喜歡 golang 語言 (商用項目中主要用 postgres + golang )