360 政企安全集團基於 Flink 的 PB 級數據即席查詢實踐

簡介:Threat Hunting 平臺的架構與設計,及以下降 IO 爲目標的優化與探索。爲何以及如何使用塊索引。前端

本文整理自 360 政企安全集團的大數據工程師蘇軍以及劉佳在 Flink Forward Asia 2020 分享的議題《基於 Flink 的 PB 級數據即席查詢實踐》,文章內容爲:git

  1. Threat Hunting 平臺的架構與設計(蘇軍)
  2. 以下降 IO 爲目標的優化與探索(劉佳)
  3. 將來規劃

GitHub 地址
https://github.com/apache/flink
歡迎你們給 Flink 點贊送 star~github

 title=

首先作一個簡單的我的以及團隊介紹。咱們來自 360 政企安全集團,目前主要從事 360 安全大腦的 「威脅狩獵「 項目的開發工做。咱們團隊接觸 Flink 的時間比較早,在此期間,咱們基於 Flink 開發出了多款產品,並在 2017 年和 2019 年參加了於柏林舉辦的 Flink Forward 大會,分別介紹了咱們的 「UEBA」 以及 「AutoML」 兩款產品。算法

 title=

本次分享主要分爲兩塊內容:數據庫

  • 第一部分 「Threat Hunting 平臺的架構與設計」 將由蘇軍來爲你們分享;
  • 第二部分 「以下降 IO 爲目標的優化與探索」 將由劉佳來爲你們分享。

1、Threat Hunting 平臺的架構與設計 (蘇軍)

第一部份內容大體分爲三個部分,分別是:apache

  • 平臺的演進
  • 架構設計
  • 深刻探索索引結構

1. 平臺的演進

 title=

咱們認爲全部技術的演化和革新都須要具體的商業問題來驅動,如下是咱們團隊近幾年基於 Flink 開發的幾款產品:segmentfault

  • 2017 年咱們基於 Flink DataStream 開發了用戶行爲分析系統 UEBA,它是經過接入企業 IT 拓撲的各種行爲數據,好比身份認證數據、應用系統訪問數據、終端安全數據、網絡流量解析數據等等,以用戶 / 資產爲核心來進行威脅行爲的實時檢測,最後構建出用戶威脅等級和畫像的系統;
  • 2018 年基於 UEBA 的實施經驗,咱們發現安全分析人員每每須要一種手段來獲取安全事件對應的原始日誌,去進一步確認安全威脅的源頭和解決方式。因而咱們基於 Spark 開發了 HQL 來解決在離線模式下的數據檢索問題,其中 HQL 能夠認爲是表達能力比 SQL 更加豐富的查詢語言,大體能夠看做是在 SQL 能力的基礎上增長了算法類算;
  • 2019 年隨着離線 HQL 在客戶那邊的使用,咱們發現其自己就可以快速定義安全規則,構建威脅模型,若是在離線模式下寫完語句後直接發佈成在線任務,會大大縮短開發週期,加上 Flink SQL 能力相對完善,因而咱們基於 Flink SQL + CEP 來升級了 HQL 的能力,產生了 HQL RealTime 版本;
  • 2020 年隨着客戶數據量的增大,不少已經達到了 PB 級,過往的解決方案致使離線的數據檢索性能遠遠低於預期,安全分析人員習慣使用 like 和全文檢索等模糊匹配操做,形成查詢延時很是大。因而從今年開始,咱們着重優化 HQL 的離線檢索能力,並推出了全新的 Threat Hunting 平臺。

 title=

經過調查發現,擁有 PB 級數據規模的客戶每每有如下幾個商業需求:api

  • 第一是低成本的雲原生架構。咱們知道目前大部分的大數據架構都是基於 hadoop 的,其特色是數據就在計算節點上,可以減小大量網絡開銷,加速計算性能。可是整個集羣爲了作到資源均衡,每每須要相同的資源配置,且爲了可以存儲儘可能多的數據,集羣規模會很大, 因此這類架構在前期須要投入大量硬件成本。緩存

    而存算分離和彈性計算則可以解決這一問題,由於磁盤的價格是遠低於內存和 CPU 的,因此用廉價的磁盤存儲搭配低配 CPU 和內存來存儲數據,用少許高配機器來作計算,能夠在很大程度上下降成本。安全

  • 第二是低延時的查詢響應。安全分析人員在作威脅檢測時,大部分時間是即席查詢,即經過過濾、join 來作數據的檢索和關聯。爲了可以儘快的獲取查詢結果,對應的技術方案是:列存/索引/緩存。

    • 列存不用多說了,是大數據領域常見的存儲方案;
    • 在列存的基礎上,高效的索引方案可以大量下降 io,提升查詢性能;
    • 而存算分析帶來的網絡延時能夠由分佈式緩存來彌補。
  • 第三是須要豐富的查詢能力,其中包括單行的 fields/filter/udf 等,多行的聚合 /join,甚至算法類的分析能力,這部分咱們主要依賴於本身開發的分析語言 HQL 來提供。

2. 架構設計

 title=

首先,數據是來自於已經存儲在 ES 中的歷史數據和 kafka 裏的實時數據,其中 ES 裏的歷史數據咱們經過本身開發的同步工具來同步,kafka 裏的實時數據咱們則經過 Streaming File Sink 寫 orc 文件到存儲集羣。在數據同步的同時,咱們會將這批數據的索引信息更新到數據庫中。

安全分析人員會從前端頁面經過寫交互式分析語言 HQL 發起數據檢索的請求,此時請求會進入調度系統,一旦開始執行做業,首先會將分析語句解析成算子列表,算子緩存算法會判斷該次查詢是否能夠命中緩存系統中已有的緩存數據。

  • 若是分析語句的輸入是已經算好而且 cache 好了的中間結果,那麼直接讀取緩存來繼續計算;
  • 若是不能命中,證實咱們必須從 orc 文件開始從新計算。

咱們會先提取出查詢語言的過濾條件或者是 Join 條件來作謂詞下推,進入索引數據庫中得到目前符合該查詢的文件列表,隨後將文件列表交給計算引擎來進行計算。計算引擎咱們採用雙引擎模式,其中複雜度高的語句咱們經過 Flink 引擎來完成,其它較爲簡單的任務咱們交給平臺內部的 「蜂鳥引擎」。「蜂鳥引擎」 基於 Apache arrow 作向量化執行,加上 LLVM 編譯,查詢延遲會很是小。

因爲整個系統的存算分離,爲了加速數據讀取,咱們在計算集羣節點上增長了 alluxio 來提供數據緩存服務,其中不只緩存 remote cluster 上的數據,同時會緩存部分歷史做業結果,經過算子緩存的算法來加速下次計算任務。

這裏還須要強調兩點:

  • 第一點是索引數據庫會返回一批符合該條件的文件列表,若是文件列表很是大的話,當前的 Flink 版本在構建 job graph 時,在獲取 Filelist Statistics 邏輯這裏在遍歷大量文件的時候,會形成長時間沒法構建出 job graph 的問題。目前咱們對其進行了修復,後期會貢獻給社區。
  • 第二點是數據緩存那一塊,咱們的 HQL 以前是經過 Spark 來實現的。用過 Spark 的人可能知道,Spark 會把一個 table 來作 cache 或 persist。咱們在遷移到 Flink 的時候,也沿用了這個算子。Flink 這邊咱們本身實現了一套,就是用戶在 cache table 時,咱們會把它註冊成一個全新的 table source,後面在從新讀取的時候只會用這個新的 table source 來打通整個流程。

3. 深刻探索索引結構

 title=

數據庫爲了加速數據檢索,咱們每每會事先爲數據建立索引,再在掃描數據以前經過索引定位到數據的起始位置,從而加速數據檢索。而傳統數據庫常見的是行索引,經過一個或若干字段建立索引,索引結果以樹形結構存儲,此類索引可以精確到行級別,索引效率最高。

某些大數據項目也支持了行索引,而它所帶來的弊端就是大量的索引數據會形成寫入和檢索的延時。而咱們平臺處理的是機器數據,例如終端/網絡這類數據,它的特色是重複度很是高,而安全分析的結果每每很是少,極少數的威脅行爲會隱藏在海量數據裏,佔比每每會是 1/1000 甚至更少。

因此咱們選擇性價比更高的塊索引方案,已經可以支撐目前的應用場景。目前經過客戶數據來看, 索引可以爲 85% 的語句提供 90% 以上的裁剪率,基本知足延時要求。

 title=

某些大數據平臺是將索引數據以文件的形式存儲在磁盤上,外加一些 cache 機制來加速數據訪問,而咱們是將索引數據直接存在了數據庫中。主要有如下兩個方面的考慮:

  • 第一是 transaction。咱們知道列存文件每每是沒法 update 的,而咱們在按期優化文件分佈時會作 Merge File 操做,爲了保證查詢一致性,須要數據庫提供 transaction 能力。
  • 第二是性能。數據庫擁有較強的讀寫和檢索能力,甚至能夠將謂詞下推到數據庫來完成,數據庫的高壓縮比也能進一步節省存儲。

 title=

上圖爲塊索引的設計。在咱們的索引數據庫中,咱們把這些數據分爲不一樣類別數據源,好比終端數據爲一類數據源,網絡數據爲一類數據源,咱們分類數據源的邏輯是他們是否擁有統一的 Schema。就單個數據源來講,它以日期做爲 Partition,Partition 內部是大量的 ORC 小文件,具體到索引結構,咱們會爲每個字段建 min/max 索引,基數小於 0.001 的字段咱們建 Bloom 索引。

上文提到過,安全人員比較喜歡用 like 和全文檢索。對於 like 這一塊,咱們也作了一些優化。全文檢索方面,咱們會爲數據來作分詞,來構建倒排索引,同時也會對於單個分詞事後的單個 item 來作文件分佈層面的位圖索引。

 title=

上圖是一個索引大小的大體的比例假設,JSON 格式的原始日誌大有 50PB,轉化成 ORC 大概是 1PB 左右。咱們的 Index 數據是 508GB, 其中 8GB 爲 Min/Max 索引,500GB 爲 Bloom。加上上文提到的位圖以及倒排,這個索引數據的佔比會進一步加大。基於此,咱們採用的是分佈式的索引方案。

 title=

咱們知道日誌是在不斷的進行變化的,對於有的數據員來講,他有時會增長字段或者減小字段,甚至有時字段類型也會發生變化。

那麼咱們採起這種 Merge Schema 模式方案,在文件增量寫入的過程當中,也就是在更新這批數據的索引信息的同時來作 Schema Merge 的操做。如圖所示,在 block123 中,文件 3 是最後一個寫入的。隨着文件的不斷寫入,會組成一個全新的 Merge Schema。能夠看到 B 字段和 C 字段實際上是歷史字段,而 A\_V 字段是 A 字段的歷史版本字段,咱們用這種方式來儘可能多的讓客戶看到比較全的數據。最後基於本身開發的 Input format 加 Merge Schema 來構建一個新的 table source ,從而打通整個流程。

2、以下降 IO 爲目標的優化與探索 (劉佳)

上文介紹了爲何要選擇塊索引,那麼接下來將具體介紹如何使用塊索引。塊索引的核心能夠落在兩個字上:「裁剪」。裁剪就是在查詢語句被真正執行前就將無關的文件給過濾掉,儘量減小進入計算引擎的數據量,從數據源端進行節流。

 title=

這張圖展現了整個系統使用 IndexDB 來作裁剪流程:

  • 第一步是解析查詢語句。獲取到相關的 filter
,能夠看到最左邊的 SQL 語句中有兩個過濾條件, 分別是 src\_address = 某個 ip,occur\_time > 某個時間戳。
  • 第二步將查詢條件帶入 Index DB 對應數據源的 meta 表中去進行文件篩選
。src\_address 是字符串類型字段,它會聯合使用 min/max 和 bloom 索引進行裁剪。occur\_time 是數值類型字段而且是時間字段,咱們會優先查找 min/max 索引來進行文件裁剪。須要強調的是, 這裏咱們是將用戶寫的 filter 封裝成了 index db 的查詢條件,直接將 filter pushdown 到數據庫中完成。
  • 第三步在獲取到文件列表後,這些文件加上前面提到的 merged schema 會共同構形成一個 TableSource 來交給 Flink 進行後續計算。

同時,構建 source 的時候,咱們在細節上作了一些優化。好比在將 filter 傳給 ORC reader 的時候,清除掉已經 pushdown 了的 filter, 避免在引擎側進行二次過濾。固然, 這裏並非將全部 filter 都清除掉了,咱們保留了 like 表達式,關於 like 的 filter pushdown 會在後文介紹。

 title=

接下來着重介紹一下四大優化點:

  • 第一點,數據在未排序的狀況下,裁剪率是有理論上限的,咱們經過在數據寫入的時候使用 hilbert 曲線排序原始數據來提高裁剪率;
  • 第二點,由於安全領域的特殊性,作威脅檢測嚴重依賴 like 語法,因此咱們對 orc api 進行了加強,使其支持了 like 語法的下推;
  • 第三點,一樣是由於使用場景嚴重依賴 join,因此咱們對 join 操做也作了相應的優化;
  • 第四點,咱們的系統底層支持多種文件系統,因此咱們選取 Alluxio 這一成熟的雲原生數據編排系統來作數據緩存,提升數據的訪問局部性。

1. 裁剪率的理論上限及 Hilbert 空間填充曲線

 title=

裁剪能夠抽象成 N 個球扔進 M 個桶的機率問題,在這裏咱們直接說結論。假設行在塊中隨機均勻分佈,全部塊的總行數固定,查詢條件命中的總行數也固定,則塊命中率直接與 「命中的總行數 / 總塊數」 正相關。

結論有兩個:

  • 第一點,若是命中總行數 = 總塊數,即 X 軸值爲 1 的時候,命中率爲 2/3, 也就是 2/3 的塊,都包含命中的行,對應的塊修剪率的上限是 1/ 3。1/3 是一個很低數值,可是因爲它的前提是數據隨機均勻分佈,因此爲了讓數據分佈更好,咱們須要在數據寫入時對原始數據進行排序。
  • 第二點,假設命中總行數固定,那麼大幅度減小每塊中的行數來增長總塊數,也能提高塊修剪率。因此咱們縮小了塊大小。根據測試結果,咱們設定每一個文件的大小爲:16M。縮小文件大小是很簡單的。針對排序,咱們引入了 hilbert 空間填充曲線。

 title=

爲何使用 hilbert 曲線?主要是基於兩點:

  • 首先是,以什麼路徑遍歷 2 維空間,使路徑的地址序列對其中任一維度都基本有序?爲何要對每一列或者說子集都有序?由於系統在使用的過程當中,查詢條件是不固定的。數據寫入時排序用到了 5 個字段,查詢的時候可能只用到了其中的一個或兩個字段。Hilbert 排序能讓多個字段作到既總體有序,又局部有序。
  • 另外,空間填充曲線有不少,還有 Z 形曲線、蛇形曲線等等,你們能夠看看右邊這兩張對比圖。直觀的看,曲線路徑的長跨度跳躍越少越好,點的位置在迭代過程當中越穩定越好。 而 hilbert 曲線在空間填充曲線裏面綜合表現最好。

hilbert 用法,就是實現一個 UDF,輸入列值,輸出座標值,而後根據座標值排序。

 title=

咱們抽樣了客戶環境所使用的 1500 條 SQL 語句,過濾掉了其中裁剪率爲分之 100% 的相關語句,也就是沒有命中文件的無效語句。而後還剩下 1148 條,咱們使用這些語句作了裁剪率排序後,對裁剪率進行了對比,裁剪率 95 百分位從以前的 68% 提高到了 87%,提高了 19%。可能你們會以爲 19% 這個數值不是特別高,但若是咱們帶上一個基數,好比說 10 萬個文件,這樣看的話就會很可觀了。

2. 字典索引上 Like 的優化

 title=

以前也有講到安全行業的特殊性,咱們作威脅檢測的時候會嚴重依賴 like 查詢。鑑於此,咱們也對它作了優化。

  • 首先咱們爲 ORC api 添加了 like 條件表達式,保證 SQL 中的 like 能下推到 orc record reader 中。
  • 其次,重構了 orc record reader 的 row group filter 邏輯,若是發現是 like 表達式,首先讀取該字段的 dict steam,判斷 dict stream 是否包含 like 目標字符串,若是字典中不存在該值,直接跳過該 row group,不用讀取 data stream 和 length steam,能大幅提升文件讀取速度。後期咱們也考慮構建字典索引到索引數據庫中,直接將字典過濾 pushdown 到數據庫中完成。

例如圖上所示,最左邊的 SQL 中有三個表達式。前兩個在上文中已經提到了,是將 filter 直接 pushdown 到 index db 中完成,咱們交給 orc reader 的 filter 只有最後一個 attachment\_name like '%投標%',真正須要讀取的記錄只是 dict 包含 」投標「 的 row group,也就是作到了 row group 級別的過濾,進一步減小了須要進入計算引擎的數據量。

3. 基於索引對 join 的優化

 title=

威脅情報的匹配中大量使用 join 操做,若是要加速 join 的性能,僅僅是 where 條件的 filter pushdown 是遠遠不夠的。

Flink 中已經內置了許多 join 算法,好比 broadcast join, hash join 和 sort merge join。其中,sort merge join 對預先排好序的表 join 很是友好,而上文有提到咱們使用 Hilbert 曲線來對多字段進行聯合排序,因此 sort merge join 暫時不在咱們的優化範圍以內。

另外,咱們知道 join 的性能和左右表的大小正相關,而威脅情報 join 的稀疏度很是高,因此事先對左右表作裁剪,可以大幅減小進入 join 階段的數據。

上文提到過咱們已經爲常見字段創建了 bloom 索引。那麼利用這些已經建立好的 bloom,來進行文件預過濾,就變得瓜熟蒂落,而且省掉了構建 bloom 的時間開銷。

對於 broadcast join,咱們直接掃描小表,將小表記錄依次進入大表所屬文件的 bloom,判斷該數據塊是否須要, 對數據量大的表作預裁剪。

對於 hash join,正如咱們看到的,咱們能夠預先對 join key 的文件級 bloom 作 「預 join」 操做,具體就是將左表所屬的某個文件的 bloom 依次與右表所屬文件的 bloom 作 「與」 操做,只保留左右表能 」與後結果條數不爲 0「 的文件,再讓各表剩餘的文件進入引擎作後續計算。

 title=

好比說圖上的這三張表,分別是 table一、 table2 和 table3 。咱們能夠從 index DB 中獲取到表的統計信息,也就是文件個數或者說是文件表的大小。圖上就直接列的是文件個數:table 1 是 1000 個, 而後 table 2 是 5 萬個文件, table 3 是 3 萬個文件。

咱們就是參照上一張圖片裏面的邏輯進行預 join,而後預估 join 的成本。咱們會讓成本低的預 join 先進行,這樣的話就可以大幅度減小中間結果,提高 join 的效率。

4. Alluxio 做爲對象存儲的緩存

 title=

由於底層文件存儲系統的多種多樣,因此咱們選取了 Alluxio 數據編排系統,Alluxio 的優勢是讓數據更靠近計算框架,利用內存或者 SSD 多級緩存機制加速文件訪問,若是在徹底命中 cache 的狀況下,可以達到內存級 IO 的文件訪問速度,減小直接從底層文件系統讀文件的頻次,很大程度上緩解了底層文件系統的壓力。

對咱們系統來講就是它帶來了更高的併發,並且對低裁剪率的查詢更友好,由於低裁剪率的話就意味着須要讀取大量的文件。

若是這些文件在以前的查詢中已經被 load 到 cache 裏面,就可以大幅度的提高查詢速度。

 title=

在作完這些優化之後,咱們作了性能對比測試。咱們選取了一個規模爲 249TB 的 es 集羣。它使用了 20 臺服務器,Flink 使用了兩臺服務器,爲了在圖標上看到更直觀的對比效果,咱們選取了 16 條測試結果。

圖表上紅橙色的是 es,藍色的是 HQL 優化前,綠色的是 HQL 優化後。上面的數字標籤是與 es 相比,HQL 的性能差值。好比第一個標籤就意味着 HQL 的性能五倍於 es,其中 6 號和 7 號比 es 慢,主要是由於 HQL 是塊索引,es 是行索引,全在內存裏面,因此能夠作到超快的檢索速度。13 號是由於 HQL 在使用 not equal 的狀況下,裁剪率相對較差。

整體說,優化效果是很明顯的,大部分語句在與 es 查詢速度相比是持平甚至略優的。徹底知足客戶對長週期數據存儲和查詢的指望。

3、將來規劃

 title=

上圖是將來規劃。由於客戶現場常常會涉及到不少的 BI Dashboard 運算和長週期運算報告的需求,因此咱們下一步會考慮作 BI 預算,以及蘇軍提到的容器化和 JVM 預熱,固然還有對標 es,以及提高多用戶併發查詢的能力。


第一時間獲取最新技術文章和社區動態,請關注公衆號~

本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。
相關文章
相關標籤/搜索