簡介:Threat Hunting 平臺的架構與設計,及以下降 IO 爲目標的優化與探索。爲何以及如何使用塊索引。前端
本文整理自 360 政企安全集團的大數據工程師蘇軍以及劉佳在 Flink Forward Asia 2020 分享的議題《基於 Flink 的 PB 級數據即席查詢實踐》,文章內容爲:git
- Threat Hunting 平臺的架構與設計(蘇軍)
- 以下降 IO 爲目標的優化與探索(劉佳)
- 將來規劃
GitHub 地址
https://github.com/apache/flink
歡迎你們給 Flink 點贊送 star~github
首先作一個簡單的我的以及團隊介紹。咱們來自 360 政企安全集團,目前主要從事 360 安全大腦的 「威脅狩獵「 項目的開發工做。咱們團隊接觸 Flink 的時間比較早,在此期間,咱們基於 Flink 開發出了多款產品,並在 2017 年和 2019 年參加了於柏林舉辦的 Flink Forward 大會,分別介紹了咱們的 「UEBA」 以及 「AutoML」 兩款產品。算法
本次分享主要分爲兩塊內容:數據庫
第一部份內容大體分爲三個部分,分別是:apache
咱們認爲全部技術的演化和革新都須要具體的商業問題來驅動,如下是咱們團隊近幾年基於 Flink 開發的幾款產品:segmentfault
經過調查發現,擁有 PB 級數據規模的客戶每每有如下幾個商業需求:api
第一是低成本的雲原生架構。咱們知道目前大部分的大數據架構都是基於 hadoop 的,其特色是數據就在計算節點上,可以減小大量網絡開銷,加速計算性能。可是整個集羣爲了作到資源均衡,每每須要相同的資源配置,且爲了可以存儲儘可能多的數據,集羣規模會很大, 因此這類架構在前期須要投入大量硬件成本。緩存
而存算分離和彈性計算則可以解決這一問題,由於磁盤的價格是遠低於內存和 CPU 的,因此用廉價的磁盤存儲搭配低配 CPU 和內存來存儲數據,用少許高配機器來作計算,能夠在很大程度上下降成本。安全
第二是低延時的查詢響應。安全分析人員在作威脅檢測時,大部分時間是即席查詢,即經過過濾、join 來作數據的檢索和關聯。爲了可以儘快的獲取查詢結果,對應的技術方案是:列存/索引/緩存。
首先,數據是來自於已經存儲在 ES 中的歷史數據和 kafka 裏的實時數據,其中 ES 裏的歷史數據咱們經過本身開發的同步工具來同步,kafka 裏的實時數據咱們則經過 Streaming File Sink 寫 orc 文件到存儲集羣。在數據同步的同時,咱們會將這批數據的索引信息更新到數據庫中。
安全分析人員會從前端頁面經過寫交互式分析語言 HQL 發起數據檢索的請求,此時請求會進入調度系統,一旦開始執行做業,首先會將分析語句解析成算子列表,算子緩存算法會判斷該次查詢是否能夠命中緩存系統中已有的緩存數據。
咱們會先提取出查詢語言的過濾條件或者是 Join 條件來作謂詞下推,進入索引數據庫中得到目前符合該查詢的文件列表,隨後將文件列表交給計算引擎來進行計算。計算引擎咱們採用雙引擎模式,其中複雜度高的語句咱們經過 Flink 引擎來完成,其它較爲簡單的任務咱們交給平臺內部的 「蜂鳥引擎」。「蜂鳥引擎」 基於 Apache arrow 作向量化執行,加上 LLVM 編譯,查詢延遲會很是小。
因爲整個系統的存算分離,爲了加速數據讀取,咱們在計算集羣節點上增長了 alluxio 來提供數據緩存服務,其中不只緩存 remote cluster 上的數據,同時會緩存部分歷史做業結果,經過算子緩存的算法來加速下次計算任務。
這裏還須要強調兩點:
數據庫爲了加速數據檢索,咱們每每會事先爲數據建立索引,再在掃描數據以前經過索引定位到數據的起始位置,從而加速數據檢索。而傳統數據庫常見的是行索引,經過一個或若干字段建立索引,索引結果以樹形結構存儲,此類索引可以精確到行級別,索引效率最高。
某些大數據項目也支持了行索引,而它所帶來的弊端就是大量的索引數據會形成寫入和檢索的延時。而咱們平臺處理的是機器數據,例如終端/網絡這類數據,它的特色是重複度很是高,而安全分析的結果每每很是少,極少數的威脅行爲會隱藏在海量數據裏,佔比每每會是 1/1000 甚至更少。
因此咱們選擇性價比更高的塊索引方案,已經可以支撐目前的應用場景。目前經過客戶數據來看, 索引可以爲 85% 的語句提供 90% 以上的裁剪率,基本知足延時要求。
某些大數據平臺是將索引數據以文件的形式存儲在磁盤上,外加一些 cache 機制來加速數據訪問,而咱們是將索引數據直接存在了數據庫中。主要有如下兩個方面的考慮:
上圖爲塊索引的設計。在咱們的索引數據庫中,咱們把這些數據分爲不一樣類別數據源,好比終端數據爲一類數據源,網絡數據爲一類數據源,咱們分類數據源的邏輯是他們是否擁有統一的 Schema。就單個數據源來講,它以日期做爲 Partition,Partition 內部是大量的 ORC 小文件,具體到索引結構,咱們會爲每個字段建 min/max 索引,基數小於 0.001 的字段咱們建 Bloom 索引。
上文提到過,安全人員比較喜歡用 like 和全文檢索。對於 like 這一塊,咱們也作了一些優化。全文檢索方面,咱們會爲數據來作分詞,來構建倒排索引,同時也會對於單個分詞事後的單個 item 來作文件分佈層面的位圖索引。
上圖是一個索引大小的大體的比例假設,JSON 格式的原始日誌大有 50PB,轉化成 ORC 大概是 1PB 左右。咱們的 Index 數據是 508GB, 其中 8GB 爲 Min/Max 索引,500GB 爲 Bloom。加上上文提到的位圖以及倒排,這個索引數據的佔比會進一步加大。基於此,咱們採用的是分佈式的索引方案。
咱們知道日誌是在不斷的進行變化的,對於有的數據員來講,他有時會增長字段或者減小字段,甚至有時字段類型也會發生變化。
那麼咱們採起這種 Merge Schema 模式方案,在文件增量寫入的過程當中,也就是在更新這批數據的索引信息的同時來作 Schema Merge 的操做。如圖所示,在 block123 中,文件 3 是最後一個寫入的。隨着文件的不斷寫入,會組成一個全新的 Merge Schema。能夠看到 B 字段和 C 字段實際上是歷史字段,而 A\_V 字段是 A 字段的歷史版本字段,咱們用這種方式來儘可能多的讓客戶看到比較全的數據。最後基於本身開發的 Input format 加 Merge Schema 來構建一個新的 table source ,從而打通整個流程。
上文介紹了爲何要選擇塊索引,那麼接下來將具體介紹如何使用塊索引。塊索引的核心能夠落在兩個字上:「裁剪」。裁剪就是在查詢語句被真正執行前就將無關的文件給過濾掉,儘量減小進入計算引擎的數據量,從數據源端進行節流。
這張圖展現了整個系統使用 IndexDB 來作裁剪流程:
同時,構建 source 的時候,咱們在細節上作了一些優化。好比在將 filter 傳給 ORC reader 的時候,清除掉已經 pushdown 了的 filter, 避免在引擎側進行二次過濾。固然, 這裏並非將全部 filter 都清除掉了,咱們保留了 like 表達式,關於 like 的 filter pushdown 會在後文介紹。
接下來着重介紹一下四大優化點:
裁剪能夠抽象成 N 個球扔進 M 個桶的機率問題,在這裏咱們直接說結論。假設行在塊中隨機均勻分佈,全部塊的總行數固定,查詢條件命中的總行數也固定,則塊命中率直接與 「命中的總行數 / 總塊數」 正相關。
結論有兩個:
爲何使用 hilbert 曲線?主要是基於兩點:
hilbert 用法,就是實現一個 UDF,輸入列值,輸出座標值,而後根據座標值排序。
咱們抽樣了客戶環境所使用的 1500 條 SQL 語句,過濾掉了其中裁剪率爲分之 100% 的相關語句,也就是沒有命中文件的無效語句。而後還剩下 1148 條,咱們使用這些語句作了裁剪率排序後,對裁剪率進行了對比,裁剪率 95 百分位從以前的 68% 提高到了 87%,提高了 19%。可能你們會以爲 19% 這個數值不是特別高,但若是咱們帶上一個基數,好比說 10 萬個文件,這樣看的話就會很可觀了。
以前也有講到安全行業的特殊性,咱們作威脅檢測的時候會嚴重依賴 like 查詢。鑑於此,咱們也對它作了優化。
例如圖上所示,最左邊的 SQL 中有三個表達式。前兩個在上文中已經提到了,是將 filter 直接 pushdown 到 index db 中完成,咱們交給 orc reader 的 filter 只有最後一個 attachment\_name like '%投標%',真正須要讀取的記錄只是 dict 包含 」投標「 的 row group,也就是作到了 row group 級別的過濾,進一步減小了須要進入計算引擎的數據量。
威脅情報的匹配中大量使用 join 操做,若是要加速 join 的性能,僅僅是 where 條件的 filter pushdown 是遠遠不夠的。
Flink 中已經內置了許多 join 算法,好比 broadcast join, hash join 和 sort merge join。其中,sort merge join 對預先排好序的表 join 很是友好,而上文有提到咱們使用 Hilbert 曲線來對多字段進行聯合排序,因此 sort merge join 暫時不在咱們的優化範圍以內。
另外,咱們知道 join 的性能和左右表的大小正相關,而威脅情報 join 的稀疏度很是高,因此事先對左右表作裁剪,可以大幅減小進入 join 階段的數據。
上文提到過咱們已經爲常見字段創建了 bloom 索引。那麼利用這些已經建立好的 bloom,來進行文件預過濾,就變得瓜熟蒂落,而且省掉了構建 bloom 的時間開銷。
對於 broadcast join,咱們直接掃描小表,將小表記錄依次進入大表所屬文件的 bloom,判斷該數據塊是否須要, 對數據量大的表作預裁剪。
對於 hash join,正如咱們看到的,咱們能夠預先對 join key 的文件級 bloom 作 「預 join」 操做,具體就是將左表所屬的某個文件的 bloom 依次與右表所屬文件的 bloom 作 「與」 操做,只保留左右表能 」與後結果條數不爲 0「 的文件,再讓各表剩餘的文件進入引擎作後續計算。
好比說圖上的這三張表,分別是 table一、 table2 和 table3 。咱們能夠從 index DB 中獲取到表的統計信息,也就是文件個數或者說是文件表的大小。圖上就直接列的是文件個數:table 1 是 1000 個, 而後 table 2 是 5 萬個文件, table 3 是 3 萬個文件。
咱們就是參照上一張圖片裏面的邏輯進行預 join,而後預估 join 的成本。咱們會讓成本低的預 join 先進行,這樣的話就可以大幅度減小中間結果,提高 join 的效率。
由於底層文件存儲系統的多種多樣,因此咱們選取了 Alluxio 數據編排系統,Alluxio 的優勢是讓數據更靠近計算框架,利用內存或者 SSD 多級緩存機制加速文件訪問,若是在徹底命中 cache 的狀況下,可以達到內存級 IO 的文件訪問速度,減小直接從底層文件系統讀文件的頻次,很大程度上緩解了底層文件系統的壓力。
對咱們系統來講就是它帶來了更高的併發,並且對低裁剪率的查詢更友好,由於低裁剪率的話就意味着須要讀取大量的文件。
若是這些文件在以前的查詢中已經被 load 到 cache 裏面,就可以大幅度的提高查詢速度。
在作完這些優化之後,咱們作了性能對比測試。咱們選取了一個規模爲 249TB 的 es 集羣。它使用了 20 臺服務器,Flink 使用了兩臺服務器,爲了在圖標上看到更直觀的對比效果,咱們選取了 16 條測試結果。
圖表上紅橙色的是 es,藍色的是 HQL 優化前,綠色的是 HQL 優化後。上面的數字標籤是與 es 相比,HQL 的性能差值。好比第一個標籤就意味着 HQL 的性能五倍於 es,其中 6 號和 7 號比 es 慢,主要是由於 HQL 是塊索引,es 是行索引,全在內存裏面,因此能夠作到超快的檢索速度。13 號是由於 HQL 在使用 not equal 的狀況下,裁剪率相對較差。
整體說,優化效果是很明顯的,大部分語句在與 es 查詢速度相比是持平甚至略優的。徹底知足客戶對長週期數據存儲和查詢的指望。
上圖是將來規劃。由於客戶現場常常會涉及到不少的 BI Dashboard 運算和長週期運算報告的需求,因此咱們下一步會考慮作 BI 預算,以及蘇軍提到的容器化和 JVM 預熱,固然還有對標 es,以及提高多用戶併發查詢的能力。
第一時間獲取最新技術文章和社區動態,請關注公衆號~
本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。