海量日誌實時收集系統架構設計與go語言實現

日誌收集系統應該說是到達必定規模的公司的標配了,一個能知足業務需求、運維成本低、穩定的日誌收集系統對於運維的同窗和日誌使用方的同窗都是很是nice的。然而這時理想中的日誌收集系統,現實每每不是這樣的...本篇的主要內容是:首先吐槽一下公司之前的日誌收集和上傳;介紹新的實時日誌收集系統架構;用go語言實現。澄清一下,並非用go語言實現所有,好比用到卡夫卡確定不能重寫一個kafka吧...python

logagent全部代碼已上傳到github:https://github.com/zingp/logagentandroid

1 老系統吐槽

我司之前的日誌收集系統概述以下:nginx

日誌收集的頻率有每小時收集一次、每5分鐘收集一次、實時收集三種。大部分狀況是每小時收集上傳一次。
(1)每5分鐘上傳一次和每小時上傳一次的狀況是這樣的:
每臺機器上都須要部署一個日誌收集agengt,部署一個日誌上傳agent,每臺機器都須要掛載hadoop集羣的客戶端。
日誌收集agent負責切割日誌,上傳agent整點的時候啓動利用hadoop客戶端,將切割好的前1小時或前5分鐘日誌打包上傳到hadoop集羣。
(2)實時傳輸的狀況是這樣的
每臺機器上部署另外一個agent,該agent實時收集日誌傳輸到kafka。git

看到這裏你可能都看不下去了,這麼複雜臃腫費勁的日誌收集系統是怎麼設計出來的?額...先辯解一下,這套系統有4年以上的歷史了,當時的解決方案確實有限。辯解完以後仍是得吐槽一下系統存在的問題:
(1)首先部署在每臺機器上的agent沒有作統一的配置入口,須要根據不一樣業務到不一樣機器上配置,運維成本太大;十臺機器也就罷了,問題是如今有幾萬臺機器,幾千個服務。
(2)最無語的是針對不一樣的hadoop集羣,須要掛載多個hadoop客戶端,也就是存在一臺機器上部署幾個hadoop客戶端的狀況。運維成本太大...
(3)沒作限流,整點的時候傳輸壓力變大。某些機器有不少日誌,一到整點壓力就上來了。無圖無真相,咱們來看下:github

CPU:看綠色的線條web

 

負載:json

網卡:架構

這組機器比較典型(這就是前文說的有多個hadoop客戶端的狀況),截圖是凌晨至上午的時間段,還未到真正的高峯期。不過整體上可看出整點的壓力是明顯比非正點高不少的,已經到了不能忍的地步。併發

(4)省略n條吐槽...app

2 新系統架構

首先日誌收集大可沒必要在客戶端分爲1小時、5分鐘、實時這幾種頻率,只須要實時一種就能知足前面三種需求。

其次能夠砍掉在機器上掛載hadoop客戶端,放在其餘地方作日誌上傳hadoop流程。

第三,作統一的配置管理系統,提供友好的web界面,用戶只須要在web界面上配置一組service須要收集的日誌,即可通知該組service下的全部機器上的日誌收集agent。

第四,流量削峯。應該說實時收集能夠避免舊系統整點負載過大狀況,但依舊應該作限流功能,防止高峯期agent過分消耗資源影響業務。

第五,日誌補傳...

實際上公司有的部門在用flume作日誌收集,但以爲過重。通過一段時間調研和結合自身業務特色,利用開源軟件在適當作些開發會比較好。go應該擅長作這個事,並且方便運維。好了,附上架構圖。

將用go實現logagent,Web,transfer這個三個部分。

logagent主要負責按照配置實時收集日誌發送到kafka,此外還需watch etcd中的配置,如改變,須要熱更新。

web部分主要用於更新etcd中的配置,etcd已提供接口,咱們只須要集成到資源管理系統或CMDB系統的管理界面中去便可。

transfer 作的是消費kafka隊列中的日誌,發送到es/hadoop/storm中去。

3 實現logagent

3.1 配置設計

首先思考下logagent的配置文件內容:

etcd_addr = 10.134.123.183:2379         # etcd 地址
etcd_timeout = 5                        # 鏈接etcd超時時間
etcd_watch_key = /logagent/%s/logconfig    # etcd key 格式

kafka_addr = 10.134.123.183:9092           # 卡夫卡地址

thread_num = 4                             # 線程數
log = ./log/logagent.log                   # agent的日誌文件
level = debug                              # 日誌級別

# 監聽哪些日誌,日誌限流大小,發送到卡夫卡的哪一個topic  這個部分能夠放到etcd中去。 

如上所說,監聽哪些日誌,日誌限流大小,發送到卡夫卡的哪一個topic 這個部分能夠放到etcd中去。etcd中存儲的value格式設計以下:

`[
	{
	"service":"test_service",        
	"log_path": "/search/nginx/logs/ping-android.shouji.sogou.com_access_log",   "topic": "nginx_log",
	"send_rate": 1000
	},
	{
	"service":"srv.android.shouji.sogou.com",
	"log_path": "/search/nginx/logs/srv.android.shouji.sogou.com_access_log","topic": "nginx_log",
	"send_rate": 2000
	}
]`

    - "service":"服務名稱",        
    - "log_path": "應該監聽的日誌文件",   
    - "topic": "kfk topic",
    - "send_rate": "日誌條數限制"  

 其實能夠將更多的配置放入etcd中,根據自身業務狀況可自行定義,本次就作如此設計,接下來能夠寫解析配置文件的代碼了。

config.go

package main

import (
	"fmt"
	"github.com/astaxie/beego/config"
)

type AppConfig struct {
	EtcdAddr     string
	EtcdTimeOut  int
	EtcdWatchKey string

	KafkaAddr string

	ThreadNum int
	LogFile   string
	LogLevel  string
}

var appConf = &AppConfig{}

func initConfig(file string) (err error) {
	conf, err := config.NewConfig("ini", file)
	if err != nil {
		fmt.Println("new config failed, err:", err)
		return
	}
	appConf.EtcdAddr = conf.String("etcd_addr")
	appConf.EtcdTimeOut = conf.DefaultInt("etcd_timeout", 5)
	appConf.EtcdWatchKey = conf.String("etcd_watch_key")

	appConf.KafkaAddr = conf.String("kafka_addr")

	appConf.ThreadNum = conf.DefaultInt("thread_num", 4)
	appConf.LogFile = conf.String("log")
	appConf.LogLevel = conf.String("level")
	return
} 

代碼主要定義了一個AppConf結構體,而後讀取配置文件,存放到結構體中。

此外,還有部分配置在etcd中,須要作兩件事,第一次啓動程序時將配置從etcd拉取下來;而後啓動一個協程去watch etcd中的配置是否更改,若是更改須要拉取並更新到內存中。代碼以下:

etcd.go:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/astaxie/beego/logs"
	client "github.com/coreos/etcd/clientv3"
)

var (
	confChan  = make(chan string, 10)
	cli       *client.Client
	waitGroup sync.WaitGroup
)

func initEtcd(addr []string, keyFormat string, timeout time.Duration) (err error) {
	// init a global var cli and can not close
	cli, err = client.New(client.Config{
		Endpoints:   addr,
		DialTimeout: timeout,
	})
	if err != nil {
		fmt.Println("connect etcd error:", err)
		return
	}
	logs.Debug("init etcd success")
	// defer cli.Close()   //can not close

	var etcdKeys []string
	ips, err := getLocalIP()
	if err != nil {
		fmt.Println("get local ip error:", err)
		return
	}
	for _, ip := range ips {
		key := fmt.Sprintf(keyFormat, ip)
		etcdKeys = append(etcdKeys, key)
	}

	// first, pull conf from etcd
	for _, key := range etcdKeys {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		resp, err := cli.Get(ctx, key)
		cancel()
		if err != nil {
			fmt.Println("get etcd key failed, error:", err)
			continue
		}

		for _, ev := range resp.Kvs {
			// return result is not string
			confChan <- string(ev.Value)
			fmt.Printf("etcd key = %s , etcd value = %s", ev.Key, ev.Value)
		}
	}

	waitGroup.Add(1)
	// second, start a goroutine to watch etcd
	go etcdWatch(etcdKeys)
	return
}

// watch etcd
func etcdWatch(keys []string) {
	defer waitGroup.Done()

	var watchChans []client.WatchChan
	for _, key := range keys {
		rch := cli.Watch(context.Background(), key)
		watchChans = append(watchChans, rch)
	}

	for {
		for _, watchC := range watchChans {
			select {
			case wresp := <-watchC:
				for _, ev := range wresp.Events {
					confChan <- string(ev.Kv.Value)
					logs.Debug("etcd key = %s , etcd value = %s", ev.Kv.Key, ev.Kv.Value)
				}
			default:
			}
		}
		time.Sleep(time.Second)
	}
}

//GetEtcdConfChan is func get etcd conf add to chan
func GetEtcdConfChan() chan string {
	return confChan
}  

 其中,有一個比較個性化的設計,就是一臺主機對應的etcd 中的key咱們設置成/logagent/本機ip/logconfig的格式,所以還須要一個獲取本機IP的功能,注意一臺機器可能存在多個IP。

ip.go:

package main

import (
	"fmt"
	"net"
)

// var a slice for ip addr
var ipArray []string

func getLocalIP() (ips []string, err error) {
	ifaces, err := net.Interfaces()
	if err != nil {
		fmt.Println("get ip interfaces error:", err)
		return
	}

	for _, i := range ifaces {
		addrs, errRet := i.Addrs()
		if errRet != nil {
			continue
		}

		for _, addr := range addrs {
			var ip net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				ip = v.IP
				if ip.IsGlobalUnicast() {
					ips = append(ips, ip.String())
				}
			}
		}
	}
	return
}

3.2 初始化kafka

初始化kafka很簡單,就是建立kafka實例,提供發送日誌功能。只不過發送是併發的。

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

var kafkaSend = &KafkaSend{}

type Message struct {
	line  string
	topic string
}

type KafkaSend struct {
	client   sarama.SyncProducer
	lineChan chan *Message
}

func initKafka(kafkaAddr string, threadNum int) (err error) {
	kafkaSend, err = NewKafkaSend(kafkaAddr, threadNum)
	return
}

// NewKafkaSend is 
func NewKafkaSend(kafkaAddr string, threadNum int) (kafka *KafkaSend, err error) {
	kafka = &KafkaSend{
		lineChan: make(chan *Message, 10000),
	}

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait kafka ack
	config.Producer.Partitioner = sarama.NewRandomPartitioner // random partition
	config.Producer.Return.Successes = true

	client, err := sarama.NewSyncProducer([]string{kafkaAddr}, config)
	if err != nil {
		logs.Error("init kafka client err: %v", err)
		return
	}
	kafka.client = client

	for i := 0; i < threadNum; i++ {
		fmt.Println("start to send kfk")
		waitGroup.Add(1)
		go kafka.sendMsgToKfk()
	}
	return
}

func (k *KafkaSend) sendMsgToKfk() {
	defer waitGroup.Done()

	for v := range k.lineChan {
		msg := &sarama.ProducerMessage{}
		msg.Topic = v.topic
		msg.Value = sarama.StringEncoder(v.line)

		_, _, err := k.client.SendMessage(msg)
		if err != nil {
			logs.Error("send massage to kafka error: %v", err)
			return
		}
	}
}

func (k *KafkaSend) addMessage(line string, topic string) (err error) {
	k.lineChan <- &Message{line: line, topic: topic}
	return
}

3.3 實時讀取日誌,發送到kafka

用到第三方包:"github.com/hpcloud/tail"。將每一個監聽的日誌,都抽象成一個對象。
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync"

	"github.com/astaxie/beego/logs"
	"github.com/hpcloud/tail"
)

// TailObj is TailMgr's instance
type TailObj struct {
	tail     *tail.Tail
	offset   int64
	logConf  LogConfig
	secLimit *SecondLimit
	exitChan chan bool
}

var tailMgr *TailMgr

//TailMgr to manage tailObj
type TailMgr struct {
	tailObjMap map[string]*TailObj
	lock       sync.Mutex
}

// NewTailMgr init TailMgr obj
func NewTailMgr() *TailMgr {
	return &TailMgr{
		tailObjMap: make(map[string]*TailObj, 16),
	}
}

//AddLogFile to Add tail obj
func (t *TailMgr) AddLogFile(conf LogConfig) (err error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	_, ok := t.tailObjMap[conf.LogPath]
	if ok {
		err = fmt.Errorf("duplicate filename:%s", conf.LogPath)
		return
	}

	tail, err := tail.TailFile(conf.LogPath, tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // read to tail
		MustExist: false,  //file does not exist, it does not return an error 
		Poll:      true,
	})
	if err != nil {
		fmt.Println("tail file err:", err)
		return
	}

	tailObj := &TailObj{
		tail:     tail,
		offset:   0,
		logConf:  conf,
		secLimit: NewSecondLimit(int32(conf.SendRate)),
		exitChan: make(chan bool, 1),
	}
	t.tailObjMap[conf.LogPath] = tailObj

	waitGroup.Add(1)
	go tailObj.readLog()
	return
}

func (t *TailMgr) reloadConfig(logConfArr []LogConfig) (err error) {
	for _, conf := range logConfArr {
		tailObj, ok := t.tailObjMap[conf.LogPath]
		if !ok {
			err = t.AddLogFile(conf)
			if err != nil {
				logs.Error("add log file failed:%v", err)
				continue
			}
			continue
		}
		tailObj.logConf = conf
		tailObj.secLimit.limit = int32(conf.SendRate)
		t.tailObjMap[conf.LogPath] = tailObj
	}

	for key, tailObj := range t.tailObjMap {
		var found = false
		for _, newValue := range logConfArr {
			if key == newValue.LogPath {
				found = true
				break
			}
		}
		if found == false {
			logs.Warn("log path :%s is remove", key)
			tailObj.exitChan <- true
			delete(t.tailObjMap, key)
		}
	}
	return
}

// Process hava two func get new log conf and reload conf
func (t *TailMgr) Process() {
	for conf := range GetEtcdConfChan() {
		logs.Debug("log conf: %v", conf)

		var logConfArr []LogConfig
		err := json.Unmarshal([]byte(conf), &logConfArr)
		if err != nil {
			logs.Error("unmarshal failed, err: %v conf :%s", err, conf)
			continue
		}

		err = t.reloadConfig(logConfArr)
		if err != nil {
			logs.Error("reload config from etcd failed: %v", err)
			continue
		}
		logs.Debug("reload config from etcd success")
	}
}

func (t *TailObj) readLog() {

	for line := range t.tail.Lines {
		if line.Err != nil {
			logs.Error("read line error:%v ", line.Err)
			continue
		}

		lineStr := strings.TrimSpace(line.Text)
		if len(lineStr) == 0 || lineStr[0] == '\n' {
			continue
		}

		kafkaSend.addMessage(line.Text, t.logConf.Topic)
		t.secLimit.Add(1)
		t.secLimit.Wait()

		select {
		case <-t.exitChan:
			logs.Warn("tail obj is exited: config:", t.logConf)
			return
		default:
		}
	}
	waitGroup.Done()
}

func runServer() {
	tailMgr = NewTailMgr()
	tailMgr.Process()
	waitGroup.Wait()
} 

此處設計了一個限流功能,邏輯大概以下:設置閾值A,如閾值爲1000條,若是這秒鐘已經發送1000條,那麼這一秒剩下的時間就sleep。limit.go代碼以下:

package main

import (
	"sync/atomic"
	"time"

	"github.com/astaxie/beego/logs"
)
// SecondLimit to limit num in one second
type SecondLimit struct {
	unixSecond int64
	curCount   int32
	limit      int32
}

// NewSecondLimit to init a SecondLimit obj
func NewSecondLimit(limit int32) *SecondLimit {
	secLimit := &SecondLimit{
		unixSecond: time.Now().Unix(),
		curCount:   0,
		limit:      limit,
	}

	return secLimit
}

// Add is func to 
func (s *SecondLimit) Add(count int) {
	sec := time.Now().Unix()
	if sec == s.unixSecond {
		atomic.AddInt32(&s.curCount, int32(count))
		return
	}

	atomic.StoreInt64(&s.unixSecond, sec)
	atomic.StoreInt32(&s.curCount, int32(count))
}

// Wait to limit num
func (s *SecondLimit) Wait() bool {
	for {
		sec := time.Now().Unix()
		if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount >= s.limit {
			time.Sleep(time.Millisecond)
			logs.Debug("limit is runing, limit: %d s.curCount:%d", s.limit, s.curCount)
			continue
		}

		if sec != atomic.LoadInt64(&s.unixSecond) {
			atomic.StoreInt64(&s.unixSecond, sec)
			atomic.StoreInt32(&s.curCount, 0)
		}
		logs.Debug("limit is exited")
		return false
	}
}

此外,寫日誌的代碼非主要代碼,這裏就不介紹了。全部代碼均上傳到github上,若有興趣可前去clone,地址已經在文章開頭處給出。

transfer將在下一篇文章中介紹。文中涉及kafka,etcd等搭建,可參考官網搭建單機版用於測試。

個人博客即將搬運同步至騰訊雲+社區,邀請你們一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=18j46a3goe9gk

相關文章
相關標籤/搜索