InfluxDB is an open-source distributed database for time series, events, and metrics. It is written in Go and has no external dependencies. It is designed to be distributed and to scale horizontally.
InfluxDB startup procedure:
1. Pull the InfluxDB image with Docker
docker pull tutum/influxdb
2. Run InfluxDB under Docker
docker run -d -p 8083:8083 -p 8086:8086 --expose 8090 --expose 8099 --name influxsrv tutum/influxdb
The meaning of each parameter:
-d: run the container in the background
-p: map a container port to a host port, in the form host_port:container_port
8083 is InfluxDB's web admin UI port
8086 is InfluxDB's HTTP API port
--expose: lets the container accept data coming in from outside on those additional ports
--name: the container name. The final argument is the image name plus tag; here the image is tutum/influxdb, and a tag value such as 0.8.8 specifies the version to run (the default is latest).
3. Once InfluxDB is running, it starts an internal HTTP server with an admin UI, and you can manage InfluxDB by connecting to that web server.
You can also access InfluxDB through its CLI.
Open a browser and go to http://127.0.0.1:8083 to reach the admin UI home page.
4. InfluxDB client: you can refer to the examples at
https://github.com/influxdata/influxdb/tree/master/client
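As a minimal sketch of using that client (the address, the absence of credentials, and the imooc database name here are assumptions for a local test setup), you can ping the server and run a query:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/influxdata/influxdb/client/v2"
)

func main() {
    // Address and credentials here are assumptions for a local test instance.
    c, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://127.0.0.1:8086"})
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Ping verifies connectivity and reports the server version.
    latency, version, err := c.Ping(5 * time.Second)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("ping:", latency, "version:", version)

    // Run a simple query against the (assumed) imooc database.
    q := client.NewQuery("SHOW MEASUREMENTS", "imooc", "")
    if resp, err := c.Query(q); err == nil && resp.Error() == nil {
        fmt.Println(resp.Results)
    }
}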
PS. A detailed explanation of InfluxDB internals (in Chinese):
https://www.linuxdaxue.com/influxdb-principle.html
Grafana is an open-source platform for time series analytics and monitoring. It supports many data sources, such as Elasticsearch, Graphite, and InfluxDB, and is known for its powerful dashboard editor.
Official site: https://grafana.com/
Grafana startup procedure:
1. Pull and run the image with Docker
docker run -d --name=grafana -p 3000:3000 grafana/grafana
2. Open the admin UI home page
In a browser, go to 127.0.0.1:3000 and log in. Grafana's default port is 3000, and the default username and password are admin / admin. The configuration file is /etc/grafana/grafana.ini; Grafana must be restarted after the configuration is changed.
3. Create a data source bound to InfluxDB
4. Create a new panel
Home -> New Dashboard -> Graph -> click Edit
5. The Metrics tab inside Edit is where you build the SQL-like query
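For example (a sketch using the nginx_log measurement, the Method tag, and the RequestTime field written by the program later in this article, together with Grafana's $timeFilter macro for InfluxDB data sources), a query built in the Metrics editor might look like:

SELECT mean("RequestTime") FROM "nginx_log" WHERE "Method" = 'GET' AND $timeFilter GROUP BY time(10s)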
The log-monitoring program writes the data it needs into InfluxDB as points through the InfluxDB client:
1. Import github.com/influxdata/influxdb/client/v2
2. Create the InfluxDB client
// Create a new HTTPClient
c, err := client.NewHTTPClient(client.HTTPConfig{
    Addr:     addr,
    Username: username,
    Password: password,
})
if err != nil {
    log.Fatal(err)
}
defer c.Close()
3. Create a batch that defines the format (database and precision) of the points to be written
// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
    Database:  database,
    Precision: precision,
})
if err != nil {
    log.Fatal(err)
}
4. Create a point, add it to the batch, and write the batch to the InfluxDB database
// Create a point and add to batch
// Tags: Path, Method, Scheme, Status
tags := map[string]string{
    "Path":   v.Path,
    "Method": v.Method,
    "Scheme": v.Scheme,
    "Status": v.Status,
}
fields := map[string]interface{}{
    "UpstreamTime": v.UpstreamTime,
    "RequestTime":  v.RequestTime,
    "BytesSent":    v.BytesSent,
}
pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
if err != nil {
    log.Fatal(err)
}
bp.AddPoint(pt)
// Write the batch
if err := c.Write(bp); err != nil {
    log.Fatal(err)
}
The imooc.log log format is as follows:
172.0.0.12 - - [02/May/2018:17:17:35 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
172.0.0.12 - - [02/May/2018:17:17:36 +0000] http "POST /bar?query=t HTTP/1.0" 300 2133 "-" "KeepAliveClient" "-" 1.025 1.854
The main logic of the code: a read module reads log lines from the imooc.log file, each line is parsed with a regular expression to extract the data, a write module sends the data to InfluxDB as points through the InfluxDB client, and finally Grafana is used to chart the data.
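To make the parsing step concrete, here is a small standalone sketch that applies the same regular expression used in the Process() method below to one of the sample lines and prints its capture groups (the index comments reflect how the full program uses them):

package main

import (
    "fmt"
    "regexp"
)

func main() {
    // Same pattern as in LogProcess.Process() below.
    re := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
    line := `172.0.0.12 - - [02/May/2018:17:17:35 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854`
    // FindStringSubmatch returns the full match plus 13 groups:
    // [4] time, [5] scheme, [6] request line, [7] status,
    // [8] bytes sent, [12] upstream time, [13] request time.
    ret := re.FindStringSubmatch(line)
    if len(ret) != 14 {
        fmt.Println("no match")
        return
    }
    for i, v := range ret[1:] {
        fmt.Printf("group %d: %s\n", i+1, v)
    }
}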
package main

import (
    "bufio"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/influxdata/influxdb/client/v2"
)

const (
    TypeHandleLine  = 0
    TypeErrNum      = 1
    TpsIntervalTime = 5
)

var TypeMonitorChan = make(chan int, 200)

type Message struct {
    TimeLocal                    time.Time
    BytesSent                    int
    Path, Method, Scheme, Status string
    UpstreamTime, RequestTime    float64
}
// System status monitoring
type SystemInfo struct {
    HandleLine    int     `json:"handleLine"`   // total number of log lines processed
    Tps           float64 `json:"tps"`          // system throughput
    ReadChanLen   int     `json:"readChanLen"`  // length of the read channel
    WriterChanLen int     `json:"writeChanLen"` // length of the write channel
    RunTime       string  `json:"runTime"`      // total running time
    ErrNum        int     `json:"errNum"`       // number of errors
}

type Monitor struct {
    startTime time.Time
    data      SystemInfo
    tpsSli    []int
    tps       float64
}
func (m *Monitor) start(lp *LogProcess) {
    go func() {
        for n := range TypeMonitorChan {
            switch n {
            case TypeErrNum:
                m.data.ErrNum += 1
            case TypeHandleLine:
                m.data.HandleLine += 1
            }
        }
    }()
    ticker := time.NewTicker(time.Second * TpsIntervalTime)
    go func() {
        for {
            <-ticker.C
            m.tpsSli = append(m.tpsSli, m.data.HandleLine)
            if len(m.tpsSli) > 2 {
                m.tpsSli = m.tpsSli[1:]
                m.tps = float64(m.tpsSli[1]-m.tpsSli[0]) / TpsIntervalTime
            }
        }
    }()
    http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
        m.data.RunTime = time.Now().Sub(m.startTime).String()
        m.data.ReadChanLen = len(lp.rc)
        m.data.WriterChanLen = len(lp.wc)
        m.data.Tps = m.tps
        ret, _ := json.MarshalIndent(m.data, "", "\t")
        io.WriteString(writer, string(ret))
    })
    http.ListenAndServe(":9193", nil)
}
type Reader interface {
    Read(rc chan []byte)
}

type Writer interface {
    Writer(wc chan *Message)
}

type LogProcess struct {
    rc    chan []byte
    wc    chan *Message
    read  Reader
    write Writer
}

type ReadFromFile struct {
    path string // path of the file to read
}
// Read module
func (r *ReadFromFile) Read(rc chan []byte) {
    // open the file
    f, err := os.Open(r.path)
    fmt.Println(r.path)
    if err != nil {
        panic(fmt.Sprintf("open file err: %s", err.Error()))
    }
    // read the file line by line, starting from the end of the file
    f.Seek(0, io.SeekEnd) // whence 2 (io.SeekEnd) seeks relative to the end of the file
    rd := bufio.NewReader(f)
    for {
        line, err := rd.ReadBytes('\n') // keep reading until the next '\n'
        if err == io.EOF {
            time.Sleep(5000 * time.Microsecond)
            continue
        } else if err != nil {
            panic(fmt.Sprintf("ReadBytes err: %s", err.Error()))
        }
        TypeMonitorChan <- TypeHandleLine
        rc <- line[:len(line)-1]
    }
}
type WriteToinfluxDB struct {
    influxDBDsn string // influx data source
}

// Write module
/**
1. Initialize the InfluxDB client
2. Read monitoring data from the write channel
3. Construct the data points and write them to InfluxDB
*/
func (w *WriteToinfluxDB) Writer(wc chan *Message) {
    // The DSN format is addr@username@password@database@precision
    infSli := strings.Split(w.influxDBDsn, "@")
    addr := infSli[0]
    username := infSli[1]
    password := infSli[2]
    database := infSli[3]
    precision := infSli[4]
    // Create a new HTTPClient
    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     addr,
        Username: username,
        Password: password,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()
    for v := range wc {
        // Create a new point batch
        bp, err := client.NewBatchPoints(client.BatchPointsConfig{
            Database:  database,
            Precision: precision,
        })
        if err != nil {
            log.Fatal(err)
        }
        // Create a point and add to batch
        // Tags: Path, Method, Scheme, Status
        tags := map[string]string{
            "Path":   v.Path,
            "Method": v.Method,
            "Scheme": v.Scheme,
            "Status": v.Status,
        }
        fields := map[string]interface{}{
            "UpstreamTime": v.UpstreamTime,
            "RequestTime":  v.RequestTime,
            "BytesSent":    v.BytesSent,
        }
        fmt.Println("tags:", tags)
        fmt.Println("fields:", fields)
        pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
        if err != nil {
            log.Fatal(err)
        }
        bp.AddPoint(pt)
        // Write the batch
        if err := c.Write(bp); err != nil {
            log.Fatal(err)
        }
        // Close client resources
        if err := c.Close(); err != nil {
            log.Fatal(err)
        }
        log.Println("write success")
    }
}
// Parse module
func (l *LogProcess) Process() {
    /**
    Sample line:
    172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-"
    "KeepAliveClient" "-" 1.005 1.854

    ([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)
    */
    r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
    for v := range l.rc {
        ret := r.FindStringSubmatch(string(v))
        if len(ret) != 14 {
            TypeMonitorChan <- TypeErrNum
            fmt.Println("FindStringSubmatch fail:", string(v))
            fmt.Println(len(ret))
            continue
        }
        message := &Message{}
        // time: [04/Mar/2018:13:49:52 +0000]
        loc, _ := time.LoadLocation("Asia/Shanghai")
        t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
        if err != nil {
            TypeMonitorChan <- TypeErrNum
            fmt.Println("ParseInLocation fail:", err.Error(), ret[4])
        }
        message.TimeLocal = t
        // bytes sent: 2133
        byteSent, _ := strconv.Atoi(ret[8])
        message.BytesSent = byteSent
        // "GET /foo?query=t HTTP/1.0"
        reqSli := strings.Split(ret[6], " ")
        if len(reqSli) != 3 {
            TypeMonitorChan <- TypeErrNum
            fmt.Println("strings.Split fail:", ret[6])
            continue
        }
        message.Method = reqSli[0]
        u, err := url.Parse(reqSli[1])
        if err != nil {
            TypeMonitorChan <- TypeErrNum
            fmt.Println("url parse fail:", err)
            continue
        }
        message.Path = u.Path
        // http
        message.Scheme = ret[5]
        // code: 200
        message.Status = ret[7]
        // 1.005
        upstreamTime, _ := strconv.ParseFloat(ret[12], 64)
        message.UpstreamTime = upstreamTime
        // 1.854
        requestTime, _ := strconv.ParseFloat(ret[13], 64)
        message.RequestTime = requestTime
        //fmt.Println(message)
        l.wc <- message
    }
}

/**
Monitoring requirements to analyze:
QPS, response time, and traffic for a given request path under a given scheme and request method
*/
func main() {
    var path, influDsn string
    flag.StringVar(&path, "path", "./imooc.log", "read file path")
    flag.StringVar(&influDsn, "influxDsn", "http://127.0.0.1:8086@imooc@imoocpass@imooc@s", "influx data source")
    flag.Parse()
    r := &ReadFromFile{
        path: path,
    }
    w := &WriteToinfluxDB{
        influxDBDsn: influDsn,
    }
    lp := &LogProcess{
        rc:    make(chan []byte, 200),
        wc:    make(chan *Message),
        read:  r,
        write: w,
    }
    go lp.read.Read(lp.rc)
    for i := 1; i < 2; i++ {
        go lp.Process()
    }
    for i := 1; i < 4; i++ {
        go lp.write.Writer(lp.wc)
    }
    fmt.Println("begin !!!")
    m := &Monitor{
        startTime: time.Now(),
        data:      SystemInfo{},
    }
    m.start(lp)
}
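To run the collector (a sketch: it assumes the program is saved as main.go, the Docker containers from the earlier steps are running, and the imooc database with the imooc/imoocpass credentials exists in InfluxDB; the DSN format is addr@username@password@database@precision, matching the flag defaults above):

go run main.go -path ./imooc.log -influxDsn "http://127.0.0.1:8086@imooc@imoocpass@imooc@s"

While it runs, the Monitor goroutine serves system status on port 9193, so http://127.0.0.1:9193/monitor returns JSON roughly shaped like this (values are illustrative):

{
    "handleLine": 1024,
    "tps": 6.4,
    "readChanLen": 0,
    "writeChanLen": 0,
    "runTime": "2m30.5s",
    "errNum": 0
}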