最近學習falcon,看了源碼和極客學院的視頻解析,畫了調用結構、關係,並對主要的代碼進行了註釋。
代碼地址:https://github.com/beyondskyw...
標籤(空格分隔): falcon go
機器性能指標:cpu、mem、網卡、磁盤……
業務監控
開源軟件狀態:Nginx、Redis、MySQL
snmp採集網絡設備指標
自發現採集值
不同類型的數據採集分別放在不同的goroutine中執行
進程和端口通過用戶配置進行監控
hostname和ip默認留空,agent自動探測
hbs和transfer都是配置其rpc地址
collector網卡採集前綴
ignore爲true時取消上報
cron:間隔執行的代碼,即定時任務
funcs:信息採集
g:全局數據結構
http:簡單的dashboard的server,獲取單機監控指標數據
plugins:插件處理機制
public:靜態資源文件
瞭解agent、plugin版本信息,方便升級
獲取監聽的進程和端口
獲取本機執行的插件列表
main入口
// Fragment of the agent's main(): start the background jobs, then the dashboard.
// NOTE(review): what InitDataHistory does is not shown here.
go cron.InitDataHistory()

// Report this agent's own status to HBS.
cron.ReportAgentStatus()
// Sync the plugins assigned to this host.
cron.SyncMinePlugins()
// Sync the monitored ports, paths, processes and URLs.
cron.SyncBuiltinMetrics()
// Debug backdoor: sync the list of IPs allowed to run shell commands on this agent.
cron.SyncTrustableIps()
// Start data collection.
cron.Collect()

// Start the dashboard server.
go http.Start()
ReportAgentStatus:彙報agent自身狀態
// 判斷hbs配置是否正常,正常則上報agent狀態 if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { // 根據配置的interval間隔上報信息 go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second) } func reportAgentStatus(interval time.Duration) { for { // 獲取hostname, 出錯則錯誤賦值給hostname hostname, err := g.Hostname() if err != nil { hostname = fmt.Sprintf("error:%s", err.Error()) } // 請求發送信息 req := model.AgentReportRequest{ Hostname: hostname, IP: g.IP(), AgentVersion: g.VERSION, // 經過shell指令獲取plugin版本,可否go實現 PluginVersion: g.GetCurrPluginVersion(), } var resp model.SimpleRpcResponse // 調用rpc接口 err = g.HbsClient.Call("Agent.ReportStatus", req, &resp) if err != nil || resp.Code != 0 { log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp) } time.Sleep(interval) } }
SyncMinePlugins:同步插件
func syncMinePlugins() { var ( timestamp int64 = -1 pluginDirs []string ) duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) hostname, err := g.Hostname() if err != nil { continue } req := model.AgentHeartbeatRequest{ Hostname: hostname, } var resp model.AgentPluginsResponse // 調用rpc接口,返回plugin err = g.HbsClient.Call("Agent.MinePlugins", req, &resp) if err != nil { log.Println("ERROR:", err) continue } // 保證時間順序正確 if resp.Timestamp <= timestamp { continue } pluginDirs = resp.Plugins // 存放時間保證最新 timestamp = resp.Timestamp if g.Config().Debug { log.Println(&resp) } // 無插件則清空plugin if len(pluginDirs) == 0 { plugins.ClearAllPlugins() } desiredAll := make(map[string]*plugins.Plugin) // 讀取全部plugin for _, p := range pluginDirs { underOneDir := plugins.ListPlugins(strings.Trim(p, "/")) for k, v := range underOneDir { desiredAll[k] = v } } // 中止不須要的插件,啓動增長的插件 plugins.DelNoUsePlugins(desiredAll) plugins.AddNewPlugins(desiredAll) } }
SyncBuiltinMetrics:同步內置metric,包括端口、目錄和進程信息
func syncBuiltinMetrics() { var timestamp int64 = -1 var checksum string = "nil" duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) // 監控端口、目錄大小、進程 var ports = []int64{} var paths = []string{} var procs = make(map[string]map[int]string) var urls = make(map[string]string) hostname, err := g.Hostname() if err != nil { continue } req := model.AgentHeartbeatRequest{ Hostname: hostname, Checksum: checksum, } var resp model.BuiltinMetricResponse err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp) if err != nil { log.Println("ERROR:", err) continue } if resp.Timestamp <= timestamp { continue } if resp.Checksum == checksum { continue } timestamp = resp.Timestamp checksum = resp.Checksum for _, metric := range resp.Metrics { if metric.Metric == g.URL_CHECK_HEALTH { arr := strings.Split(metric.Tags, ",") if len(arr) != 2 { continue } url := strings.Split(arr[0], "=") if len(url) != 2 { continue } stime := strings.Split(arr[1], "=") if len(stime) != 2 { continue } if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil { urls[url[1]] = stime[1] } else { log.Println("metric ParseInt timeout failed:", err) } } // {metric: net.port.listen, tags: port=22} if metric.Metric == g.NET_PORT_LISTEN { arr := strings.Split(metric.Tags, "=") if len(arr) != 2 { continue } if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil { ports = append(ports, port) } else { log.Println("metrics ParseInt failed:", err) } continue } // metric: du.bs tags: path=/home/works/logs // du -bs /home/works/logs if metric.Metric == g.DU_BS { arr := strings.Split(metric.Tags, "=") if len(arr) != 2 { continue } paths = append(paths, strings.TrimSpace(arr[1])) continue } //mereic: proc.num tags: name=crond //或者metric: proc.num tags: cmdline=cfg.json if metric.Metric == g.PROC_NUM { arr := strings.Split(metric.Tags, ",") tmpMap := make(map[int]string) for i := 0; i < len(arr); i++ { if strings.HasPrefix(arr[i], "name=") { tmpMap[1] = 
strings.TrimSpace(arr[i][5:]) } else if strings.HasPrefix(arr[i], "cmdline=") { tmpMap[2] = strings.TrimSpace(arr[i][8:]) } } procs[metric.Tags] = tmpMap } } g.SetReportUrls(urls) g.SetReportPorts(ports) g.SetReportProcs(procs) g.SetDuPaths(paths) } }
SyncTrustableIps:同步可信IP列表
請求獲取允許遠程執行shell命令的IP白名單;通過http/run.go調用shell命令時,會判斷請求IP是否可信。
func syncTrustableIps() { duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) var ips string err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips) if err != nil { log.Println("ERROR: call Agent.TrustableIps fail", err) continue } // 設置到本地可信IP列表 g.SetTrustableIps(ips) } }
FuncsAndInterval:拆分不同的採集函數集,方便通過不同的goroutine運行
// 間隔internal時間執行fs中的函數 type FuncsAndInterval struct { Fs []func() []*model.MetricValue Interval int } var Mappers []FuncsAndInterval // 根據調用指令類型和是否容易被掛起而分類(經過不一樣的goroutine去執行,避免相互之間的影響) func BuildMappers() { interval := g.Config().Transfer.Interval Mappers = []FuncsAndInterval{ FuncsAndInterval{ Fs: []func() []*model.MetricValue{ AgentMetrics, CpuMetrics, NetMetrics, KernelMetrics, LoadAvgMetrics, MemMetrics, DiskIOMetrics, IOStatsMetrics, NetstatMetrics, ProcMetrics, UdpMetrics, }, Interval: interval, }, // 容易出問題 FuncsAndInterval{ Fs: []func() []*model.MetricValue{ DeviceMetrics, }, Interval: interval, }, // 調用相同指令 FuncsAndInterval{ Fs: []func() []*model.MetricValue{ PortMetrics, SocketStatSummaryMetrics, }, Interval: interval, }, FuncsAndInterval{ Fs: []func() []*model.MetricValue{ DuMetrics, }, Interval: interval, }, FuncsAndInterval{ Fs: []func() []*model.MetricValue{ UrlMetrics, }, Interval: interval, }, } }
Collect:讀取配置信息以及Mappers中的FuncsAndInterval,根據func調用採集函數採集所有信息(並非先過濾採集項),再從所有採集到的數據中過濾掉ignore的項,並上報到transfer。
func Collect() { // 配置信息判斷 if !g.Config().Transfer.Enabled { return } if len(g.Config().Transfer.Addrs) == 0 { return } // 讀取mapper中的FuncsAndInterval集,並經過不一樣的goroutine運行 for _, v := range funcs.Mappers { go collect(int64(v.Interval), v.Fs) } } // 間隔採集信息 func collect(sec int64, fns []func() []*model.MetricValue) { // 啓動斷續器,間隔執行 t := time.NewTicker(time.Second * time.Duration(sec)).C for { <-t hostname, err := g.Hostname() if err != nil { continue } mvs := []*model.MetricValue{} // 讀取忽略metric名單 ignoreMetrics := g.Config().IgnoreMetrics // 從funcs的list中取出每一個採集函數 for _, fn := range fns { // 執行採集函數 items := fn() if items == nil { continue } if len(items) == 0 { continue } // 讀取採集數據,根據忽略的metric忽略部分採集數據 for _, mv := range items { if b, ok := ignoreMetrics[mv.Metric]; ok && b { continue } else { mvs = append(mvs, mv) } } } // 獲取上報時間 now := time.Now().Unix() // 設置上報採集項的間隔、agent主機、上報時間 for j := 0; j < len(mvs); j++ { mvs[j].Step = sec mvs[j].Endpoint = hostname mvs[j].Timestamp = now } // 調用transfer發送採集數據 g.SendToTransfer(mvs) } }
採集信息結構
// MetricValue is a single collected data point that the agent sends to transfer.
type MetricValue struct {
	Endpoint  string      // host name of the reporting agent
	Metric    string      // metric name, e.g. cpu.idle, mem.memtotal
	Value     interface{} // collected value
	Step      int64       // reporting interval of this item, in seconds
	Type      string      // "GAUGE" or "COUNTER"
	Tags      string      // comma-separated key=value tags, e.g. "port=22"
	Timestamp int64       // Unix time (seconds) of this report
}
將採集信息組成MetricValue結構
func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue { mv := model.MetricValue{ Metric: metric, Value: val, Type: dataType, } size := len(tags) if size > 0 { mv.Tags = strings.Join(tags, ",") } return &mv } // 原值類型 func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue { return NewMetricValue(metric, val, "GAUGE", tags...) } // 計數器類型 func CounterValue(metric string, val interface{}, tags ...string) *model.MetricValue { return NewMetricValue(metric, val, "COUNTER", tags...) }
rpc組件
// 簡單封裝rpc.Cilent type SingleConnRpcClient struct { sync.Mutex rpcClient *rpc.Client RpcServer string Timeout time.Duration } // 關閉rpc func (this *SingleConnRpcClient) close() { if this.rpcClient != nil { this.rpcClient.Close() this.rpcClient = nil } } // 保證rpc存在,爲空則從新建立, 若是server宕機, 死循環???? func (this *SingleConnRpcClient) insureConn() { if this.rpcClient != nil { return } var err error var retry int = 1 for { if this.rpcClient != nil { return } // 根據timeout和server地址去鏈接rpc的server this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout) if err == nil { return } log.Printf("dial %s fail: %v", this.RpcServer, err) if retry > 6 { retry = 1 } time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second) retry++ } } // rpc client調用hbs函數 func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error { // 加鎖保證一個agent只與server有一個鏈接,保證性能 this.Lock() defer this.Unlock() // 保證rpc鏈接可用 this.insureConn() timeout := time.Duration(50 * time.Second) done := make(chan error) go func() { err := this.rpcClient.Call(method, args, reply) done <- err }() // 超時控制 select { case <-time.After(timeout): log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer) this.close() case err := <-done: if err != nil { this.close() return err } } return nil }
Transfer部件
// 定義transfer的rpcClient對應Map, transferClients讀寫鎖 var ( TransferClientsLock *sync.RWMutex = new(sync.RWMutex) TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{} ) // 發送數據到隨機的transfer func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) { rand.Seed(time.Now().UnixNano()) // 隨機transferClient發送數據,直到發送成功 for _, i := range rand.Perm(len(Config().Transfer.Addrs)) { addr := Config().Transfer.Addrs[i] if _, ok := TransferClients[addr]; !ok { initTransferClient(addr) } if updateMetrics(addr, metrics, resp) { break } } } // 初始化addr對應的transferClient func initTransferClient(addr string) { TransferClientsLock.Lock() defer TransferClientsLock.Unlock() TransferClients[addr] = &SingleConnRpcClient{ RpcServer: addr, Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond, } } // 調用rpc接口發送metric func updateMetrics(addr string, metrics []*model.MetricValue, resp *model.TransferResponse) bool { TransferClientsLock.RLock() defer TransferClientsLock.RUnlock() err := TransferClients[addr].Call("Transfer.Update", metrics, resp) if err != nil { log.Println("call Transfer.Update fail", addr, err) return false } return true }
採集插件同步
// 插件信息: 路徑、修改時間、運行週期(來自plugin插件) type Plugin struct { FilePath string MTime int64 Cycle int } // 插件map和調度器map var ( Plugins = make(map[string]*Plugin) PluginsWithScheduler = make(map[string]*PluginScheduler) ) // 刪除不須要的plugin func DelNoUsePlugins(newPlugins map[string]*Plugin) { for currKey, currPlugin := range Plugins { newPlugin, ok := newPlugins[currKey] if !ok || currPlugin.MTime != newPlugin.MTime { deletePlugin(currKey) } } } // 添加同步時增長的plugin func AddNewPlugins(newPlugins map[string]*Plugin) { for fpath, newPlugin := range newPlugins { // 去除重複插件 if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime { continue } // 爲新添加的插件新建調度器 Plugins[fpath] = newPlugin sch := NewPluginScheduler(newPlugin) PluginsWithScheduler[fpath] = sch // 啓動plugin調度 sch.Schedule() } } func ClearAllPlugins() { for k := range Plugins { deletePlugin(k) } } func deletePlugin(key string) { v, ok := PluginsWithScheduler[key] if ok { // 暫停調度plugin v.Stop() delete(PluginsWithScheduler, key) } delete(Plugins, key) }
插件調度策略
// 持續間隔執行plugin type PluginScheduler struct { Ticker *time.Ticker Plugin *Plugin Quit chan struct{} } // 根據plugin建立新的schedule func NewPluginScheduler(p *Plugin) *PluginScheduler { scheduler := PluginScheduler{Plugin: p} scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second) scheduler.Quit = make(chan struct{}) return &scheduler } // plugin調度,間隔執行PluginRun,除非收到quit消息 func (this *PluginScheduler) Schedule() { go func() { for { select { case <-this.Ticker.C: PluginRun(this.Plugin) case <-this.Quit: this.Ticker.Stop() return } } }() } // 中止plugin調度 func (this *PluginScheduler) Stop() { close(this.Quit) } // 執行插件,讀取插件運行返回數據並上報transfer func PluginRun(plugin *Plugin) { timeout := plugin.Cycle*1000 - 500 fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath) if !file.IsExist(fpath) { log.Println("no such plugin:", fpath) return } debug := g.Config().Debug if debug { log.Println(fpath, "running...") } cmd := exec.Command(fpath) var stdout bytes.Buffer cmd.Stdout = &stdout var stderr bytes.Buffer cmd.Stderr = &stderr cmd.Start() err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond) errStr := stderr.String() if errStr != "" { logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log") if _, err = file.WriteString(logFile, errStr); err != nil { log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err) } } if isTimeout { // has be killed if err == nil && debug { log.Println("[INFO] timeout and kill process", fpath, "successfully") } if err != nil { log.Println("[ERROR] kill process", fpath, "occur error:", err) } return } if err != nil { log.Println("[ERROR] exec plugin", fpath, "fail. error:", err) return } // exec successfully data := stdout.Bytes() if len(data) == 0 { if debug { log.Println("[DEBUG] stdout of", fpath, "is blank") } return } var metrics []*model.MetricValue err = json.Unmarshal(data, &metrics) if err != nil { log.Printf("[ERROR] json.Unmarshal stdout of %s fail. 
error:%s stdout: \n%s\n", fpath, err, stdout.String()) return } g.SendToTransfer(metrics) }