今天來介紹 go-zero
生態的另外一個組件 go-stash
。這是一個 logstash
的 Go 語言替代版,咱們用 go-stash
相比原先的 logstash
節省了2/3的服務器資源。若是你在用 logstash
,不妨試試,也能夠看看基於 go-zero
實現這樣的工具是多麼的容易,這個工具做者僅用了兩天時間。git
先從它的配置中,咱們來看看設計架構。github
Clusters: - Input: Kafka: # Kafka 配置 --> 聯動 go-queue Filters: # filter action - Action: drop - Action: remove_field - Action: transfer Output: ElasticSearch: # es 配置 {host, index}
看配置名:kafka
是數據輸出端,es
是數據輸入端,filter
抽象了數據處理過程。json
對,整個 go-stash
就是如 config 配置中顯示的,所見即所得。服務器
從 stash.go
的啓動流程大體分爲幾個部分。由於能夠配置多個 cluster
,那從一個 cluster
分析:微信
es
的鏈接【傳入 es
配置】filter processors
【es
前置處理器,作數據過濾以及處理,能夠設置多個】es
中 索引配置,啓動 handle
,同時將 filter
加入handle【處理輸入輸出】kafka
,將上面建立的 handle
傳入,完成 kafka
和 es
之間的數據消費和數據寫入在上面架構圖中,中間的 filter
只是從 config 中看到,其實更詳細是 MessageHandler
的一部分,作數據過濾和轉換,下面來講說這塊。架構
如下代碼: https://github.com/tal-tech/g...
type MessageHandler struct { writer *es.Writer indexer *es.Index filters []filter.FilterFunc }
這個就對應上面說的,filter
只是其中一部分,在結構上 MessageHandler
是對接下游 es
,可是沒有看到對 kafka
的操做。微服務
別急,從接口設計上 MessageHandler
實現了 go-queue
中 ConsumeHandler
接口。工具
這裏,上下游就串聯了:性能
MessageHandler
接管了 es
的操做,負責數據處理到數據寫入kafka
的 Consume
操做。這樣在消費過程當中執行 handler
的操做,從而寫入 es
實際上,Consume()
也是這麼處理的:url
func (mh *MessageHandler) Consume(_, val string) error { var m map[string]interface{} // 反序列化從 kafka 中的消息 if err := jsoniter.Unmarshal([]byte(val), &m); err != nil { return err } // es 寫入index配置 index := mh.indexer.GetIndex(m) // filter 鏈式處理【由於沒有泛型,整個處理都是 `map進map出`】 for _, proc := range mh.filters { if m = proc(m); m == nil { return nil } } bs, err := jsoniter.Marshal(m) if err != nil { return err } // es 寫入 return mh.writer.Write(index, string(bs)) }
說完了數據處理,以及上下游的鏈接點。可是數據要從 kafka -> es
,數據流出這個動做從 kafka
角度看,應該是由開發者主動 pull data from kafka
。
那麼數據流是怎麼動起來?咱們回到主程序 https://github.com/tal-tech/g...
其實 啓動 整個流程中,其實就是一個組合模式:
func main() { // 解析命令行參數,啓動優雅退出 ... // service 組合模式 group := service.NewServiceGroup() defer group.Stop() for _, processor := range c.Clusters { // 鏈接es ... // filter processors 構建 ... // 準備es的寫入操做 {寫入的index, 寫入器writer} handle := handler.NewHandler(writer, indexer) handle.AddFilters(filters...) handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) // 按照配置啓動kafka,並將消費操做傳入,同時加入組合器 for _, k := range toKqConf(processor.Input.Kafka) { group.Add(kq.MustNewQueue(k, handle)) } } // 啓動這個組合器 group.Start() }
整個數據流,就和這個 group
組合器有關了。
group.Start() |- group.doStart() |- [service.Start() for service in group.services]
那麼說明加入 group
的 service
都是實現 Start()
。也就是說 kafka
端的啓動邏輯在 Start()
:
func (q *kafkaQueue) Start() { q.startConsumers() q.startProducers() q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() }
kafka
消費程序kafka
消費拉取端【可能會被名字迷惑,其實是從 kafka
拉取消息到 q.channel
】而咱們傳入 kafka
中的 handler
,上文說過實際上是 Consume
,而這個方法就是在 q.startConsumers()
中執行的:
q.startConsumers() |- [q.consumeOne(key, value) for msg in q.channel] |- q.handler.Consume(key, value)
這樣整個數據流就完全串起來了:
做爲 go-stash
第一篇文章,本篇從架構和設計上總體介紹 go-stash
,有關性能和爲何咱們要開發一個這樣的組件,咱們下篇文章逐漸揭曉。
https://github.com/tal-tech/g...
關於 go-zero
更多的設計和實現文章,能夠持續關注咱們。
https://github.com/tal-tech/g...
歡迎使用 go-zero 並 star 支持咱們!
關注『微服務實踐』公衆號並回復 進羣 獲取社區羣二維碼。
go-zero 系列文章見『微服務實踐』公衆號