Druid 是 MetaMarket 公司研發,專爲海量數據集上的作高性能 OLAP (OnLine Analysis Processing)而設計的數據存儲和分析系統,目前Druid 已經在Apache基金會下孵化。Druid的主要特性:html
Druid常見應用的領域:java
有贊做爲一家 SaaS 公司,有不少的業務的場景和很是大量的實時數據和離線數據。在沒有是使用 Druid 以前,一些 OLAP 場景的場景分析,開發的同窗都是使用 SparkStreaming 或者 Storm 作的。用這類方案會除了須要寫實時任務以外,還須要爲了查詢精心設計存儲。帶來問題是:開發的週期長;初期的存儲設計很難知足需求的迭代發展;不可擴展。
在使用 Druid 以後,開發人員只須要填寫一個數據攝取的配置,指定維度和指標,就能夠完成數據的攝入;從上面描述的 Druid 特性中咱們知道,Druid 支持 SQL,應用 APP 能夠像使用普通 JDBC 同樣來查詢數據。經過有贊自研OLAP平臺的幫助,數據的攝取配置變得更加簡單方便,一個實時任務建立僅僅須要10來分鐘,大大的提升了開發效率。git
Druid 的架構是 Lambda 架構,分紅實時層( Overlord、 MiddleManager )和批處理層( Broker 和 Historical )。主要的節點包括(PS: Druid 的全部功能都在同一個軟件包中,經過不一樣的命令啓動):github
4.1 有贊 OLAP 平臺的主要目標:apache
4.2 有贊 OLAP 平臺架構安全
有贊 OLAP 平臺是用來管理 Druid 和周圍組件管理系統,OLAP平臺主要的功能:bash
OLAP 平臺採用的數據攝取方式是 Tranquility工具,根據流量大小對每一個 DataSource 分配不一樣 Tranquility 實例數量; DataSource 的配置會被推送到 Agent-Master 上,Agent-Master 會收集每臺服務器的資源使用狀況,選擇資源豐富的機器啓動 Tranquility 實例,目前只要考慮服務器的內存資源。同時 OLAP 平臺還支持 Tranquility 實例的啓停,擴容和縮容等功能。服務器
流式數據處理框架都會有時間窗口,遲於窗口期到達的數據會被丟棄。如何保證遲到的數據能被構建到 Segment 中,又避免實時任務窗口長期不能關閉。咱們研發了 Druid 數據補償功能,經過 OLAP 平臺配置流式 ETL 將原始的數據存儲在 HDFS 上,基於 Flume 的流式 ETL 能夠保證按照 Event 的時間,同一小時的數據都在同一個文件路徑下。再經過 OLAP 平臺手動或者自動觸發 Hadoop-Batch 任務,從離線構建 Segment。網絡
基於 Flume 的 ETL 採用了 HDFS Sink 同步數據,實現了 Timestamp 的 Interceptor,按照 Event 的時間戳字段來建立文件(每小時建立一個文件夾),延遲的數據能正確歸檔到相應小時的文件中。架構
隨着接入的業務增長和長期的運行時間,數據規模也愈來愈大。Historical 節點加載了大量 Segment 數據,觀察發現大部分查詢都集中在最近幾天,換句話說最近幾天的熱數據很容易被查詢到,所以數據冷熱分離對提升查詢效率很重要。Druid 提供了Historical 的 Tier 分組機制與數據加載 Rule 機制,經過配置能很好的將數據進行冷熱分離。 首先將 Historical 羣進行分組,默認的分組是"_default_tier",規劃少許的 Historical 節點,使用 SATA 盤;把大量的 Historical 節點規劃到 "hot" 分組,使用 SSD 盤。而後爲每一個 DataSource 配置加載 Rule :
{"type":"loadByPeriod","tieredReplicants":{"hot":1}, "period":"P30D"}
{"type":"loadByPeriod","tieredReplicants":{"_default_tier":2}, "period":"P6M"}
{"type":"dropForever"}
複製代碼
提升 "hot"分組集羣的 druid.server.priority 值(默認是0),熱數據的查詢都會落到 "hot" 分組。
Druid 架構中的各個組件都有很好的容錯性,單點故障時集羣依然能對外提供服務:Coordinator 和 Overlord 有 HA 保障;Segment 是多副本存儲在HDFS/S3上;同時 Historical 加載的 Segment 和 Peon 節點攝取的實時部分數據能夠設置多副本提供服務。同時爲了能在節點/集羣進入不良狀態或者達到容量極限時,儘快的發出報警信息。和其餘的大數據框架同樣,咱們也對 Druid 作了詳細的監控和報警項,分紅了2個級別:
Historical 集羣的部署和4.4節中描述的數據冷熱分離相對應,用 SSD 集羣存儲最近的N天的熱數據(可調節 Load 的天數),用相對廉價的 Sata 機型存儲更長時間的歷史冷數據,同時充分利用 Sata 的 IO 能力,把 Segment Load到不一樣磁盤上;在有贊有不少的收費業務,咱們在硬件層面作隔離,保證這些業務在查詢端有足夠的資源;在接入層,使用 Router 作路由,避免了 Broker 單點問題,也能很大的程度集羣查詢吞吐量;在 MiddleManager 集羣,除了部署有 Index 任務(內存型任務)外,咱們還混合部署了部分流量高 Tranquility 任務(CPU型任務),提升了 MiddleManager 集羣的資源利用率。
在有贊業務查詢方式通常是 SQL On Broker/Router,咱們發現一旦有少許慢查詢的狀況,客戶端會出現查詢不響應的狀況,並且鏈接愈來愈難獲取到。登陸到Broker 的服務端後發現,可用鏈接數量急劇減小至被耗盡,同時出現了大量的 TCP Close_Wait。用 jstack 工具排查以後發現有 deadlock 的狀況,具體的 Stack 請查看 ISSUE-6867。
通過源碼排查以後發現,DruidConnection爲每一個 Statement 註冊了回調。在正常的狀況下 Statement 結束以後,執行回調函數從 DruidConnection 的 statements 中 remove 掉本身的狀態;若是有慢查詢的狀況(超過最長鏈接時間或者來自客戶端的Kill),connection 會被強制關閉,同時關閉其下的全部 statements ,2個線程(關閉connection的線程和正在退出 statement 的線程)各自擁有一把鎖,等待對方釋放鎖,就會產生死鎖現象,鏈接就會被立刻耗盡。
// statement線程退出時執行的回調函數
final DruidStatement statement = new DruidStatement(
connectionId,
statementId,
ImmutableSortedMap.copyOf(sanitizedContext),
() -> {
// onClose function for the statement
synchronized (statements) {
log.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
statements.remove(statementId);
}
}
);
複製代碼
// 超過最長鏈接時間的自動kill
return connection.sync(
exec.schedule(
() -> {
log.debug("Connection[%s] timed out.", connectionId);
closeConnection(new ConnectionHandle(connectionId));
},
new Interval(DateTimes.nowUtc(), config.getConnectionIdleTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
)
);
複製代碼
在排查清楚問題以後,咱們也向社區提了 PR-6868 。目前已經成功合併到 Master 分支中,將會 0.14.0 版本中發佈。若是讀者們也遇到這個問題,能夠直接把該PR cherry-pick 到本身的分支中進行修復。
目前比較經常使用的數據攝取方案是:KafkaIndex 和 Tranquility 。咱們採用的是 Tranquility 的方案,目前 Tranquility 支持了 Kafka 和 Http 方式攝取數據,攝取方式並不豐富;Tranquility 也是 MetaMarket 公司開源的項目,更新速度比較緩慢,很多功能缺失,最關鍵的是監控功能缺失,咱們不能監控到實例的運行狀態,攝取速率、積壓、丟失等信息。 目前咱們對 Tranquility 的實例管理支持啓停,擴容縮容等操做,實現的方式和 Druid 的 MiddleManager 管理 Peon 節點是同樣的。把 Tranquility 或者自研攝取工具轉換成 Yarn 應用或者 Docker 應用,就能把資源調度和實例管理交給更可靠的調度器來作。
Druid 目前並不沒有支持 JOIN查詢,全部的聚合查詢都被限制在單 DataSource 內進行。可是實際的使用場景中,咱們常常須要幾個 DataSource 作 JOIN 查詢才能獲得所需的結果。這是咱們面臨的難題,也是 Druid 開發團隊遇到的難題。
對於 C 端的 OLAP 查詢場景,RT 要求比較高。因爲 Druid 會在整點建立當前小時的 Index 任務,若是查詢正好落到新建的 Index 任務上,查詢的毛刺很大,以下圖所示:
咱們已經進行了一些優化和調整,首先調整 warmingPeriod 參數,整點前啓動 Druid 的 Index 任務;對於一些 TPS 低,可是 QPS 很高的 DataSource ,調大 SegmentGranularity,大部分 Query 都是查詢最近24小時的數據,保證查詢的數據都在內存中,減小新建 Index 任務的,查詢毛刺有了很大的改善。儘管如此,離咱們想要的目標仍是必定的差距,接下去咱們去優化一下源碼。
如今大部分 DataSource 的 Segment 粒度( SegmentGranularity )都是小時級的,存儲在 HDFS 上就是每小時一個Segment。當須要查詢時間跨度比較大的時候,會致使Query很慢,佔用大量的 Historical 資源,甚至出現 Broker OOM 的狀況。若是建立一個 Hadoop-Batch 任務,把一週前(舉例)的數據按照天粒度 Rull-Up 而且 從新構建 Index,應該會在壓縮存儲和提高查詢性能方面有很好的效果。關於歷史數據 Rull-Up 咱們已經處於實踐階段了,以後會專門博文來介紹。
最後打個小廣告,有贊大數據團隊基礎設施團隊,主要負責有讚的數據平臺 (DP), 實時計算 (Storm, Spark Streaming, Flink),離線計算 (HDFS, YARN, HIVE, SPARK SQL),在線存儲(HBase),實時 OLAP (Druid) 等數個技術產品,歡迎感興趣的小夥伴聯繫 zhaojiandong@youzan.com