做者蔡聰輝&鍾雲 html
隨着小米互聯網業務的發展,各個產品線利用用戶行爲數據對業務進行增加分析的需求愈來愈迫切。顯然,讓每一個業務產品線都本身搭建一套增加分析系統,不只成本高昂,也會致使效率低下。咱們但願能有一款產品可以幫助他們屏蔽底層複雜的技術細節,讓相關業務人員可以專一於本身的技術領域,從而提升工做效率。經過分析調查發現,小米已有的統計平臺沒法支持靈活的維度交叉查詢,數據查詢分析效率較低,複雜查詢須要依賴於研發人員,同時缺少根據用戶行爲高效的分羣工具,對於用戶的運營策略囿於設施薄弱而較爲粗放,運營效率較低和效果不佳。 基於上述需求和痛點,小米大數據和雲平臺合做開發了增加分析系統(Growing Analytics, 下面簡稱GA),旨在提供一個靈活的多維實時查詢和分析平臺,統一數據接入和查詢方案,幫助業務線作精細化運營。數據庫
如上圖所示,分析、決策、執行是一個循環迭代的過程,所以,增加分析查詢很是靈活,涉及分析的維度有幾十上百個,咱們沒法預先定義好全部要計算的結果,代價過高,因此這也就要求了全部的數據須要即時計算和分析。同時,決策具備時效性,所以數據從攝入到能夠查詢的時延不能過高。另外,業務發展迅速,須要增長新的分析維度,因此咱們須要可以支持schema的變動(主要是在線增長字段)。 在咱們的業務中,增加分析最經常使用的三個功能是事件分析(佔絕大多數)、留存分析和漏斗分析;這三個功能業務都要求針對實時入庫(只有append)的明細數據,可以即席選擇維度和條件(一般還要join業務畫像表或者圈選的人羣包),而後在秒級返回結果(業界相關的產品如神策、GrowingIO等都能達到這個性能)。一些只支持提早聚合的預計算引擎(如Kylin),雖然查詢性能優秀,但難以支持schema隨時變動,衆多的維度也會形成Cube存儲佔用失控,而Hive可以在功能上知足要求,可是性能上較差。 綜上,咱們須要存儲和計算明細數據,須要一套支持近實時數據攝取,可靈活修改schema和即席查詢的數據分析系統解決方案。json
3.1 初始架構緩存
GA立項於2018年年中,當時基於開發時間和成本,技術棧等因素的考慮,咱們複用了現有各類大數據基礎組件(HDFS, Kudu, SparkSQL等),搭建了一套基於Lamda架構的增加分析查詢系統。GA系統初代版本的架構以下圖所示:服務器
GA系統涵蓋了數據採集、數據清洗、數據查詢和BI報表展現等一整套流程。首先,咱們將從數據源收集到的數據進行統一的清洗,以統一的json格式寫入到Talos(注:小米自研的消息隊列)中。接着咱們使用Spark Streaming將數據轉儲到Kudu中。Kudu做爲一款優秀的OLAP存儲引擎,具備支持實時攝取數據和快速查詢的能力,因此這裏將Kudu做爲熱數據的存儲,HDFS做爲冷數據的存儲。爲了避免讓用戶感知到冷熱數據的實際存在,咱們使用了動態分區管理服務來管理表分區數據的遷移,按期將過時的熱數據轉化爲冷數據存儲到HDFS上,而且更新Kudu表和HDFS表的聯合視圖,當用戶使用SparkSQL服務查詢視圖時,計算引擎會根據查詢SQL自動路由,對Kudu表的數據和HDFS表的數據進行處理。 在當時的歷史背景下,初代版本的GA幫助咱們用戶解決了運營策略較爲粗放、運營效率較低的痛點,但同時也暴露了一些問題。首先是運維成本的問題,本來的設計是各個組件都使用公共集羣的資源,可是實踐過程當中發現執行查詢做業的過程當中,查詢性能容易受到公共集羣其餘做業的影響,容易抖動,尤爲在讀取HDFS公共集羣的數據時,有時較爲緩慢,所以GA集羣的存儲層和計算層的組件都是單獨搭建的。另外一個是性能的問題,SparkSQL是基於批處理系統設計的查詢引擎,在每一個Stage之間交換數據shuffle的過程當中依然須要落盤操做,完成SQL查詢的時延較高。爲了保證SQL查詢不受資源的影響,咱們經過添加機器來保證查詢性能,可是實踐過程當中發現,性能提高的空間有限,這套解決方案並不能充分地利用機器資源來達到高效查詢的目的,存在必定的資源浪費。所以,咱們但願有一套新的解決方案,可以提升查詢性能和下降咱們的運維成本。微信
3.2 從新選型網絡
MPP架構的SQL查詢引擎,如Impala,presto等可以高效地支持SQL查詢,可是仍然須要依賴Kudu, HDFS, Hive Metastore等組件, 運維成本依然比較高,同時,因爲計算存儲分離,查詢引擎不能很好地及時感知存儲層的數據變化,就沒法作更細緻的查詢優化,如想在SQL層作緩存就沒法保證查詢的結果是最新的。所以,咱們的目標是尋求一款計算存儲一體的MPP數據庫來替代咱們目前的存儲計算層的組件。咱們對這款MPP數據庫有以下要求:數據結構
3.3 性能測試架構
在配置大致相同計算資源的條件下,咱們選取了一個日均數據量約10億的業務,分別測試不一樣場景下(6個事件分析,3個留存分析,3個漏斗分析),不一樣時間範圍(一週到一個月)的SparkSQL和Doris的查詢性能。併發
如上圖測試結果,在增加分析的場景下,Doris查詢性能相比於SparkSQL+Kudu+HDFS方案具備明顯的提高,在事件分析場景下平均下降約85%左右的查詢時間,在留存和漏斗場景下平均下降約50%左右的查詢時間。對於咱們咱們業務大多數都是事件分析需求來說,這個性能提高很大。
4.1 Doris在增加分析平臺的使用狀況
隨着接入業務的增多,目前,咱們的增加分析集羣單集羣最大規模已經擴展到了近百臺,存量數據到了PB級別。其中,近實時的產品線做業有數十個,天天有幾百億條的數據入庫,每日有效的業務查詢SQL達1.2w+。業務的增多和集羣規模的增大,讓咱們也遇到很多問題和挑戰,下面咱們將介紹運維Doris集羣過程當中遇到的一些問題和應對措施或改進。
4.2 Doris數據導入實踐
Doris大規模接入業務的第一個挑戰是數據導入,基於咱們目前的業務需求,數據要儘量實時導入。而對於增加分析集羣,目前有數十個業務明細數據表須要近實時導入,這其中還包含了幾個大業務(大業務天天的數據條數從幾十億到上百億不等,字段數在200~400)。爲了保證數據不重複插入,Doris採用label標記每批數據的導入,並採用兩階段提交來保證數據導入的事務性,要麼所有成功,要麼所有失敗。爲了方便監控和管理數據導入做業,咱們使用Spark Streaming封裝了stream load操做,實現了將Talos的數據導入到Doris中。每隔幾分鐘,Spark Streaming會從Talos讀取一個批次的數據並生成相應的RDD,RDD的每一個分區和Talos的每一個分區一一對應,以下圖所示:
對於Doris來講,一次stream load做業會產生一次事務,Doris的fe進程的master節點會負責整個事務生命週期的管理,若是短期內提交了太多的事務,則會對fe進程的master節點形成很大的壓力。對於每一個單獨的流式數據導入產品線做業來講,假設消息隊列一共有m個分區,每批次的每一個分區的數據導入可能執行最多n次stream load操做,因而對消息隊列一個批次的數據的處理就可能會產生m*n次事務。爲了Doris的數據導入的穩定性,咱們把Spark Streaming每批次數據的時間間隔根據業務數據量的大小和實時性要求調整爲1min到3min不等,並儘可能地加大每次stream load發送的數據量。
在集羣接入業務的初期,這套流式數據導入Doris的機制基本能平穩運行。可是隨着接入業務規模的增加,問題也隨之而來。首先,咱們發現某些存了數日數據的大表頻繁地出現數據導入失敗問題,具體表現爲數據導入超時報錯。通過咱們的排查,肯定了致使數據導入超時的緣由,因爲咱們使用stream load進行數據導入的時候,沒有指定表的寫入分區(這裏線上的事件表都是按天進行分區),有的事件表已經保留了三個多月的數據,而且天天擁有600多個數據分片,加上每張表默認三副本保存數據,因此每次寫入數據以前都須要打開約18萬個writer,致使在打開writer的過程當中就已經超時,可是因爲數據是實時導入,其餘天的分區沒有數據寫入,因此並不須要打開writer。定位到緣由以後,咱們作了相應的措施,一個是根據數據的日期狀況,在數據導入的時候指定了寫入分區,另外一個措施是縮減了天天分區的數據分片數量,將分片數據量從600+下降到了200+(分片數量過多會影響數據導入和查詢的效率)。經過指定寫入數據分區和限制分區的分片數量,大表也能流暢穩定地導入數據而不超時了。
另外一個困擾咱們的問題就是須要實時導入數據的業務增多給fe的master節點帶來了較大的壓力,也影響了數據導入的效率。每一次的 stream load操做,coordinator be節點都須要屢次和fe節點進行交互,以下圖所示:
曾經有段時間,咱們發現master節點偶爾出現線程數飆升,隨後cpu load升高, 最後進程掛掉重啓的狀況。咱們的查詢併發並非很高,因此不太多是查詢致使的。但同時咱們經過對max_running_txn_num_per_db參數的設置已經對數據導入在fe端作了限流,因此爲什麼fe的master節點的線程數會飆升讓咱們感到比較奇怪。通過查看日誌發現,be端有大量請求數據導入執行計劃失敗的日誌。咱們的確限制住了單個db可以容許同時存在的最大事務數目,可是因爲fe在計算執行計劃的時候須要獲取db的讀鎖,提交和完成事務須要獲取db的寫鎖,一些長尾任務的出現致使了好多計算執行計劃的任務都堵塞在獲取db鎖上邊,這時候be客戶端發現rpc請求超時了,因而當即重試,fe端的thirft server須要啓動新的線程來處理新的請求,可是以前的事務任務並無取消,這時候積壓的任務不斷增多,最終致使了雪崩效應。針對這種狀況,咱們對Doris主要作了如下的改造:
4.3 Doris在線查詢實踐
在增加分析業務場景中,事件表是咱們的核心表,須要實時導入明細日誌。這些事件表沒有聚合和去重需求,並且業務需求是可以查詢明細信息,因此都選用了冗餘模型(DUPLICATE KEY)。事件表根據天級別分區,分桶字段使用了日誌id字段(其實是一個隨機產生的md5),其hash值可以保證分桶之間數據均勻分佈,避免數據傾斜致使的寫入和查詢問題。 下圖是咱們線上規模最大的集羣最近30天的查詢性能統計(查詢信息的統計來自於Doris的查詢審計日誌),最近一週天天成功的SQL查詢數在1.2w~2w之間。
從圖中能夠看出,使用了Doris後,平均查詢時間保持在10秒左右,最大不超過15秒;查詢時間P95通常能保證在30秒內。這個查詢體驗,相對於原來的SparkSQL,提高效果比較明顯。 Doris提供了查詢併發度參數parallel_fragment_exec_instance_num,查詢服務端根據正在運行的任務個數動態調整它來優化查詢,低負載下增長併發度提升查詢性能,高負載下減小併發度保證集羣穩定性。在分析業務查詢profile時,咱們發現Doris默認執行過程當中exchange先後併發度是同樣的,實際上對於聚合型的查詢,exchange後的數據量是大大減小的,這時若是繼續用同樣的併發度不只浪費了資源,並且exchange後較少數據量用較大的併發執行,理論上反而下降了查詢性能。所以,咱們添加了參數doris_exchange_instances控制exchange後任務併發度(以下圖所示),在實際業務測試中取得了較好的效果。
這個對數據量巨大的業務或者exchange後不能明顯下降數據量級的查詢並不明顯,可是這個對於中小業務(尤爲是那些用了較多bucket的小業務)的聚合或join查詢,優化比較明顯。咱們對不一樣數量級業務的測試,也驗證了咱們的推斷。咱們選取了一個數據量4億/日的小業務,分別測試了不一樣場景下查詢性能:
從上圖結果能夠看出,doris_exchange_instances對於聚合和join類型的小查詢改進明顯。固然,這個測試是在不少次測試以後找到的最優doris_exchange_instances值,在實際業務中每次都能找到最優值可行性較低,通常對於中小業務根據查詢計劃中須要掃描的buckets數目結合集羣規模適當下降,用較小的代價得到必定性能提高便可。後來咱們將這個改進貢獻到社區,該參數名被修改成parallel_exchange_instance_num。 爲了擴展SQL的查詢能力,Doris也提供了和SparkSQL,Hive相似的UDF(User-Defined Functions)框架支持。當Doris內置函數沒法知足用戶需求時,用戶能夠根據Doris的UDF框架來實現自定義函數進行查詢。Doris支持的UDF分紅兩類(目前不支持UDTF,User-Defined Table-Generating Functions,一行數據輸入對應多行數據輸出),一類是普通UDF,根據單個數據行的輸入,產生一個數據行的輸出。另外一類是UDAF(User-Defined Aggregate Functions),該類函數屬於聚合函數,接收多個數據行的輸入併產生一個數據行的輸出。UDAF的執行流程以下圖所示:
UDAF通常須要定義4個函數,分別爲Init、Update、Merge、Finalize函數,若爲中間輸出的數據類型爲複雜數據類型時,則還須要實現Serialize函數,在Shuffle過程當中對中間類型進行序列化,並在Merge函數中對該類型進行反序列化。在增加分析場景中,留存分析、漏斗分析等都使用到了UDAF。以留存分析爲例,它是一種用來分析用戶參與狀況/活躍程度的分析模型,考查進行初始行爲後的用戶中有多少人會進行後續行爲。針對以上需求,咱們首先定義了函數retention_info,輸入是每一個用戶的行爲信息,而後以每一個用戶的id爲key進行分組,生成每一個用戶在指定時間內的每一個時間單元(如天,周,月等)的留存信息,而後定義函數retention_count,輸入是retention_info函數生成的每一個用戶的留存信息,而後咱們以留存的時間單位(這裏一般是天)爲key進行分組,就能夠算出每一個單位時間內留存的用戶數。這樣在UDAF的幫助下,咱們就能夠順利完成留存分析的計算。
4.4 Doris表的管理
在咱們的增加分析場景中,從是否分區的角度上看,Doris的olap表主要分紅兩種類型,一種是非分區表,如人羣包和業務畫像表,人羣包表的特徵是數據量較小,可是表的數量多;業務畫像表數據量較少,數據量中等,但有更新需求。另外一種是分區表,如事件表,這類表通常單表數據規模都比較大,在設計上,咱們以時間字段爲分區鍵,須要天天增長爲表添加新的分區,使得實時的數據可以成功地導入當天的分區,而且須要及時地刪掉過時的分區。顯然,讓每一個業務本身去管理表的分區,不只繁瑣,並且可能出錯。在咱們原先的GA架構中,就有動態分區管理服務,使用Doris系統後,咱們將動態分區管理服務集成到了Doris系統中,支持用戶按天、周、月來設置須要保留的分區個數以及須要提早建立的分區數量。 另外一個表管理的典型場景是修改表的schema,主要操做爲增長表的字段。Doris現階段只支持一些基本數據類型,在大數據場景下業務打點上報的日誌的數據類型多爲嵌套類型(list,map),因此接入Doris時須要展開或者轉換,致使Doris表字段數目較爲龐大,部分類型字段展開困難不得不用varchar存儲致使使用起來很是不方便,查詢性能也相對低下。因爲Doris不支持嵌套數據類型,當嵌套類型新增元素時,則Doris表須要增長字段,從提交增長字段請求到添加字段成功等待的時間較長,當集羣管理的tablet數目龐大而且表的數據量和tablet數目都比較多的狀況下可能會出現添加列失敗的問題。針對以上問題,目前咱們主要作了如下兩點改進:
Doris在小米從2019年9月上線接入第一個業務至今,已經在海內外部署近十個集羣(整體達到幾百臺BE的規模),天天完成數萬個在線分析查詢,承擔了咱們包括增加分析和報表查詢在內的大多數在線分析需求。從結果上來看,用Doris替換SparkSQL做爲主要OLAP引擎,既大幅度提升查詢性能,又簡化了目前的數據分析架構,是Doris基於明細數據查詢的大規模服務的一個比較成功的實踐。 在接下來的一段時間內,咱們將繼續投入精力提高數據實時導入效率和優化整體的查詢性能,因爲公司內部有很多業務有使用UNIQUE KEY模型的需求,目前該模型與DUPLICATE KEY模型的scan性能相比仍是有比較明顯的差距,這塊也是將來咱們須要重點解決的性能問題。
蔡聰輝,小米OLAP工程師,Apache Doris Committer
鍾雲,小米大數據工程師
Apache Doris(Incubating) 一款基於大規模並行處理技術的交互式SQL分析數據庫,由百度於2018年貢獻給 Apache 基金會,目前在 Apache 基金會孵化器中。
Doris 微信公衆號: