[svc]influxdb最佳實戰-監控對比

最近在搞容器的監控,遇到influxdb這個庫,搞了兩天,些許明白了些套路,作個記錄,備忘....html

小結以下:python

influxdb go語言編寫git

默認狀況influxdb建立的庫關聯autogen的RP(存儲策略),即數據會保留永久github

監控和日誌的區別

最近搞監控,所謂監控就是監控服務肉體是否健康(還活着/生病? 各項指標是否正常?)web

區分日誌蒐集: 分析服務的精神狀態是健康(服務的一個履歷/日記)sql

如何作一個監控

參考: https://segmentfault.com/a/1190000011082379shell

回想到若是是你本身去作一個監控, 可以作到記錄每分鐘 CPU 的空閒率是多少, 要怎麼作?數據庫

搞一個數據庫, 用來放數據的
    寫一個腳本, 用來獲取 CPU 的相關數據, 加上時間戳, 而後保存到數據庫
    建立一個定時任務, 一分鐘運行一次腳本
    寫一個簡單的程序, 從數據庫查到數據, 而後根據時間戳, 繪製成圖表.

telegraf蒐集器 + influxdb(存儲) + grafana(展現)
grafana 的套路基本上跟 kibana 差很少,都是根據查詢條件設置聚合規則,在合適的圖表上進行展現,多個圖表共同組建成一個 dashboard,熟悉 kibana 的用戶應該能夠很是容易上手。另外 grafana 的可視化功能比 kibana 強得多,並且 4 以上版本將集成報警功能。json

grafana主機監控效果圖:segmentfault

以前用metricbeat作的主機監控效果圖-進程級別

監控的對比influxdb vs 普羅

特性對比

參考: http://gitbook.cn/books/59395d3d5863cf478e6b50ba/index.html

influxdb集成已有的概念,好比查詢語法相似sql,引擎從LSM優化而來,學習成本相對低。
influxdb支持的類型有float,integers,strings,booleans,prometheus目前只支持float。
influxdb的時間精度是納秒,prometheus的則是毫秒。
influxdb僅僅是個數據庫,而prometheus提供的是整套監控解決方案,固然influxdb也提供了整套監控解決方案。
influxdb支持的math function比較少,prometheus相對來講更多,influxdb就目前使用上已經知足功能。
influxdb支持event log,prometheus不支持。

注: 已上對比的是普羅v1 ,如今普羅有v2版本了,據說比influxdb更強悍了. 並且influxdb集羣方案已閉源.

influxdb的特性和特色

influxdb中文翻譯官方的文檔,感受很棒
https://jasper-zhang1.gitbooks.io/influxdb/content/
https://jasper-zhang1.gitbooks.io/influxdb/content/Concepts/key_concepts.html

參考: http://www.ttlsa.com/monitor-safe/monitor/distributed-time-series-database-influxdb/

  • influxdb 它的特性
    它有三大特性:
1. Time Series (時間序列):你可使用與時間有關的相關函數(如最大,最小,求和等)
2. Metrics(度量):你能夠實時對大量數據進行計算
3. Eevents(事件):它支持任意的事件數據

時序性(Time Series):與時間相關的函數的靈活使用(例如最大、最小、求和等);
度量(Metrics):對實時大量數據進行計算;
事件(Event):支持任意的事件數據,換句話說,任意事件的數據咱們均可以作操做。
  • influxdb 它的特色
    參考: http://dbaplus.cn/news-73-1291-1.html
schemaless(無結構),能夠是任意數量的列
無特殊依賴,幾乎開箱即用(如ElasticSearch須要Java)
自帶數據過時功能;
自帶權限管理,精細到「表」級別;
原生的HTTP支持,內置HTTP API
強大的類SQL語法,支持min, max, sum, count, mean, median 等一系列函數,方便統計。

influxdb最佳實踐

1.登陸 建庫 查詢

參考: https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/getting_start.html

influx -precision rfc3339   # -precision參數代表了任何返回的時間戳的格式和精度,針對查詢時候顯示的時間格式
CREATE DATABASE mydb
SHOW DATABASES
USE mydb
INSERT cpu,host=serverA,region=us_west value=0.64
SELECT "host", "region", "value" FROM "cpu"

INSERT temperature,machine=unit42,type=assembly external=25,internal=37
SELECT * FROM "temperature"


> SELECT * FROM /.*/ LIMIT 1
> SELECT * FROM "cpu_load_short"
> SELECT * FROM "cpu_load_short" WHERE "value" > 0.9

2.瞭解influxdb基本概念

參考: http://dbaplus.cn/news-73-1291-1.html

InfluxDB中的名詞 傳統數據庫中的概念
database 數據庫
measurement 數據庫中的表
points 表裏面的一行數據

InfluxDB中特有的概念

Point至關於傳統數據庫裏的一行數據,以下表所示:
Point由時間戳(time)、數據(field)、標籤(tags)組成。

line-protocol格式

<measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]

INSERT temperature,machine=unit42,type=assembly external=25,internal=37

更多如:

cpu,host=serverA,region=us_west value=0.64
payment,device=mobile,product=Notepad,method=credit billed=33,licenses=3i 1434067467100293230
stock,symbol=AAPL bid=127.46,ask=127.48
temperature,machine=unit42,type=assembly external=25,internal=37 1434067467000000000
Tag: 被索引
上面的location和server就是tag key,us和host1是tag value,tag是可選的。不過寫入數據時最好加上tag,由於它能夠被索引。tag的類型只能是字符串。

Field: value支持的類型floats,integers,strings,booleans
上面的temperature是field key,82是field value。field value會用於展現,value支持的類型有floats,integers,strings,booleans。

Timestamp
格式是:RFC3339 UTC。默認精確到納秒,可選。

Series:
measurement, tag set, retention policy相同的數據集合算作一個 series。理解這個概念相當重要,由於這些數據存儲在內存中,若是series太多,會致使OOM

Retention Policy:
保留策略包括設置數據保存的時間以及在集羣中的副本個數。默認配置是:RP是autogen,保留時間是永久,副本爲1。這些配置在建立數據庫時能夠修改。

Continuous Query:
CQ是預先配置好的一些查詢命令,按期自動執行這些命令並將查詢結果寫入指定的measurement中,這個功能主要用於數據聚合。具體參考:CQ。

Shard:
存儲必定時間間隔的數據,每一個目錄對應一個shard,目錄的名字就是shard id。每個shard都有本身的cache、wal、tsm file以及compactor,目的就是經過時間來快速定位到要查詢數據的相關資源,加速查詢的過程,而且也讓以後的批量刪除數據的操做變得很是簡單且高效。

2.實操以下: 理解 point&measurement&series(field set)(被索引的tag set)

向庫中插入以下數據:

屬性
庫名 my_database
measurement census
field key butterflies和honeybees
tag key location和scientist
name: census
-————————————
time                                      butterflies     honeybees     location     scientist
2015-08-18T00:00:00Z      12                   23                    1                 langstroth
2015-08-18T00:00:00Z      1                     30                    1                 perpetua
2015-08-18T00:06:00Z      11                   28                    1                 langstroth
2015-08-18T00:06:00Z   3                     28                    1                 perpetua
2015-08-18T05:54:00Z      2                     11                    2                 langstroth
2015-08-18T06:00:00Z      1                     10                    2                 langstroth
2015-08-18T06:06:00Z      8                     23                    2                 perpetua
2015-08-18T06:12:00Z      7                     22                    2                 perpetua

sql語句以下

'INSERT census,location=1,scientist=langstroth butterflies=12,honeybees=23'
'INSERT census,location=1,scientist=perpetua butterflies=1,honeybees=30'
'INSERT census,location=1,scientist=langstroth butterflies=11,honeybees=28'
'INSERT census,location=1,scientist=perpetua butterflies=3,honeybees=28'
'INSERT census,location=2,scientist=langstroth butterflies=2,honeybees=11'
'INSERT census,location=2,scientist=langstroth butterflies=1,honeybees=10'
'INSERT census,location=2,scientist=perpetua butterflies=8,honeybees=23'
'INSERT census,location=2,scientist=perpetua butterflies=7,honeybees=22'
  • 造數據用到的2個腳本
    爲了模擬隔多久插入數據
    模擬插入數據時,隨機賦值
$ cat fake_data.sh 
arr=(
'INSERT orders,website=30 phone=10'
'INSERT orders,website=39 phone=12'
'INSERT orders,website=56 phone=11'
)

#while :;do
for((i=0;i<${#arr[*]};i++));do
    /usr/bin/influx -database 'my_food' -execute "${arr[i]}"
    sleep 10
#    echo "${arr[i]}"
done
#done
$ cat data.sh 
#!/bin/bash

function rand(){
    min=$1
    max=$(($2-$min+1))
    num=$(date +%s%N)
    echo $(($num%$max+$min))
}


while :;do
    /usr/bin/influx -database 'my_database' -execute "INSERT census,location=2,scientist=perpetua butterflies=$(rand 1 50),honeybees=$(rand 1 50)"
    sleep 2;
#    echo "INSERT orders,website=$(rand 1 50) phone=$(rand 1 50)"
#    break
done

field value就是你的數據,它們能夠是字符串、浮點數、整數、布爾值,由於InfluxDB是時間序列數據庫,因此field value老是和時間戳相關聯。
在示例中,field value以下:

12   23
1    30
11   28
3    28
2    11
1    10
8    23
7    22

在上面的數據中,每組field key和field value的集合組成了field set,在示例數據中,有八個field set:

butterflies = 12 honeybees = 23
butterflies = 1 honeybees = 30
butterflies = 11 honeybees = 28
butterflies = 3 honeybees = 28
butterflies = 2 honeybees = 11
butterflies = 1 honeybees = 10
butterflies = 8 honeybees = 23
butterflies = 7 honeybees = 22

注意,field是沒有索引的。若是使用field value做爲過濾條件來查詢,則必須掃描其餘條件匹配後的全部值。所以,這些查詢相對於tag上的查詢(下文會介紹tag的查詢)性能會低不少。

在上面的數據中,tag set是不一樣的每組tag key和tag value的集合,示例數據裏有四個tag set:

location = 1, scientist = langstroth
location = 2, scientist = langstroth
location = 1, scientist = perpetua
location = 2, scientist = perpetua

如今你已經熟悉了measurement,tag set和retention policy,那麼如今是討論series的時候了。 在InfluxDB中,series是共同retention policy,measurement和tag set的集合。 以上數據由四個series組成:

理解series對於設計數據schema以及對於處理InfluxDB裏面的數據都是頗有必要的。
最後,point就是具備相同timestamp的相同series的field集合。例如,這就是一個point:

name: census
-----------------
time                           butterflies     honeybees     location     scientist
2015-08-18T00:00:00Z     1                  30               1               perpetua

例子裏的series的retention policy爲autogen,measurement爲census,tag set爲location = 1, scientist = perpetua。point的timestamp爲2015-08-18T00:00:00Z。

wal(Write Ahead Log)

參考: https://jasper-zhang1.gitbooks.io/influxdb/content/Concepts/glossary.html

最近寫的點數的臨時緩存。爲了減小訪問永久存儲文件的頻率,InfluxDB將最新的數據點緩衝進WAL中,直到其總大小或時間觸發而後flush到長久的存儲空間。這樣能夠有效地將寫入batch處理到TSM中。
能夠查詢WAL中的點,而且系統重啓後仍然保留。在進程開始時,在系統接受新的寫入以前,WAL中的全部點都必須flushed。

目錄結構

參考: http://gitbook.cn/books/59395d3d5863cf478e6b50ba/index.html
InfluxDB的數據存儲有三個目錄,分別是meta、wal、data。meta用於存儲數據庫的一些元數據,meta目錄下有一個meta.db文件。wal目錄存放預寫日誌文件,以.wal結尾。data目錄存放實際存儲的數據文件,以.tsm結尾。基本結構以下:

-- wal
   -- test
       -- autogen
         -- 1
            -- _00001.wal
         -- 2
            -- _00002.wal
-- data
   -- test
      -- autogen
         -- 1
            -- 000000001-000000001.tsm
         -- 2
            -- 000000001-000000010.tsm
-- meta
     -- meta.db

數據採樣--> 理解cq和rp

Continuous Query (CQ)是在數據庫內部自動週期性跑着的一個InfluxQL的查詢,CQs須要在SELECT語句中使用一個函數,而且必定包括一個GROUP BY time()語句。+

Retention Policy (RP)是InfluxDB數據架構的一部分,它描述了InfluxDB保存數據的時間。InfluxDB會比較服務器本地的時間戳和你數據的時間戳,並刪除比你在RPs裏面用DURATION設置的更老的數據。單個數據庫中能夠有多個RPs可是每一個數據的RPs是惟一的。

實例數據:
db: food_data
mesurement: orders

name: orders
------------
time                           phone     website
2016-05-10T23:18:00Z     10        30
2016-05-10T23:18:10Z     12        39
2016-05-10T23:18:20Z     11        56

目標:

自動刪除1h以上的原始2秒間隔數據   --> rp實現
自動刪除超過5min的30s間隔數據     --> rp實現

自動將2秒間隔數據聚合到30s的間隔數據 ---> cq實現

2s中插入一次數據:(腳本參考上面fake數據)

create databaes food_data
CREATE RETENTION POLICY "a_hour" ON "food_data" DURATION 1h REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "a_week" ON "food_data" DURATION 1w REPLICATION 1

CREATE CONTINUOUS QUERY "cq_10s" ON "food_data" BEGIN SELECT mean("website") AS "mean_website",mean("phone") AS "mean_phone" INTO  "a_week"."downsampled_orders" FROM "orders" GROUP BY time(10s) END

在步驟1裏面建立數據庫時,InfluxDB會自動生成一個叫作autogen的RP,並做爲數據庫的默認RP,autogen這個RP會永遠保留數據。在輸入上面的命令以後,a_hours會取代autogen做爲food_data的默認RP。

驗證:

select * from "a_week"."downsampled_orders";
select * from "orders";

influxdb數據聚合

參考

表名均可以正則
select * from /.*/ limit 1

查詢一個表裏面的全部數據
select * from cpu_idle

查詢數據大於200的。
select * from response_times where value > 200

查詢數據裏面含有下面字符串的。 
select * from user_events where url_base = ‘friends#show’

約等於 
select line from log_lines where line =~ /paul@influx.com/

按照30m分鐘進行聚合,時間範圍是大於昨天的   主機名是server1的。 
select mean(value) from cpu_idle group by time(30m) where time > now() – 1d and hostName = ‘server1′
select column_one  from foo  where time > now() – 1h limit 1000;
select reqtime, url from web9999.httpd where reqtime > 2.5;
select reqtime, url from web9999.httpd where time > now() – 1h limit 1000;

url搜索裏面含有login的字眼,還以login開頭
select reqtime, url from web9999.httpd where url =~ /^\/login\//;

還能夠作數據的merge
select reqtime, url from web9999.httpd merge web0001.httpd;

influxdb備份恢復

參考
參考: http://stedolan.github.io/jq/

#!/bin/bash
 
function parse_options {
  function usage() {
    echo -e >&2 "Usage: $0 dump DATABASE [options...]
\t-u USERNAME\t(default: root)
\t-p PASSWORD\t(default: root)
\t-h HOST\t\t(default: localhost:8086)
\t-s\t\t(use HTTPS)"
  }
  if [ "$#" -lt 2 ]; then
    usage; exit 1;
  fi
 
  username=root
  password=root
  host=localhost:8086
  https=0
  shift
  database=$1
  shift
 
  while getopts u:p:h:s opts
  do case "${opts}" in
    u) username="${OPTARG}";;
    p) password="${OPTARG}";;
    h) host="${OPTARG}";;
    s) https=1;;
    ?) usage; exit 1;;
    esac
  done
  if [ "${https}" -eq 1 ]; then
    scheme="https"
  else
    scheme="http"
  fi
}
 
function dump {
  parse_options $@
 
  curl -s -k -G "${scheme}://${host}/db/${database}/series?u=${username}&p=${password}&chunked=true" --data-urlencode "q=select * from /.*/" \
    | jq . -c -M
  exit
}
 
function restore {
  parse_options $@
 
  while read -r line
  do
    echo >&2 "Writing..."
    curl -X POST -d "[${line}]" "${scheme}://${host}/db/${database}/series?u=${username}&p=${password}"
  done
  exit
}
 
case "$1" in
  dump)     dump $@;;
  restore)  restore $@;;
  *)      echo >&2 "Usage: $0 [dump|restore] ..."
    exit 1;;
esac

python調用influxdb實現數據增刪

utils/db.py

# - * - coding: utf-8 - * -

from influxdb import InfluxDBClient


def get_db_connection():
    db_conn = InfluxDBClient(host="192.168.x.x", database="pachongdb")
    return db_conn

main.py

#!/home/ansible/.venv/bin/python
# - * - coding: utf-8 - * -

from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
from utils import db


def insert_success_point_2db():
    db_conn = db.get_db_connection()
    # 寫入成功記錄,success字段值約定爲1
    success_point = [{
        "measurement": "wake",
        "tags": {
            "isp": "mobile",
            "region": "上海",
        },
        "fields": {
            "mobile": 159123456xx,
            "success": 1,
        }
    }]

    try:
        db_conn.write_points(success_point)
    except InfluxDBClientError as e:
        print("influxdb db client error: {0}".format(e))
    except InfluxDBServerError as e:
        print("influxdb db server error: {0}".format(e))
    except Exception as e:
        print("influxdb error: {0}".format(e))
    finally:
        if db_conn is not None:
            db_conn.close()


def insert_fail_point_2db():
    db_conn = db.get_db_connection()
    # 寫入失敗記錄,fail字段值約定爲0
    fail_point = [{
        "measurement": "wake",
        "tags": {
            "isp": "mobile",
            "region": "上海",
        },
        "fields": {
            "mobile": 1591234xxxx,
            "fail": 0,
        }
    }]
    try:
        db_conn.write_points(fail_point)
    except InfluxDBClientError as e:
        print("influxdb db client error: {0}".format(e))
    except InfluxDBServerError as e:
        print("influxdb db server error: {0}".format(e))
    except Exception as e:
        print("influxdb error: {0}".format(e))
    finally:
        if db_conn is not None:
            db_conn.close()


def main():
    insert_success_point_2db()
    insert_fail_point_2db()


if __name__ == '__main__':
    main()

requirements.txt

certifi==2017.11.5
influxdb==5.0.0

相關文章
相關標籤/搜索