influxDB+grafana 日誌監控平臺(Golang)

influxdb

InfluxDB 是一個開源分佈式時序、事件和指標數據庫。使用 Go 語言編寫,無需外部依賴。其設計目標是實現分佈式和水平伸縮擴展。
html


influxDB啓動流程:linux


 1  用docker下拉influxdb的鏡像 nginx

docker pull tutum/influxdb docekrgit


2 Docker環境下運行influxdbgithub

docker run -d -p 8083:8083 -p8086:8086 --expose 8090 --expose 8099 --name influxsrv tutum/influxdb
web

各個參數含義:正則表達式

-d:容器在後臺運行
docker

-p:將容器內端口映射到宿主機端口,格式爲 宿主機端口:容器內端口;數據庫

8083是influxdb的web管理工具端口json

8086是influxdb的HTTP API端口

--expose:能夠讓容器接受外部傳入的數據

--name:容器名稱 最後是鏡像名稱+tag,鏡像爲tutum/influxdb,tag的值0.8.8指定了要運行的版本,默認是latest。


3 啓動influxdb後,influxdb會啓動一個內部的HTTP server管理工具,用戶能夠經過接入該web服務器來操做influxdb。

固然,也能夠經過CLI即命令行的方式訪問influxdb。

打開瀏覽器,輸入http://127.0.0.1:8083,訪問管理工具的主頁


4 Influxdb客戶端 能夠參考裏面例子

https://github.com/influxdata/influxdb/tree/master/client

PS. Influxdb原理詳解

https://www.linuxdaxue.com/influxdb-principle.html



Grafana

Grafana 是一個開源的時序性統計和監控平臺,支持例如 elasticsearch、graphite、influxdb 等衆多的數據源,並以功能強大的界面編輯器著稱。

官網:https://grafana.com/


grafana啓動流程:

1 docker 拉取鏡像

docker run -d --name=grafana -p 3000:3000 grafana/grafana


2 訪問管理工具的主頁

瀏覽器127.0.0.1:3000 ,  登陸 grafana的默認端口是3000,用戶名和密碼爲 admin / admin,配置文件/etc/grafana/grafana.ini,更改配置文件後須要重啓grafana。


3. 建立數據庫,綁定influxdb


4. 建立一個新的面板

home —> New Dashboard —> Graph —> 點擊,Edit


5 Edit中的Metrics就是構造一個SQL的查詢語句



Golang打點

監控日誌程序經過 influxdb 將須要的內容打點到influxdb 

1.導入 github.com/influxdata/influxdb/client/v2


2.建立influxdb client

// Create a new HTTPClient
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     addr,
		Username: username,
		Password: password,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()複製代碼


3.建立須要打的點的格式,類型

// Create a new point batch
		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  database,
			Precision: precision,
		})
		if err != nil {
			log.Fatal(err)
		}

		複製代碼


4.建立點,將點添加進influxdb數據庫

// Create a point and add to batch
		//Tags:Path,Method,Scheme,Status
		tags := map[string]string{
			"Path": v.Path,
			"Method": v.Method,
			"Scheme": v.Scheme,
			"Status": v.Status,
			}

		fields := map[string]interface{}{
			"UpstreamTime": v.UpstreamTime,
			"RequestTime":  v.RequestTime,
			"BytesSent":    v.BytesSent,
		}

pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
		if err != nil {
			log.Fatal(err)
		}
		bp.AddPoint(pt)

		// Write the batch
		if err := c.Write(bp); err != nil {
			log.Fatal(err)
		}複製代碼

Golang 完整代碼

imooc.log日誌格式以下:

172.0.0.12 - - [02/May/2018:17:17:35 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854

172.0.0.12 - - [02/May/2018:17:17:36 +0000] http "POST /bar?query=t HTTP/1.0" 300 2133 "-" "KeepAliveClient" "-" 1.025 1.854


代碼邏輯主要是  經過讀取模塊讀取imooc.log日誌文件中日誌,而後經過正則表達式,一行一行解析獲取數據,並經過寫入模塊將數據經過influxdb客戶端打點,最後經過grafana去顯示數據圖形.


package main

import (
	"bufio"
	"fmt"
	"github.com/influxdata/influxdb/client/v2"
	"io"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"flag"
	"log"
	"net/http"
	"encoding/json"
)

const (
	TypeHandleLine = 0
	TypeErrNum = 1
	TpsIntervalTime = 5
)

var TypeMonitorChan = make(chan int,200)

type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64
}

//系統狀態監控
type SystemInfo struct {
	HandleLine    int     `json:"handleLine"`   //總處理日誌行數
	Tps           float64 `json:"tps"`          //系統吞吐量
	ReadChanLen   int     `json:"readChanLen"`  //read channel 長度
	WriterChanLen int     `json:"writeChanLen"` //write channel 長度
	RunTime       string  `json:"ruanTime"`     //運行總時間
	ErrNum        int     `json:"errNum"`       //錯誤數
}

type Monitor struct {
	startTime time.Time
	data SystemInfo
	tpsSli []int
	tps float64
}

func (m *Monitor)start(lp *LogProcess)  {

	go func() {
		for n := range TypeMonitorChan  {
			switch n {
			case TypeErrNum:
				m.data.ErrNum += 1

			case TypeHandleLine:
				m.data.HandleLine += 1
			}
		}
	}()


	ticker := time.NewTicker(time.Second *TpsIntervalTime)
	go func() {
		for {
			<-ticker.C
			m.tpsSli = append(m.tpsSli,m.data.HandleLine)
			if len(m.tpsSli) > 2 {
				m.tpsSli = m.tpsSli[1:]
				m.tps =  float64(m.tpsSli[1] - m.tpsSli[0])/TpsIntervalTime
			}
		}
	}()


	http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
		m.data.RunTime = time.Now().Sub(m.startTime).String()
		m.data.ReadChanLen = len(lp.rc)
		m.data.WriterChanLen = len(lp.wc)
		m.data.Tps = m.tps

		ret ,_ := json.MarshalIndent(m.data,"","\t")
		io.WriteString(writer,string(ret))
	})


	http.ListenAndServe(":9193",nil)
}


type Reader interface {
	Read(rc chan []byte)
}

type Writer interface {
	Writer(wc chan *Message)
}

type LogProcess struct {
	rc    chan []byte
	wc    chan *Message
	read  Reader
	write Writer
}

type ReadFromFile struct {
	path string //讀取文件的路徑
}

//讀取模塊
func (r *ReadFromFile) Read(rc chan []byte) {

	//打開文件
	f, err := os.Open(r.path)
	fmt.Println(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file err :", err.Error()))
	}

	//從文件末尾開始逐行讀取文件內容
	f.Seek(0, 2) //2,表明將指正移動到末尾

	rd := bufio.NewReader(f)

	for {
		line, err := rd.ReadBytes('\n') //連續讀取內容知道須要'\n'結束
		if err == io.EOF {
			time.Sleep(5000 * time.Microsecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes err :", err.Error()))
		}

		TypeMonitorChan <- TypeHandleLine
		rc <- line[:len(line)-1]
	}

}

type WriteToinfluxDB struct {
	influxDBDsn string //influx data source
}

//寫入模塊
/**
    1.初始化influxdb client
	2. 從Write Channel中讀取監控數據
	3. 構造數據並寫入influxdb
*/
func (w *WriteToinfluxDB) Writer(wc chan *Message) {

	infSli := strings.Split(w.influxDBDsn, "@")
	addr := infSli[0]
	username := infSli[1]
	password := infSli[2]
	database := infSli[3]
	precision := infSli[4]

	// Create a new HTTPClient
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     addr,
		Username: username,
		Password: password,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	for v := range wc {
		// Create a new point batch
		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  database,
			Precision: precision,
		})
		if err != nil {
			log.Fatal(err)
		}

		// Create a point and add to batch
		//Tags:Path,Method,Scheme,Status
		tags := map[string]string{
			"Path": v.Path,
			"Method": v.Method,
			"Scheme": v.Scheme,
			"Status": v.Status,
			}

		fields := map[string]interface{}{
			"UpstreamTime": v.UpstreamTime,
			"RequestTime":  v.RequestTime,
			"BytesSent":    v.BytesSent,
		}

		fmt.Println("taps:",tags)
		fmt.Println("fields:",fields)

		pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
		if err != nil {
			log.Fatal(err)
		}
		bp.AddPoint(pt)

		// Write the batch
		if err := c.Write(bp); err != nil {
			log.Fatal(err)
		}

		// Close client resources
		if err := c.Close(); err != nil {
			log.Fatal(err)
		}

		log.Println("write success")
	}

}

//解析模塊
func (l *LogProcess) Process() {

	/**
	172.0.012 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-"
	"KeepAliveClient" "-" 1.005 1.854

	([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+) */ r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`) for v := range l.rc { ret := r.FindStringSubmatch(string(v)) if len(ret) != 14 { TypeMonitorChan <- TypeErrNum fmt.Println("FindStringSubmatch fail:", string(v)) fmt.Println(len(ret)) continue } message := &Message{} //時間: [04/Mar/2018:13:49:52 +0000] loc, _ := time.LoadLocation("Asia/Shanghai") t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc) if err != nil { TypeMonitorChan <- TypeErrNum fmt.Println("ParseInLocation fail:", err.Error(), ret[4]) } message.TimeLocal = t //字符串長度: 2133 byteSent, _ := strconv.Atoi(ret[8]) message.BytesSent = byteSent //"GET /foo?query=t HTTP/1.0" reqSli := strings.Split(ret[6], " ") if len(reqSli) != 3 { TypeMonitorChan <- TypeErrNum fmt.Println("strings.Split fail:", ret[6]) continue } message.Method = reqSli[0] u, err := url.Parse(reqSli[1]) if err != nil { TypeMonitorChan <- TypeErrNum fmt.Println("url parse fail:", err) continue } message.Path = u.Path //http message.Scheme = ret[5] //code: 200 message.Status = ret[7] //1.005 upstreamTime, _ := strconv.ParseFloat(ret[12], 64) message.UpstreamTime = upstreamTime //1.854 requestTime, _ := strconv.ParseFloat(ret[13], 64) message.RequestTime = requestTime //fmt.Println(message) l.wc <- message } } /** 分析監控需求: 某個協議下的某個請求在某個請求方法的 QPS&響應時間&流量 */ func main() { var path, influDsn string flag.StringVar(&path, "path", "./imooc.log", "read file path") flag.StringVar(&influDsn, "influxDsn", "http://127.0.01:8086@imooc@imoocpass@imooc@s", "influx data source") flag.Parse() r := &ReadFromFile{ path: path, } w := &WriteToinfluxDB{ influxDBDsn: influDsn, } lp := &LogProcess{ rc: make(chan []byte,200), wc: make(chan *Message), read: r, write: w, } go lp.read.Read(lp.rc) for i:=1;i<2 ; i++ { go lp.Process() } for i:=1;i<4 ; i++ { go lp.write.Writer(lp.wc) } fmt.Println("begin !!!") m:= &Monitor{ startTime:time.Now(), data:SystemInfo{}, } m.start(lp) } 複製代碼
相關文章
相關標籤/搜索