open-falcon-agent源碼學習

最近在學習 Falcon,閱讀了源碼並參考了極客學院的視頻解析,畫出了調用結構和調用關係,並對主要代碼添加了註釋。

代碼地址:https://github.com/beyondskyw...

標籤(空格分隔): falcon go


監控數據

  • 機器性能指標:cpu、mem、網卡、磁盤……

  • 業務監控

  • 開源軟件狀態:Nginx、Redis、MySQL

  • SNMP 採集網絡設備指標

設計原理

  • 自動發現採集值

  • 不同類型的數據採集分到不同的 goroutine 中執行

  • 進程和端口通過用戶配置進行監控

配置文件

  • hostname和ip默認留空,agent自動探測

  • hbs和transfer都是配置其rpc地址

  • collector網卡採集前綴

  • ignore爲true時取消上報

組織結構

  • cron:間隔執行的代碼,即定時任務

  • funcs:信息採集

  • g:全局數據結構

  • http:簡單的dashboard的server,獲取單機監控指標數據

  • plugins:插件處理機制

  • public:靜態資源文件

心跳機制

  • 瞭解agent、plugin版本信息,方便升級

  • 獲取監聽的進程和端口

  • 獲取本機執行的插件列表

與HBS、Transfer交互

此處輸入圖片的描述

調用關係

此處輸入圖片的描述

代碼解讀

  • main入口

// Start cron.InitDataHistory in the background (presumably keeps a history of samples for rate-based collectors — TODO confirm).
go cron.InitDataHistory()
// Report this agent's own status to HBS.
cron.ReportAgentStatus()
// Sync the list of plugins this host should run.
cron.SyncMinePlugins()
// Sync built-in monitoring targets: ports, directory paths, processes and URLs.
cron.SyncBuiltinMetrics()
// Sync the whitelist of IPs allowed to run shell commands through the agent's HTTP debug interface.
cron.SyncTrustableIps()
// Start periodic metric collection.
cron.Collect()
// Start the local dashboard HTTP server.
go http.Start()
  • ReportAgentStatus:匯報 agent 自身狀態

// Report only when heartbeat is enabled and an HBS address is configured.
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
    // Report from a background goroutine, once every Heartbeat.Interval seconds.
    go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second)
}

// reportAgentStatus loops forever. On each pass it sends this agent's hostname,
// IP, agent version and plugin version to HBS through the Agent.ReportStatus
// RPC, then sleeps for interval. A failed call, or a non-zero response code, is
// logged and the loop simply carries on.
func reportAgentStatus(interval time.Duration) {
    for {
        name, hostErr := g.Hostname()
        if hostErr != nil {
            // Still report, but let the hostname field carry the error text.
            name = fmt.Sprintf("error:%s", hostErr.Error())
        }

        request := model.AgentReportRequest{
            Hostname:      name,
            IP:            g.IP(),
            AgentVersion:  g.VERSION,
            PluginVersion: g.GetCurrPluginVersion(),
        }

        var response model.SimpleRpcResponse
        callErr := g.HbsClient.Call("Agent.ReportStatus", request, &response)
        if callErr != nil || response.Code != 0 {
            log.Println("call Agent.ReportStatus fail:", callErr, "Request:", request, "Response:", response)
        }

        time.Sleep(interval)
    }
}
  • SyncMinePlugins:同步插件

// syncMinePlugins polls HBS (Agent.MinePlugins) every Heartbeat.Interval seconds
// for the plugin directories assigned to this host. It then reconciles the
// running plugin set: plugins that disappeared or whose MTime changed are
// stopped, and newly listed ones are started. An answer whose timestamp is not
// newer than the last applied one is ignored.
func syncMinePlugins() {
    lastApplied := int64(-1)
    pause := time.Duration(g.Config().Heartbeat.Interval) * time.Second

    for {
        time.Sleep(pause)

        name, err := g.Hostname()
        if err != nil {
            continue
        }

        var resp model.AgentPluginsResponse
        err = g.HbsClient.Call("Agent.MinePlugins", model.AgentHeartbeatRequest{Hostname: name}, &resp)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }
        // Skip stale or repeated answers.
        if resp.Timestamp <= lastApplied {
            continue
        }

        dirs := resp.Plugins
        lastApplied = resp.Timestamp

        if g.Config().Debug {
            log.Println(&resp)
        }

        if len(dirs) == 0 {
            // Nothing assigned: stop everything. The reconcile below would
            // have nothing left to do, so skip straight to the next poll.
            plugins.ClearAllPlugins()
            continue
        }

        // Merge the plugins found under every assigned directory.
        wanted := make(map[string]*plugins.Plugin)
        for _, dir := range dirs {
            for key, p := range plugins.ListPlugins(strings.Trim(dir, "/")) {
                wanted[key] = p
            }
        }

        // Stop plugins that are no longer wanted, then start the new ones.
        plugins.DelNoUsePlugins(wanted)
        plugins.AddNewPlugins(wanted)
    }
}
  • SyncBuiltinMetrics:同步內置metric,包括端口、目錄和進程信息

// syncBuiltinMetrics polls HBS (Agent.BuiltinMetrics) every Heartbeat.Interval
// seconds for the built-in monitoring targets configured for this host. It
// publishes them through g.SetReportUrls, g.SetReportPorts, g.SetReportProcs
// and g.SetDuPaths. Each request carries the checksum of the last applied
// answer. An answer is ignored when its timestamp is not newer than the last
// applied one, or when its checksum is unchanged.
//
// Tag formats, one builtin metric per entry:
//   g.URL_CHECK_HEALTH: two comma-separated key=value pairs, the URL first
//                       and the timeout second (keys are not checked)
//   g.NET_PORT_LISTEN:  port=22
//   g.DU_BS:            path=/home/works/logs (measured like `du -bs <path>`)
//   g.PROC_NUM:         name=crond and/or cmdline=cfg.json, comma separated
func syncBuiltinMetrics() {
    var timestamp int64 = -1
    var checksum string = "nil"

    duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second

    for {
        time.Sleep(duration)

        hostname, err := g.Hostname()
        if err != nil {
            continue
        }

        req := model.AgentHeartbeatRequest{
            Hostname: hostname,
            Checksum: checksum,
        }

        var resp model.BuiltinMetricResponse
        err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        // Skip stale answers and answers whose content did not change.
        if resp.Timestamp <= timestamp || resp.Checksum == checksum {
            continue
        }
        timestamp = resp.Timestamp
        checksum = resp.Checksum

        ports := []int64{}
        paths := []string{}
        procs := make(map[string]map[int]string)
        urls := make(map[string]string)

        for _, metric := range resp.Metrics {
            // A `continue` inside this switch moves on to the next metric.
            switch metric.Metric {
            case g.URL_CHECK_HEALTH:
                arr := strings.Split(metric.Tags, ",")
                if len(arr) != 2 {
                    continue
                }
                // Split only on the first "=". URLs with a query string
                // (e.g. url=http://host/health?a=b) contain further "="
                // characters and used to be silently dropped.
                url, ok := splitTagValue(arr[0])
                if !ok {
                    continue
                }
                timeout, ok := splitTagValue(arr[1])
                if !ok {
                    continue
                }
                if _, err := strconv.ParseInt(timeout, 10, 64); err == nil {
                    urls[url] = timeout
                } else {
                    log.Println("metric ParseInt timeout failed:", err)
                }

            case g.NET_PORT_LISTEN:
                arr := strings.Split(metric.Tags, "=")
                if len(arr) != 2 {
                    continue
                }
                if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
                    ports = append(ports, port)
                } else {
                    log.Println("metrics ParseInt failed:", err)
                }

            case g.DU_BS:
                arr := strings.Split(metric.Tags, "=")
                if len(arr) != 2 {
                    continue
                }
                paths = append(paths, strings.TrimSpace(arr[1]))

            case g.PROC_NUM:
                // Key 1 holds the process name, key 2 the cmdline fragment.
                spec := make(map[int]string)
                for _, kv := range strings.Split(metric.Tags, ",") {
                    if strings.HasPrefix(kv, "name=") {
                        spec[1] = strings.TrimSpace(strings.TrimPrefix(kv, "name="))
                    } else if strings.HasPrefix(kv, "cmdline=") {
                        spec[2] = strings.TrimSpace(strings.TrimPrefix(kv, "cmdline="))
                    }
                }
                procs[metric.Tags] = spec
            }
        }

        g.SetReportUrls(urls)
        g.SetReportPorts(ports)
        g.SetReportProcs(procs)
        g.SetDuPaths(paths)
    }
}

// splitTagValue returns the value part of a single "key=value" tag. It splits
// only on the first "=", so values that themselves contain "=" stay intact.
// ok is false when the tag has no "=".
func splitTagValue(tag string) (value string, ok bool) {
    kv := strings.SplitN(tag, "=", 2)
    if len(kv) != 2 {
        return "", false
    }
    return kv[1], true
}
  • SyncTrustableIps:同步可信IP列表
    從 HBS 獲取允許遠程執行 shell 命令的 IP 白名單;通過 http/run.go 執行 shell 命令時,會判斷請求 IP 是否可信

// syncTrustableIps refreshes, every Heartbeat.Interval seconds, the list of IPs
// allowed to run shell commands through the agent's HTTP interface. The list is
// fetched from HBS (Agent.TrustableIps) as a single string and handed to
// g.SetTrustableIps. A failed call is logged and retried on the next pass.
func syncTrustableIps() {
    pause := time.Duration(g.Config().Heartbeat.Interval) * time.Second

    for {
        time.Sleep(pause)

        var ips string
        if err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips); err != nil {
            log.Println("ERROR: call Agent.TrustableIps fail", err)
            continue
        }
        g.SetTrustableIps(ips)
    }
}
  • FuncsAndInterval:將採集函數拆分成不同的集合,方便通過不同的 goroutine 運行

// FuncsAndInterval is a group of collector functions that collect() runs
// together, in one goroutine, every Interval seconds.
type FuncsAndInterval struct {
    Fs       []func() []*model.MetricValue // collectors; each returns the metric values it sampled
    Interval int                           // collection period in seconds
}

// Mappers holds the collector groups; BuildMappers fills it.
var Mappers []FuncsAndInterval

// BuildMappers fills Mappers with the collector groups. Collectors are split
// into groups according to the commands they invoke and how likely they are to
// hang. Each group is later run by its own goroutine, so a stuck collector does
// not delay the others. Every group uses Transfer.Interval as its period.
func BuildMappers() {
    every := g.Config().Transfer.Interval
    group := func(fs ...func() []*model.MetricValue) FuncsAndInterval {
        return FuncsAndInterval{Fs: fs, Interval: every}
    }

    Mappers = []FuncsAndInterval{
        // General host metrics.
        group(
            AgentMetrics,
            CpuMetrics,
            NetMetrics,
            KernelMetrics,
            LoadAvgMetrics,
            MemMetrics,
            DiskIOMetrics,
            IOStatsMetrics,
            NetstatMetrics,
            ProcMetrics,
            UdpMetrics,
        ),
        // Prone to problems, so isolated in its own goroutine.
        group(DeviceMetrics),
        // These two call the same underlying command.
        group(PortMetrics, SocketStatSummaryMetrics),
        group(DuMetrics),
        group(UrlMetrics),
    }
}
  • Collect:讀取配置,遍歷 Mappers 中的 FuncsAndInterval,逐個調用其中的採集函數採集所有信息(並非先過濾採集項),再從採集到的數據中過濾掉 ignore 的項,最後上報到 transfer。

// Collect starts one goroutine per entry of funcs.Mappers. Each goroutine runs
// its group's collectors every Interval seconds and sends the results to
// transfer. Collect does nothing when transfer is disabled or when no transfer
// address is configured.
func Collect() {
    if !g.Config().Transfer.Enabled || len(g.Config().Transfer.Addrs) == 0 {
        return
    }

    for _, group := range funcs.Mappers {
        go collect(int64(group.Interval), group.Fs)
    }
}

// collect runs forever and does its work every sec seconds:
//   - calls each collector in fns;
//   - drops values whose metric name is marked true in IgnoreMetrics
//     (everything is collected first and filtered afterwards);
//   - stamps the remaining values with Step=sec, Endpoint=hostname and the
//     current Unix time;
//   - sends them with g.SendToTransfer.
// A tick is skipped entirely when the hostname cannot be determined.
func collect(sec int64, fns []func() []*model.MetricValue) {
    ticks := time.NewTicker(time.Duration(sec) * time.Second).C

    for range ticks {
        hostname, err := g.Hostname()
        if err != nil {
            continue
        }

        ignored := g.Config().IgnoreMetrics
        batch := []*model.MetricValue{}
        for _, fn := range fns {
            // Ranging over a nil or empty result simply does nothing.
            for _, mv := range fn() {
                if ignored[mv.Metric] {
                    continue
                }
                batch = append(batch, mv)
            }
        }

        now := time.Now().Unix()
        for _, mv := range batch {
            mv.Step = sec
            mv.Endpoint = hostname
            mv.Timestamp = now
        }

        g.SendToTransfer(batch)
    }
}
  • 採集信息結構

// MetricValue is one collected sample, as reported to transfer.
type MetricValue struct {
    Endpoint  string      // host name; collect sets it from g.Hostname()
    Metric    string      // metric name, e.g. cpu.idle, mem.memtotal
    Value     interface{} // the sampled value
    Step      int64       // reporting interval in seconds; collect sets it to the group's interval
    Type      string      // GAUGE or COUNTER
    Tags      string      // comma-separated key=value tags; presumably matched by alarm strategies — confirm
    Timestamp int64       // Unix time of this report
}
  • 將採集信息組裝成 MetricValue 結構

// NewMetricValue builds a MetricValue with the given metric name, value and
// type (for example "GAUGE" or "COUNTER"). Tags are joined with "," into the
// Tags field; with no tags, Tags stays empty. Endpoint, Step and Timestamp are
// left for the caller to fill in.
func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue {
    mv := &model.MetricValue{
        Metric: metric,
        Value:  val,
        Type:   dataType,
        Tags:   strings.Join(tags, ","),
    }
    return mv
}

// GaugeValue builds a MetricValue of type "GAUGE" (the raw sampled value).
func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue {
    const gauge = "GAUGE"
    return NewMetricValue(metric, val, gauge, tags...)
}

// CounterValue builds a MetricValue of type "COUNTER" (counter type).
func CounterValue(metric string, val interface{}, tags ...string) *model.MetricValue {
    const counter = "COUNTER"
    return NewMetricValue(metric, val, counter, tags...)
}
  • rpc組件

// SingleConnRpcClient is a thin wrapper around one JSON-RPC connection
// (*rpc.Client) to RpcServer. The embedded mutex serialises Call, so at most one
// call uses the connection at a time. The connection is dialled lazily and
// dropped after any error or timeout, so that the next Call redials.
type SingleConnRpcClient struct {
    sync.Mutex
    rpcClient *rpc.Client
    RpcServer string
    Timeout   time.Duration // dial timeout passed to net.JsonRpcClient
}

// rpcTimeoutError is returned by Call when the server does not answer in time.
type rpcTimeoutError struct {
    server string
}

func (e rpcTimeoutError) Error() string {
    return e.server + " rpc call timeout"
}

// close drops the current connection, if any, so the next Call redials.
func (this *SingleConnRpcClient) close() {
    if this.rpcClient != nil {
        this.rpcClient.Close()
        this.rpcClient = nil
    }
}

// insureConn makes sure a connection to RpcServer exists, dialling if needed.
// Failed dials are retried with exponential back-off (sleeping 2s, 4s, 8s).
// After the last retry, the dial error is returned. Previously it retried
// forever: a dead server then blocked the caller, and the client's mutex,
// indefinitely. Must be called with the mutex held. Call sites that ignore the
// returned error still compile.
func (this *SingleConnRpcClient) insureConn() error {
    if this.rpcClient != nil {
        return nil
    }

    const maxRetries = 3
    for retry := 1; ; retry++ {
        client, err := net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
        if err == nil {
            this.rpcClient = client
            return nil
        }

        log.Printf("dial %s fail: %v", this.RpcServer, err)
        if retry > maxRetries {
            return err
        }
        time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
    }
}

// Call invokes method on RpcServer with args and decodes the answer into reply.
// Calls are serialised by the embedded mutex.
//   - If the connection cannot be (re)established, the dial error is returned.
//   - If the call does not complete within 50 seconds, the connection is
//     dropped and a timeout error is returned. Previously this path returned
//     nil, so callers treated the call as a success.
//   - Any RPC error also drops the connection.
// NOTE: an abandoned call's goroutine may still be running when Call returns.
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
    this.Lock()
    defer this.Unlock()

    if err := this.insureConn(); err != nil {
        return err
    }

    const callTimeout = 50 * time.Second
    // Capture the client now: close() may nil the field while the goroutine runs.
    client := this.rpcClient
    // Buffered, so the goroutine can always deliver its result and exit, even
    // after we have stopped waiting because of a timeout.
    done := make(chan error, 1)
    go func() {
        done <- client.Call(method, args, reply)
    }()

    select {
    case <-time.After(callTimeout):
        log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
        this.close()
        return rpcTimeoutError{server: this.RpcServer}
    case err := <-done:
        if err != nil {
            this.close()
            return err
        }
        return nil
    }
}
  • Transfer 組件

// TransferClients caches one SingleConnRpcClient per transfer address.
// TransferClientsLock guards the map itself, not the clients; each client has
// its own embedded mutex.
var (
    TransferClientsLock *sync.RWMutex                   = new(sync.RWMutex)
    TransferClients     map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}
)

// SendMetrics sends metrics to one transfer. It tries the configured
// Transfer.Addrs in random order and stops at the first address whose
// Transfer.Update call succeeds; resp receives that call's answer. If every
// address fails, nothing is retried here and the metrics are dropped.
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
    rand.Seed(time.Now().UnixNano())

    // Read the address list once: re-reading the config on every iteration
    // could index past the end if a reload shrank the list mid-loop.
    addrs := Config().Transfer.Addrs
    for _, i := range rand.Perm(len(addrs)) {
        addr := addrs[i]

        // Look the client up under the read lock. SendMetrics may run in
        // several goroutines at once (presumably one per collector group), and
        // an unguarded read racing initTransferClient's write is a fatal
        // "concurrent map read and map write".
        TransferClientsLock.RLock()
        _, ok := TransferClients[addr]
        TransferClientsLock.RUnlock()
        if !ok {
            initTransferClient(addr)
        }

        if updateMetrics(addr, metrics, resp) {
            break
        }
    }
}

// initTransferClient registers a client for addr in TransferClients, unless one
// already exists. The existence check is repeated under the write lock. Without
// it, two goroutines racing to initialise the same address would replace a
// client the other had just created, leaking its connection.
func initTransferClient(addr string) {
    TransferClientsLock.Lock()
    defer TransferClientsLock.Unlock()

    if _, ok := TransferClients[addr]; ok {
        return
    }
    TransferClients[addr] = &SingleConnRpcClient{
        RpcServer: addr,
        // Transfer.Timeout is configured in milliseconds.
        Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
    }
}

// updateMetrics sends metrics to the transfer at addr through Transfer.Update.
// It reports whether the call succeeded.
//
// The map lock is held only while looking the client up, not during the RPC.
// Call can block for a long time (dial retries, a 50s call timeout). Holding the
// read lock that long would stall initTransferClient, and through RWMutex
// writer priority every later reader too, behind one slow transfer. The client
// serialises its own calls with its embedded mutex.
func updateMetrics(addr string, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
    TransferClientsLock.RLock()
    client, ok := TransferClients[addr]
    TransferClientsLock.RUnlock()

    if !ok || client == nil {
        log.Println("call Transfer.Update fail", addr, "no rpc client for this address")
        return false
    }

    if err := client.Call("Transfer.Update", metrics, resp); err != nil {
        log.Println("call Transfer.Update fail", addr, err)
        return false
    }
    return true
}
  • 採集插件同步

// Plugin describes one plugin script: its path, modification time and run
// period (the values come from the plugin itself).
type Plugin struct {
    FilePath string // path relative to g.Config().Plugin.Dir (PluginRun joins the two)
    MTime    int64  // modification time (units not shown here); a changed value makes the plugin restart
    Cycle    int    // run period in seconds (the scheduler's ticker fires every Cycle seconds)
}

// Plugins maps a plugin key (the file path used as the map key by
// AddNewPlugins) to the plugin currently scheduled. PluginsWithScheduler maps
// the same key to the scheduler running that plugin.
// NOTE(review): neither map is guarded by a lock; they appear to be used only
// from the plugin-sync goroutine — confirm.
var (
    Plugins              = make(map[string]*Plugin)
    PluginsWithScheduler = make(map[string]*PluginScheduler)
)

// DelNoUsePlugins stops and forgets every scheduled plugin that is either absent
// from newPlugins or present with a different MTime (its file changed).
// Deleting map entries while ranging over the same map is allowed in Go.
func DelNoUsePlugins(newPlugins map[string]*Plugin) {
    for key, running := range Plugins {
        wanted, stillWanted := newPlugins[key]
        if stillWanted && wanted.MTime == running.MTime {
            continue
        }
        deletePlugin(key)
    }
}

// AddNewPlugins starts a scheduler for every plugin in newPlugins that is not
// already scheduled with the same MTime. If the key is already scheduled with a
// different MTime, the old scheduler is stopped first via deletePlugin.
// Otherwise the old scheduler would keep ticking alongside the new one, and it
// could no longer be reached through PluginsWithScheduler.
func AddNewPlugins(newPlugins map[string]*Plugin) {
    for fpath, newPlugin := range newPlugins {
        if current, ok := Plugins[fpath]; ok {
            if current.MTime == newPlugin.MTime {
                // This exact version is already running.
                continue
            }
            deletePlugin(fpath)
        }

        // Register the plugin, give it its own scheduler, and start it.
        Plugins[fpath] = newPlugin
        sch := NewPluginScheduler(newPlugin)
        PluginsWithScheduler[fpath] = sch
        sch.Schedule()
    }
}

// ClearAllPlugins stops and removes every plugin currently registered in
// Plugins.
func ClearAllPlugins() {
    keys := make([]string, 0, len(Plugins))
    for key := range Plugins {
        keys = append(keys, key)
    }
    for _, key := range keys {
        deletePlugin(key)
    }
}

// deletePlugin stops the scheduler registered under key, if there is one. It
// then removes key from both PluginsWithScheduler and Plugins.
func deletePlugin(key string) {
    if sch, found := PluginsWithScheduler[key]; found {
        sch.Stop()
        delete(PluginsWithScheduler, key)
    }
    delete(Plugins, key)
}
  • 插件調度策略

// PluginScheduler runs one plugin repeatedly on its own ticker until stopped.
type PluginScheduler struct {
    Ticker *time.Ticker  // fires every Plugin.Cycle seconds
    Plugin *Plugin       // the plugin to run
    Quit   chan struct{} // closed by Stop to end the scheduling goroutine
}

// NewPluginScheduler creates a scheduler for p whose ticker fires every p.Cycle
// seconds. The plugin is not run until Schedule is called.
func NewPluginScheduler(p *Plugin) *PluginScheduler {
    return &PluginScheduler{
        Ticker: time.NewTicker(time.Duration(p.Cycle) * time.Second),
        Plugin: p,
        Quit:   make(chan struct{}),
    }
}

// Schedule starts a goroutine that calls PluginRun on every tick until Quit is
// closed. At that point it stops the ticker and exits. PluginRun is called
// synchronously, so runs of the same plugin never overlap.
func (this *PluginScheduler) Schedule() {
    ticks, quit := this.Ticker.C, this.Quit
    go func() {
        defer this.Ticker.Stop()
        for {
            select {
            case <-quit:
                return
            case <-ticks:
                PluginRun(this.Plugin)
            }
        }
    }()
}

// Stop tells the scheduling goroutine to exit by closing Quit. Call it at most
// once: closing an already-closed channel panics.
func (this *PluginScheduler) Stop() {
    quit := this.Quit
    close(quit)
}

// PluginRun executes one run of plugin:
//   - runs the file at g.Config().Plugin.Dir/<FilePath>, allowing it one cycle
//     minus 500ms before it is killed;
//   - writes anything it printed on stderr to
//     g.Config().Plugin.LogDir/<FilePath>.stderr.log;
//   - if it exited successfully with non-empty output, decodes stdout as a JSON
//     array of MetricValue and sends the result with g.SendToTransfer.
// Failures are logged; nothing is returned.
func PluginRun(plugin *Plugin) {

    // Timeout in milliseconds: slightly less than one run period.
    timeout := plugin.Cycle*1000 - 500
    fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)

    if !file.IsExist(fpath) {
        log.Println("no such plugin:", fpath)
        return
    }

    debug := g.Config().Debug
    if debug {
        log.Println(fpath, "running...")
    }

    cmd := exec.Command(fpath)
    var stdout bytes.Buffer
    cmd.Stdout = &stdout
    var stderr bytes.Buffer
    cmd.Stderr = &stderr
    // A start failure (e.g. the file is not executable) used to be ignored.
    if err := cmd.Start(); err != nil {
        log.Println("[ERROR] start plugin", fpath, "fail. error:", err)
        return
    }

    err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)

    errStr := stderr.String()
    if errStr != "" {
        logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
        // Use a separate variable: err still holds the plugin's result, which
        // is inspected below. Reusing err here used to make a failing plugin
        // that wrote to stderr look successful.
        if _, werr := file.WriteString(logFile, errStr); werr != nil {
            log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, werr)
        }
    }

    if isTimeout {
        // The process was killed; err reports whether the kill succeeded.
        if err == nil && debug {
            log.Println("[INFO] timeout and kill process", fpath, "successfully")
        }

        if err != nil {
            log.Println("[ERROR] kill process", fpath, "occur error:", err)
        }

        return
    }

    if err != nil {
        log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
        return
    }

    // The plugin exited successfully: its stdout should be a JSON array of metrics.
    data := stdout.Bytes()
    if len(data) == 0 {
        if debug {
            log.Println("[DEBUG] stdout of", fpath, "is blank")
        }
        return
    }

    var metrics []*model.MetricValue
    err = json.Unmarshal(data, &metrics)
    if err != nil {
        log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
        return
    }

    g.SendToTransfer(metrics)
}
相關文章
相關標籤/搜索