極速精簡 Go 版 Logstash

前言

今天來介紹 go-zero 生態的另外一個組件 go-stash。這是一個 logstash 的 Go 語言替代版,咱們用 go-stash 相比原先的 logstash 節省了2/3的服務器資源。若是你在用 logstash,不妨試試,也能夠看看基於 go-zero 實現這樣的工具是多麼的容易,這個工具做者僅用了兩天時間。git

總體架構

先從它的配置中,咱們來看看設計架構。github

Clusters:
  - Input:
      Kafka:
        # Kafka 配置 --> 聯動 go-queue
    Filters:
        # filter action
      - Action: drop            
      - Action: remove_field
      - Action: transfer      
    Output:
      ElasticSearch:
        # es 配置 {host, index}

看配置名:kafka 是數據輸出端,es 是數據輸入端,filter 抽象了數據處理過程。json

對,整個 go-stash 就是如 config 配置中顯示的,所見即所得。服務器

image.png

啓動

stash.go 的啓動流程大體分爲幾個部分。由於能夠配置多個 cluster,那從一個 cluster 分析:微信

  1. 創建與 es 的鏈接【傳入 es 配置】
  2. 構建 filter processorses 前置處理器,作數據過濾以及處理,能夠設置多個】
  3. 完善對 es 中 索引配置,啓動 handle ,同時將 filter 加入handle【處理輸入輸出】
  4. 鏈接下游的 kafka,將上面建立的 handle 傳入,完成 kafkaes 之間的數據消費和數據寫入

MessageHandler

在上面架構圖中,中間的 filter 只是從 config 中看到,其實更詳細是 MessageHandler 的一部分,作數據過濾和轉換,下面來講說這塊。架構

如下代碼: https://github.com/tal-tech/g...
type MessageHandler struct {
    writer  *es.Writer
    indexer *es.Index
    filters []filter.FilterFunc
}

這個就對應上面說的,filter 只是其中一部分,在結構上 MessageHandler 是對接下游 es ,可是沒有看到對 kafka 的操做。微服務

別急,從接口設計上 MessageHandler 實現了 go-queueConsumeHandler 接口。工具

這裏,上下游就串聯了:性能

  1. MessageHandler 接管了 es 的操做,負責數據處理到數據寫入
  2. 對上實現了 kafkaConsume 操做。這樣在消費過程當中執行 handler 的操做,從而寫入 es

實際上,Consume() 也是這麼處理的:url

func (mh *MessageHandler) Consume(_, val string) error {
    var m map[string]interface{}
  // 反序列化從 kafka 中的消息
    if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
        return err
    }
    // es 寫入index配置
    index := mh.indexer.GetIndex(m)
  // filter 鏈式處理【由於沒有泛型,整個處理都是 `map進map出`】
    for _, proc := range mh.filters {
        if m = proc(m); m == nil {
            return nil
        }
    }
    bs, err := jsoniter.Marshal(m)
    if err != nil {
        return err
    }
    // es 寫入
    return mh.writer.Write(index, string(bs))
}

數據流

說完了數據處理,以及上下游的鏈接點。可是數據要從 kafka -> es ,數據流出這個動做從 kafka 角度看,應該是由開發者主動 pull data from kafka

那麼數據流是怎麼動起來?咱們回到主程序 https://github.com/tal-tech/g...

其實 啓動 整個流程中,其實就是一個組合模式:

func main() {
    // 解析命令行參數,啓動優雅退出
    ...
  // service 組合模式
    group := service.NewServiceGroup()
    defer group.Stop()

    for _, processor := range c.Clusters {
        // 鏈接es
    ...
        // filter processors 構建
    ...
    // 準備es的寫入操做 {寫入的index, 寫入器writer}
        handle := handler.NewHandler(writer, indexer)
        handle.AddFilters(filters...)
        handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
    // 按照配置啓動kafka,並將消費操做傳入,同時加入組合器
        for _, k := range toKqConf(processor.Input.Kafka) {
            group.Add(kq.MustNewQueue(k, handle))
        }
    }
    // 啓動這個組合器
    group.Start()
}

整個數據流,就和這個 group 組合器有關了。

group.Start()
    |- group.doStart()
        |- [service.Start() for service in group.services]

那麼說明加入 groupservice 都是實現 Start()。也就是說 kafka 端的啓動邏輯在 Start()

func (q *kafkaQueue) Start() {
    q.startConsumers()
    q.startProducers()

    q.producerRoutines.Wait()
    close(q.channel)
    q.consumerRoutines.Wait()
}
  1. 啓動 kafka 消費程序
  2. 啓動 kafka 消費拉取端【可能會被名字迷惑,其實是從 kafka 拉取消息到 q.channel
  3. 消費程序終止,收尾工做

而咱們傳入 kafka 中的 handler,上文說過實際上是 Consume,而這個方法就是在 q.startConsumers() 中執行的:

q.startConsumers()
    |- [q.consumeOne(key, value) for msg in q.channel]
        |- q.handler.Consume(key, value)

這樣整個數據流就完全串起來了:

image.png

總結

做爲 go-stash 第一篇文章,本篇從架構和設計上總體介紹 go-stash ,有關性能和爲何咱們要開發一個這樣的組件,咱們下篇文章逐漸揭曉。

https://github.com/tal-tech/g...

關於 go-zero 更多的設計和實現文章,能夠持續關注咱們。

https://github.com/tal-tech/g...

歡迎使用 go-zero 並 star 支持咱們!

微信交流羣

關注『微服務實踐』公衆號並回復 進羣 獲取社區羣二維碼。

go-zero 系列文章見『微服務實踐』公衆號
相關文章
相關標籤/搜索