本文參考 Druid 官方文檔。
Apache Druid 是一個集時間序列數據庫、數據倉庫和全文檢索系統特色於一體的分析性數據平臺(OLAP)。Druid 做爲一個高可用、高性能和多特性的 OLAP 平臺,使用場景豐富。mysql
許多互聯網公司基於 Druid 搭建 OLAP 數據分析和 BI 平臺。如:sql
此前 Druid 系列文章已經詳解過 Druid 的特性、使用場景、架構和實現原理。能夠參考:shell
本文將指導讀者完整定義一個完整 Spec
,並指出關鍵注意事項。Spec
是 Druid 數據攝入的配置信息,使用 json
格式,使用 Druid 時能夠經過界面配置,最後生成 Spec
文件,也能夠直接編寫 Spec
文件,而後上傳配置。不管使用哪一種方式,深刻了解 Spec
的編寫既是開始使用 Druid 的第一步,也是深刻了解 Druid 各類概念,繼而深刻了解 Druid 原理的必經之路。數據庫
假設咱們有如下網絡流量數據:json
srcIP
: 發送端 IP 地址srcPort
: 發送端端口號dstIP
: 接收端 IP 地址dstPort
: 接收端端口號protocol
: IP 協議號packets
: 傳輸的包的數量bytes
: 傳輸字節數cost
: 傳輸耗費的時間{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4} {"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1} {"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4} {"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9} {"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2} {"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3} {"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4} {"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5} {"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}
將上面 JSON 內容保存到 $druid_root\quickstart\
目錄下的 ingestion-tutorial-data.json
文件中。網絡
下面咱們開始編寫一個 Spect
將上面的數據寫入 Druid。架構
在本教程中,咱們將使用本地批處理indexing
任務。若是使用其餘任務類型,攝入規範的某些地方將會不同,咱們將在教程中指出。併發
下面將詳細講解 Spec
配置,你將瞭解如下內容:高併發
Druid 攝入 spec 的核心元素是 dataSchema
。dataSchema
定義如何將輸入的數據解析成 Druid 可以存儲的列集合。post
咱們從一個空的dataSchema
開始,並按教程一步步添加字段。
在quickstart/
目錄下建立 ingestion-tutorial-index.json
文件,將如下內容寫入文件:
"dataSchema" : {}
隨着教程的進行,咱們將不斷的修改此 spec 文件。
數據源名稱經過dataSchema
下的 dataSource
參數指定。dataSource
相似於 RDBMS 的 Table Name,寫入的數據經過此名稱查詢,如:select * from $dataSource
。
"dataSchema" : { "dataSource" : "ingestion-tutorial", }
讓咱們將教程中的數據源命名爲 ingestion-tutorial
。
dataSchema
須要知道如何從輸入的數據中提取主時間字段。Druid 的數據必須有時間字段,Druid 底層按時間分 segment 來存儲數據,詳情能夠參考《Apache Druid 的集羣設計與工做流程》。
咱們數據中的時間戳列是"ts",它是一個 ISO 8601
規範的時間戳,咱們將配置此字段的 timestampSpec
信息加到 dataSchema
下:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" } }
如今,咱們已經定義了時間列,讓咱們看一下其餘列的定義。
Druid 支持如下列類型:String,Long,Float,Double。下面章節中咱們將看到這些類型如何被使用。
在咱們講如何定義其餘非時間列以前,先討論一下 rollup
。
在攝入數據時,咱們須要考慮是否須要 rollup。
在此教程中,咱們開啓 rollup。在 dataSchema
的 granularitySpec
中指定:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "granularitySpec" : { "rollup" : true } }
在此教程中,咱們按如下方式劃分維度列和指標列:
這些維度是一組屬性,用以標識一組網絡流量數據,而指標表明按此維度組合的網絡流量的實際狀況。
讓咱們看看如何在 spec 中定義維度和指標吧。
維度由 dataSchema
中的 dimensionsSpec
參數指定。
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "granularitySpec" : { "rollup" : true } }
每一個維度都有一個 name
和 一個 type
,其中 type
能夠是"long", "float", "double", or "string"。
注意,srcIP
是一個 "string" 維度。對於字符串維度,只須要指定維度的名稱就能夠了,由於它的類型默認爲"string"。
也請注意, protocol
在輸入數據中是數字類型,但咱們以 "string" 列類型提取它,因此 Druid 在攝入數據時會將其強制由 long 類型轉換成 string 類型。
數字類型的數據應該做爲數字維度仍是字符串維度?
數字維度相對於字符串維度有如下優點和劣勢:
指標經過 dataSchema
中的 metricsSpec
參數指定:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "rollup" : true } }
在定義指標時,必須指定當前列在 rollup 時應該執行的聚合類型。
這裏,咱們在 packets
和 bytes
兩個指標列上定義了 long 類型的 sum 聚合,在 cost
列上定義了一個 double 的 sum 聚合。
注意 metricsSpec
與 dimensionSpec
和 parseSpec
的嵌套層級不同。它和 dataSchema
中的 parser
在同一嵌套層級。
注意,咱們也定義了一個 count
聚合器。這個計數聚合器將統計原始數據攝入的行數。
若是咱們不使用 rollup,將在 dimensionsSpec
中指定全部列,如:
"dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" }, { "name" : "packets", "type" : "long" }, { "name" : "bytes", "type" : "long" }, { "name" : "srcPort", "type" : "double" } ] },
此時,咱們已經完成了 parser
和 metricSpec
的定義,並幾乎要完成 Spec 的 dataSchema
。
咱們還須要在 granularitySpec
中設置一些額外的參數:
uniform
和 arbitrary
。在本教程中,咱們將使用 uniform
,這樣全部的 segment 將有統一的時間範圍大小(本示例中,全部 segment 覆蓋一個小時的數據量)。DAY
,WEEK
。queryGranularity
)。segment 粒度經過 granularitySpec
中的 segmentGranularity
屬性配置。此文檔中,咱們將建立 hourly 粒度的 segment:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "rollup" : true } }
咱們的輸入數據包含兩個小時的事件,因此此任務將生成兩個 segment。
查詢粒度經過 granularitySpec
中的 queryGranularity
屬性配置。此教程中,咱們使用 minute 級粒度:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "rollup" : true } }
爲了查看查詢粒度配置的效果,讓咱們從原始輸入數據中查看這一行:
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
當使用 minute 粒度攝入這行數據時,Druid 將把這行數據的時間戳 floor 成 minute 桶的時間:
{"ts":"2018-01-01T01:03:00Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
對於批量任務,必須定義時間範圍。在時間範圍以外的輸入數據將不被攝入。
這個時間範圍也在 granularitySpec
中指定:
"dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } }
如今咱們已經完成了 dataSchema
的定義。接下來要作的就是將咱們建立的 dataSchema
放入一個數據攝入任務中,並指定輸入的源。
dataSchema
適用於全部類型的任務,但每種任務類型都有自生的規範格式。在本教程中,咱們使用本地批量數據攝入任務類型(the native ingestion task):
{ "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } } } }
如今,讓咱們來定義咱們本身的輸入源,它在 ioConfig
對象中指定。每種任務類型都有本身的 ioConfig
類型。爲了讀取輸入數據,咱們須要指定一個 inputSource
。咱們以前保存的網絡流量數據須要從一個本地文件讀取,其配置以下:
"ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" } }
由於咱們的數據是 JSON 字符串形式的,咱們使用 inputFormat
json
格式化數據(還支持 csv、protobuf 等數據類型):
"ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" }, "inputFormat" : { "type" : "json" } }
{ "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } }, "ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" }, "inputFormat" : { "type" : "json" } } } }
每個攝取任務都有 tuningConfig
配置項,它容許用戶調整各類攝取參數。
舉例來講,讓咱們添加一個tuningConfig
,以設置本次批量攝取任務的目標 segment 大小:
"tuningConfig" : { "type" : "index_parallel", "maxRowsPerSegment" : 5000000 }
如今咱們已經定義完成了一個攝取規範,它如今看起來以下所示:
{ "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "ingestion-tutorial", "timestampSpec" : { "format" : "iso", "column" : "ts" }, "dimensionsSpec" : { "dimensions": [ "srcIP", { "name" : "srcPort", "type" : "long" }, { "name" : "dstIP", "type" : "string" }, { "name" : "dstPort", "type" : "long" }, { "name" : "protocol", "type" : "string" } ] }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "longSum", "name" : "packets", "fieldName" : "packets" }, { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }, { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "HOUR", "queryGranularity" : "MINUTE", "intervals" : ["2018-01-01/2018-01-02"], "rollup" : true } }, "ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/", "filter" : "ingestion-tutorial-data.json" }, "inputFormat" : { "type" : "json" } }, "tuningConfig" : { "type" : "index_parallel", "maxRowsPerSegment" : 5000000 } } }
在包根目錄下,運行下面命令:
bin/post-index-task --file quickstart/ingestion-tutorial-index.json --url http://localhost:8081
腳本執行完成後,咱們來查詢數據。
讓咱們運行 bin/dsql
併發送一個 select * from "ingestion-tutorial";
語句,查詢已經被寫入的數據:
$ bin/dsql Welcome to dsql, the command-line client for Druid SQL. Type "\h" for help. dsql> select * from "ingestion-tutorial"; ┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐ │ __time │ bytes │ cost │ count │ dstIP │ dstPort │ packets │ protocol │ srcIP │ srcPort │ ├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤ │ 2018-01-01T01:01:00.000Z │ 6000 │ 4.9 │ 3 │ 2.2.2.2 │ 3000 │ 60 │ 6 │ 1.1.1.1 │ 2000 │ │ 2018-01-01T01:02:00.000Z │ 9000 │ 18.1 │ 2 │ 2.2.2.2 │ 7000 │ 90 │ 6 │ 1.1.1.1 │ 5000 │ │ 2018-01-01T01:03:00.000Z │ 6000 │ 4.3 │ 1 │ 2.2.2.2 │ 7000 │ 60 │ 6 │ 1.1.1.1 │ 5000 │ │ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │ 2 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │ │ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │ 1 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │ └──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘ Retrieved 5 rows in 0.12s. dsql>
若是以爲閱讀後對你有幫助,但願分享、點贊、在看三連哦。
關注 【碼哥字節】解鎖更多硬核。
推薦閱讀
如下幾篇文章閱讀量與讀者反饋都很好,推薦你們閱讀:
公衆號後臺回覆 」加羣「,加入讀者技術羣,裏面有阿里、騰訊的小夥伴一塊兒探討技術。