從Spec編寫深刻了解 Druid

本文參考 Druid 官方文檔。

Apache Druid 是一個集時間序列數據庫、數據倉庫和全文檢索系統特色於一體的分析性數據平臺(OLAP)。Druid 做爲一個高可用、高性能和多特性的 OLAP 平臺,使用場景豐富。mysql

image

許多互聯網公司基於 Druid 搭建 OLAP 數據分析和 BI 平臺。如:sql

此前 Druid 系列文章已經詳解過 Druid 的特性、使用場景、架構和實現原理。能夠參考:shell

本文將指導讀者完整定義一個完整 Spec,並指出關鍵注意事項。Spec 是 Druid 數據攝入的配置信息,使用 json 格式,使用 Druid 時能夠經過界面配置,最後生成 Spec 文件,也能夠直接編寫 Spec 文件,而後上傳配置。不管使用哪一種方式,深刻了解 Spec 的編寫既是開始使用 Druid 的第一步,也是深刻了解 Druid 各類概念,繼而深刻了解 Druid 原理的必經之路。數據庫

示例數據

假設咱們有如下網絡流量數據:json

  • srcIP: 發送端 IP 地址
  • srcPort: 發送端端口號
  • dstIP: 接收端 IP 地址
  • dstPort: 接收端端口號
  • protocol: IP 協議號
  • packets: 傳輸的包的數量
  • bytes: 傳輸字節數
  • cost: 傳輸耗費的時間
{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4}
{"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1}
{"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4}
{"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9}
{"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2}
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
{"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4}
{"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5}
{"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}

將上面 JSON 內容保存到 $druid_root\quickstart\目錄下的 ingestion-tutorial-data.json 文件中。網絡

下面咱們開始編寫一個 Spect 將上面的數據寫入 Druid。架構

在本教程中,咱們將使用本地批處理indexing任務。若是使用其餘任務類型,攝入規範的某些地方將會不同,咱們將在教程中指出。併發

下面將詳細講解 Spec 配置,你將瞭解如下內容:高併發

image

定義 schema

Druid 攝入 spec 的核心元素是 dataSchemadataSchema 定義如何將輸入的數據解析成 Druid 可以存儲的列集合。post

咱們從一個空的dataSchema 開始,並按教程一步步添加字段。

quickstart/ 目錄下建立 ingestion-tutorial-index.json 文件,將如下內容寫入文件:

"dataSchema" : {}

隨着教程的進行,咱們將不斷的修改此 spec 文件。

數據源名稱

數據源名稱經過dataSchema 下的 dataSource 參數指定。dataSource 相似於 RDBMS 的 Table Name,寫入的數據經過此名稱查詢,如:select * from $dataSource

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
}

讓咱們將教程中的數據源命名爲 ingestion-tutorial

時間列

dataSchema 須要知道如何從輸入的數據中提取主時間字段。Druid 的數據必須有時間字段,Druid 底層按時間分 segment 來存儲數據,詳情能夠參考《Apache Druid 的集羣設計與工做流程》

咱們數據中的時間戳列是"ts",它是一個 ISO 8601 規範的時間戳,咱們將配置此字段的 timestampSpec信息加到 dataSchema 下:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  }
}

列類型

如今,咱們已經定義了時間列,讓咱們看一下其餘列的定義。

Druid 支持如下列類型:String,Long,Float,Double。下面章節中咱們將看到這些類型如何被使用。

在咱們講如何定義其餘非時間列以前,先討論一下 rollup

Rollup

在攝入數據時,咱們須要考慮是否須要 rollup。

  • 若是開啓 rollup,須要將輸入數據列分紅兩種類型,維度(dimension)和指標(metric)。維度是 rollup 的 grouping 列(用於 group by,filtering),指標是被聚合計算的列。
  • 若是不開啓 rollup,全部列都被視爲維度,將不會進行預聚合。

在此教程中,咱們開啓 rollup。在 dataSchemagranularitySpec中指定:

"dataSchema" : {
 "dataSource" : "ingestion-tutorial",
 "timestampSpec" : {
   "format" : "iso",
   "column" : "ts"
 },
 "granularitySpec" : {
   "rollup" : true
 }
}

選擇維度和指標

在此教程中,咱們按如下方式劃分維度列和指標列:

  • Dimensions: srcIP, srcPort, dstIP, dstPort, protocol
  • Metrics: packets, bytes, cost

這些維度是一組屬性,用以標識一組網絡流量數據,而指標表明按此維度組合的網絡流量的實際狀況。

讓咱們看看如何在 spec 中定義維度和指標吧。

維度

維度由 dataSchema 中的 dimensionsSpec 參數指定。

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "granularitySpec" : {
    "rollup" : true
  }
}

每一個維度都有一個 name 和 一個 type ,其中 type 能夠是"long", "float", "double", or "string"。

注意,srcIP 是一個 "string" 維度。對於字符串維度,只須要指定維度的名稱就能夠了,由於它的類型默認爲"string"。

也請注意, protocol 在輸入數據中是數字類型,但咱們以 "string" 列類型提取它,因此 Druid 在攝入數據時會將其強制由 long 類型轉換成 string 類型。

Strings vs Numbers

數字類型的數據應該做爲數字維度仍是字符串維度?

數字維度相對於字符串維度有如下優點和劣勢:

  • 優點:數字須要更小的存儲空間,而且在讀取該列時須要更小的開銷。
  • 劣勢:數字維度沒有索引,因此按此列 filter 的操做會比字符串類型的維度(這種維度有索引)更慢。

指標

指標經過 dataSchema 中的 metricsSpec 參數指定:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "rollup" : true
  }
}

在定義指標時,必須指定當前列在 rollup 時應該執行的聚合類型。

這裏,咱們在 packetsbytes 兩個指標列上定義了 long 類型的 sum 聚合,在 cost 列上定義了一個 double 的 sum 聚合。

注意 metricsSpecdimensionSpecparseSpec 的嵌套層級不同。它和 dataSchema 中的 parser在同一嵌套層級。

注意,咱們也定義了一個 count 聚合器。這個計數聚合器將統計原始數據攝入的行數。

No rollup

若是咱們不使用 rollup,將在 dimensionsSpec 中指定全部列,如:

"dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" },
          { "name" : "packets", "type" : "long" },
          { "name" : "bytes", "type" : "long" },
          { "name" : "srcPort", "type" : "double" }
        ]
      },

定義粒度

此時,咱們已經完成了 parsermetricSpec 的定義,並幾乎要完成 Spec 的 dataSchema

咱們還須要在 granularitySpec 中設置一些額外的參數:

  • granularitySpec 類型:支持兩種類型——uniformarbitrary 。在本教程中,咱們將使用 uniform,這樣全部的 segment 將有統一的時間範圍大小(本示例中,全部 segment 覆蓋一個小時的數據量)。
  • segment 粒度:設置單個 segment 應該包含多大時間範圍的數據,如:DAYWEEK
  • 時間列中時間戳的 buckting 粒度(稱爲查詢粒度 queryGranularity )。

Segment 粒度

segment 粒度經過 granularitySpec 中的 segmentGranularity 屬性配置。此文檔中,咱們將建立 hourly 粒度的 segment:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "rollup" : true
  }
}

咱們的輸入數據包含兩個小時的事件,因此此任務將生成兩個 segment。

查詢粒度

查詢粒度經過 granularitySpec 中的 queryGranularity 屬性配置。此教程中,咱們使用 minute 級粒度:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "queryGranularity" : "MINUTE",
    "rollup" : true
  }
}

爲了查看查詢粒度配置的效果,讓咱們從原始輸入數據中查看這一行:

{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}

當使用 minute 粒度攝入這行數據時,Druid 將把這行數據的時間戳 floor 成 minute 桶的時間:

{"ts":"2018-01-01T01:03:00Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}

定義時間範圍

對於批量任務,必須定義時間範圍。在時間範圍以外的輸入數據將不被攝入。

這個時間範圍也在 granularitySpec 中指定:

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "queryGranularity" : "MINUTE",
    "intervals" : ["2018-01-01/2018-01-02"],
    "rollup" : true
  }
}

定義任務類型

如今咱們已經完成了 dataSchema 的定義。接下來要作的就是將咱們建立的 dataSchema 放入一個數據攝入任務中,並指定輸入的源。

dataSchema 適用於全部類型的任務,但每種任務類型都有自生的規範格式。在本教程中,咱們使用本地批量數據攝入任務類型(the native ingestion task):

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    }
  }
}

定義輸入源

如今,讓咱們來定義咱們本身的輸入源,它在 ioConfig 對象中指定。每種任務類型都有本身的 ioConfig 類型。爲了讀取輸入數據,咱們須要指定一個 inputSource。咱們以前保存的網絡流量數據須要從一個本地文件讀取,其配置以下:

"ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      }
    }

定義數據格式

由於咱們的數據是 JSON 字符串形式的,咱們使用 inputFormat json 格式化數據(還支持 csv、protobuf 等數據類型):

"ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    }
{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    }
  }
}

其餘調整

每個攝取任務都有 tuningConfig 配置項,它容許用戶調整各類攝取參數。

舉例來講,讓咱們添加一個tuningConfig,以設置本次批量攝取任務的目標 segment 大小:

"tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000
    }

最終 spec

如今咱們已經定義完成了一個攝取規範,它如今看起來以下所示:

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000
    }
  }
}

提交任務並查詢數據

在包根目錄下,運行下面命令:

bin/post-index-task --file quickstart/ingestion-tutorial-index.json --url http://localhost:8081

腳本執行完成後,咱們來查詢數據。

讓咱們運行 bin/dsql 併發送一個 select * from "ingestion-tutorial"; 語句,查詢已經被寫入的數據:

$ bin/dsql
Welcome to dsql, the command-line client for Druid SQL.
Type "\h" for help.
dsql> select * from "ingestion-tutorial";

┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐
│ __time                   │ bytes │ cost │ count │ dstIP   │ dstPort │ packets │ protocol │ srcIP   │ srcPort │
├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │  6000 │  4.9 │     3 │ 2.2.2.2 │    3000 │      60 │ 6        │ 1.1.1.1 │    2000 │
│ 2018-01-01T01:02:00.000Z │  9000 │ 18.1 │     2 │ 2.2.2.2 │    7000 │      90 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T01:03:00.000Z │  6000 │  4.3 │     1 │ 2.2.2.2 │    7000 │      60 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │     2 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
│ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │     1 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
└──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘
Retrieved 5 rows in 0.12s.

dsql>

若是以爲閱讀後對你有幫助,但願分享、點贊、在看三連哦。

關注 【碼哥字節】解鎖更多硬核。

推薦閱讀

如下幾篇文章閱讀量與讀者反饋都很好,推薦你們閱讀:

公衆號後臺回覆 」加羣「,加入讀者技術羣,裏面有阿里、騰訊的小夥伴一塊兒探討技術。

MageByte

相關文章
相關標籤/搜索