基於ClickHouse造實時計算引擎,百億數據秒級響應!

前言

爲了可以實時地瞭解線上業務數據,京東算法智能應用部打造了一款基於ClickHouse的實時計算分析引擎,給業務團隊提供實時數據支持,並經過預警功能發現潛在的問題。java

本文結合了引擎開發過程當中對資源位數據進行聚合計算業務場景,對數據實時聚合計算實現秒級查詢的技術方案進行概述。ClickHouse是整個引擎的基礎,故下文首先介紹了ClickHouse的相關特性和適合的業務場景,以及最基礎的表引擎MergeTree。接下來詳細的講述了技術方案,包括Kafka數據消費到數據寫入、結合ClickHouse特性建表、完整的數據監控,以及從幾十億數據就偶現查詢超時到幾百億數據也能秒級響應的優化過程。nginx

ClickHouse

  • ClickHouse是Yandex公司內部業務驅動產出的列式存儲數據庫。爲了更好地幫助自身及用戶分析網絡流量,開發了ClickHouse用於在線流量分析,一步一步最終造成了如今的ClickHouse。在存儲數據達到20萬億行的狀況下,也能作到90%的查詢可以在1秒內返回結果。算法

  • ClickHouse可以實現實時聚合,一切查詢都是動態、實時的,用戶發起查詢的那一刻起,整個過程須要能作到在一秒內完成並返回結果。ClickHouse的實時聚合能力和咱們面對的業務場景很是符合。數據庫

  • ClickHouse支持完整的DBMS。支持動態建立、修改或刪除數據庫、表和視圖,能夠動態查詢、插入、修改或刪除數據。緩存

  • ClickHouse採用列式存儲,數據按列進行組織,屬於同一列的數據會被保存在一塊兒,這是後續實現秒級查詢的基礎。服務器

列式存儲可以減小數據掃描範圍,數據按列組織,數據庫能夠直接獲取查詢字段的數據。而按行存逐行掃描,獲取每行數據的全部字段,再從每一行數據中返回須要的字段,雖然只須要部分字段仍是掃描了全部的字段,按列存儲避免了多餘的數據掃描。微信

另外列式存儲壓縮率高,數據在網絡中傳輸更快,對網絡帶寬和磁盤IO的壓力更小。網絡

除了完整的DBMS、列式存儲外,還支持在線實時查詢、擁有完善的SQL支持和函數、擁有多樣化的表引擎知足各種業務場景。架構

正由於ClickHouse的這些特性,在它適合的場景下可以實現動態、實時的秒級別查詢。app

適合的場景

讀多於寫。數據一次寫入,屢次查詢,從各個角度對數據進行挖掘,發現數據的價值。

大寬表,讀大量行聚合少許列。選擇少許的維度列和指標列,對大寬表的數據作聚合計算,得出少許的結果集。

數據批量寫入,不須要常常更新、刪除。數據寫入完成後,相關業務不要求常常對數據更新或刪除,主要用於查詢分析數據的價值。

ClickHouse適合用於商業智能領域,普遍應用於廣告流量、App流量、物聯網等衆多領域。藉助ClickHouse能夠實時計算線上業務數據,如資源位的點擊狀況,以及並對各資源位進行bi預警。

MergeTree

MergeTree系列引擎是最基礎的表引擎,提供了主鍵索引、數據分區等基本能力。瞭解這部份內容,是後續開發和優化的基礎和方向。

分區

指定表數據分區方式,支持多個列,但單個列分區查詢效果最好。有數據寫入時屬於同一分區的數據最終會被合併到同一個分區目錄,不一樣分區的數據永遠不會被合併在一塊兒。結合業務場景設置合理的分區能夠減小查詢時數據文件的掃描範圍。

排序

在一個數據片斷內,數據以何種方式排序。當使用多個字段排序時ORDER BY(T1,T2),先按照T1排序,相同值再按照T2排序。

MergeTree存儲結構

一張數據表的完整物理結構依次是數據表、分區以及各分區下具體的數據文件。分區下具體的數據文件包括一級索引、每列壓縮文件、每列字段標記文件,瞭解他們的存儲和查詢原理,爲後面建表、聚合計算的優化提供方向。

  • 一級索引文件,存放稀疏索引,經過ORDER BY或PRIMARY KEY聲明,使用少許的索引可以記錄大量數據的區間位置信息,內容生成規則跟排序字段有關,且索引數據常駐內存,取用速度快。藉助稀疏索引,能夠排除主鍵範圍外的數據文件,從而有效減小數據掃描範圍,加速查詢速度;

  • 每列壓縮數據文件,存儲每一列的數據,每一列字段都有獨立的數據文件;

  • 每列字段標記文件,每一列都有對應的標記文件,保存了列壓縮文件中數據的偏移量信息,與稀疏索引對齊,又與壓縮文件對應,創建了稀疏索引與數據文件的映射關係。不能常駐內存,使用LRU緩存策略加快其取用速度。

在讀取數據時,需經過標記數據的位置信息纔可以找到所須要的數據,分爲讀取壓縮數據塊和讀取數據塊兩個步驟。

掌握數據存儲和查詢的過程,後續建表和查詢有理論支持。

1)數據寫入

每批數據的寫入,都會生成一個新的分區目錄,後續會異步的將相同分區的目錄進行合併。按照索引粒度,會分別生成一級索引文件、每一個字段的標記和壓縮數據文件。寫入過程以下圖:

【數據寫入】

2)查詢過程

查詢過程經過指定WHERE條件,不斷縮小數據範圍。藉助分區能找到數據所在的數據塊,一級索引查找具體的行數區間信息,從標記文件中獲取數據壓縮文件中的壓縮文件信息。查詢過程以下圖:

【數據查詢】

查詢語句若是沒有匹配到任務索引,會掃描全部分區目錄,這種操做給整個集羣形成較大壓力。

引用官方文檔中的例子對查詢過程進行說明。以(CounterID, Date) 爲主鍵,排序好的索引的圖示會是下面這樣:

【索引圖式】
  • 指定查詢以下:

  • CounterID in ('a', 'h'),服務器會讀取標記號在[0, 3)和[6, 8) 區間中的數據。

  • CounterID IN ('a', 'h') AND Date = 3,服務器會讀取標記號在[1, 3)和[7, 8)區間中的數據。

  • Date = 3,服務器會讀取標記號在[1, 10]區間中的數據。

ClickHouse支持集羣部署,在查詢分佈式表時,集羣會將每一個節點的數據進行合併,獲得全部節點的數據後返回結果。MergeTree系列表引擎支持副本,如ReplicatedMergeTree表引擎建表存放明細數據,接下來介紹的兩種表引擎都繼承自MergeTree,但又有各自的特殊功能。

  • ReplacingMergeTree實現數據去重

在建表時設置ORDER BY排序字段做爲判斷重複數據的惟一鍵,在合併分區的時候會觸發刪除重複數據,可以必定程度上解決數據重複的問題。

  • AggregatingMergeTree

在合併分區的時候按照定義的條件聚合數據,將須要聚合的數據預先計算出來,在聚合查詢時直接使用結果數據,以空間換時間的方法提升查詢性能。該引擎須要使用AggregateFunction類型來處理全部列。

瞭解了ClickHouse相關內容後,接下來將介紹完整的技術方案。

技術方案及查詢優化

資源位的數據來源包括Kafka的實時數據和hdfs裏面存儲的離線數據。實時數據經過Flink實時任務寫入ClickHouse,離線數據經過創建MapReduce定時任務寫入ClickHouse。

架構圖

實時數據入庫

實時數據從實時數據到寫入CK過程:

  • 各業務線產生的實時數據寫入kafka通道,根據數據量分配不一樣的分區個數。

  • 建立的flink任務對各個業務的kafka數據進行消費,每一個業務處理過程會有所不一樣。通常包括過濾算子、數據加工算子、寫入算子。

過濾算子,過濾掉不須要的數據,這個步驟很是重要,設置嚴格的數據評估標準,防止髒數據、不符合規則的數據寫入集羣。另外對髒數據的過濾要作好記錄,在數據完整性測試過程當中會用到。

數據加工算子,主要負責從實時數據流中解析出業務須要的數據,這個過程也要設置嚴格的校驗邏輯,保證數據整潔;若涉及數據加工邏輯更新,要保證加工邏輯及時更新。

寫入算子,採用批量寫入方式,根據集羣狀況,設置合理的批次,實時查詢和寫入性能達到均衡。

寫入ck過程能夠經過域名鏈接分佈式表,也能夠經過nginx進程掌握一份集羣機器IP列表,每一個nginx進程本身輪詢,均衡寫入集羣的每臺機器,但須要保證寫入ClickHouse的QPS不能過小,防止出現寫入不均衡狀況。

離線數據入庫

  • 離線數據創建定時任務,將hive表中的數據加工好,經過創建MapReduce定時任務,將加工後的數據寫入ClickHouse。

  • 離線數據入庫過程一樣包括過濾、數據加工、寫入ClickHouse過程。

批量寫入

在前面merge章節有介紹,每次數據寫入都會產生臨時分區目錄,後續會異步的將相同分區的目錄進行合併。寫入過程會消耗集羣的資源,因此必定採用批量寫入方式,每批次寫入條數看集羣和數據狀況(1萬、5萬、10萬每批次可做爲參考)。採用JDBC方式實現批量寫入程序以下:

JDBC驅動,可使用官方提供的驅動程序:

<dependency> 
        <groupId>ru.yandex.clickhouse</groupId> 
        <artifactId>clickhouse-jdbc</artifactId> 
        <version>0.2.4</version> 
    </dependency>

初始化Connection:

Class.forName(Ck.DRIVER); 
 Connection connection = DriverManager.getConnection(Ck.URL, Ck.USERNAME, Ck.PASSWORD); 
   connection.setAutoCommit(false);

批量寫入:

PreparedStatement state = null; 
   try { 
               state = connection.prepareStatement(INSERT_SQL); 
               for(控制寫入批次) 
               { 
                   state.set...(index, value); 
                   state.addBatch(); 
               } 
               state.executeBatch(); 
               connection.commit(); 
           }catch (SQLException e) {

建表

在開始建表前,對業務進行充分理解,瞭解集羣數據的查詢場景,在建表時規劃好分區字段和排序規則,這個過程很是重要,是集羣查詢性能良好的基礎。

例如咱們面臨的業務場景爲,計算移動App每一個點擊按鈕聚合PV和UV(須要去重),按天或者小時聚合計算,還有商品各類屬性聚合計算的PV和UV。

選擇分區字段。正如前面MergeTree章節介紹,ClickHouse支持分區,分區字段是每張表整個數據目錄最外層結構,能夠很大程度加快查詢速度。

另外分區字段不易過多,分區過多就意味着數據目錄更加複雜,在進行聚合計算時,確定會影響整個集羣的查詢性能。目前咱們遇到的業務場景,適合以時間字段(時分秒)來做爲分區字段,toYYYYMMDD(ts)。

設置排序規則。數據會按照設置的排序字段前後順序來進行存儲,在進行聚合計算時也會按照聚合條件對相鄰數據進行計算,但若是聚合條件不在排序字段裏,集羣會對當前分區的全部數據掃描一遍,這種查詢就會慢不少,大量消耗集羣的內存、CPU資源。咱們應該避免這種狀況出現,設置合理的排序規則才能以最快的速度聚合出咱們想要的結果。

當前業務場景下,咱們能夠選擇表明各個按鈕的id和商品的屬性做爲排序字段。在進行聚合查詢時,where條件下選擇分區,排序規則卡出來須要的數據,可以很大程度提升查詢速度。

因此在建表階段就要充分了解將來的查詢場景,選擇合適的分區字段和排序規則。

另外,建表時候最重要的是選擇合適的表引擎,每種表引擎的使命都不一樣,根據自身業務選擇出最合適表引擎。當前業務場景咱們能夠選擇ReplicatedMergeTree引擎存明細數據。

建表實例:

CREATE TABLE table_name 
    ( 
        Event_ts DateTime, 
        T1 String, 
        T2 UInt32, 
        T3 String 
    ) ENGINE = ReplicatedMergeTree('/clickhouse/ck.test/tables/{layer}-{shard}/table_name', '{replica}') 
    PARTITION BY toYYYYMM(Event_ts) 
    ORDER BY (T1, T2)

進行到這裏,完成了建表和數據寫入,集羣的查詢速度通常仍是能夠的,在集羣硬件還不差的狀況下知足每次10幾億的數據的聚合查詢沒有問題,固然前提是是選擇了分區和卡排序字段的基礎上。

但數據再進一步多到百億甚至近千億數據,只是簡單的設置分區和優化排序字段是很難作到實時秒級查詢了。

查詢優化

雖然在查詢時卡了分區和排序條件,但隨着存儲的數據量增多,ClickHouse集羣的查詢壓力會逐漸增長,出現查詢速度慢狀況。若是有大SQL請求發給了集羣,會形成整個集羣的CPU和內存升高,直到把整個集羣內存打滿,集羣基本會處於癱瘓狀態。對查詢進行優化很是重要。

排查耗時SQL。耗時的SQL對整個集羣形成很大的壓力,要先找到解決耗時SQL的優化方案。當前業務場景下,能很容易發現聚合計算UV(去重)是比較消耗集羣資源的。

對於聚合結果的場景,咱們屢次嘗試優化方案後,經過創建物化視圖,以空間換取時間,大部分聚合查詢速度能提升10幾倍。創建物化視圖一樣要先去了解業務場景,選擇分區字段、ORDER BY字段,並選擇count、sum、uniq等聚合函數。

物化視圖建表語句:

CREATE MATERIALIZED VIEW test_db.app_hp_btn_event_test ON CLUSTER test_cluster ENGINE = ReplicatedAggregatingMergeTree( '/clickhouse/ck.test/tables/{layer}-{shard}/test_db/app_hp_btn_event_test', '{replica}') PARTITION BY toYYYYMMDD(time) ORDER BY(btn_id,cate2) TTL time + toIntervalDay(3) SETTINGS index_granularity = 8192 
    AS 
        SELECT 
            toStartOfHour(event_time) AS time, 
            btn_id, 
            countState(uid) PV, 
            uniqState(uid) AS UV 
        FROM 
            test_db.app_hp_btn_event_test 
        GROUP BY 
            btn_id, 
            toStartOfHour(event_time)

查詢實例:

hour from test_db.app_hp_btn_event_test where toYYYYMMDD(time) = 20200608 group by hour

避免明細數據join。ClickHouse更適合大寬表數據聚合查詢,對於明細數據join的場景儘可能避免出現。

集羣硬件升級。軟件的優化老是有限的,觀察集羣的CPU、內存、硬盤狀況,集羣的平常CPU、內存較高時,及時升級機器。

數據監控報警

完善的監控體系讓咱們及時得知引擎異常,同時也能時刻觀測數據寫入查詢狀況,掌握整個引擎的運行狀況。

  • 數據從消費到寫入各個階段異常信息。主要包括java.lang.NullPointerException、java.lang.ArrayIndexOutOfBoundsException等異常信息,大部分是由於數據源有所調整引發;

  • 各個階段添加報警功能,Kafka添加積壓報警、核心算子計算邏輯添加異常報警、ck集羣在mdc系統添加硬盤、cpu、內存預警;

  • Grafana查詢系統。主要包括CPU、內存、硬盤使用狀況;

  • 大SQL監控。查詢耗時SQL和沒有卡分區和排序字段的查詢。

最後

ClickHouse自身有處理萬億數據的能力。在掌握了它的存儲、查詢、MergeTree原理後,建立符合業務要求的數據庫表,執行符合ClickHouse特性的查詢SQL,實現1000億數據的秒級聚合查詢並非難事。

ClickHouse還有不少特性,須要在開發過程當中不斷地摸索和嘗試。

【編輯推薦】

  1. 8月份Github上熱門的Python開源項目

  2. 學會Python後都能作什麼?網友們的回答簡直不要太厲害

  3. 詳解 Python 的二元算術運算,爲何說減法只是語法糖?

  4. Python機器學習教程

  5. 微信羣總有人發廣告?用Python寫一個自動化機器人消滅他

【責任編輯:未麗燕 TEL:(010)68476606】

相關文章
相關標籤/搜索