大衆點評的大數據實踐

2011年小規模試水
前端

這一階段的主要工做是創建了一個小的集羣,並導入了少許用戶進行測試。爲了知足用戶的需求,咱們還調研了任務調度系統和數據交換系統。git

咱們使用的版本是當時最新的穩定版,Hadoop 0.20.203和Hive  0.7.1。此後經歷過屢次升級與Bugfix。如今使用的是Hadoop 1.0.3+自有Patch與Hive 0.9+自有Patch。考慮到人手不足及本身的Patch很少等問題,咱們採起的策略是,以Apache的穩定版本爲基礎,儘可能將本身的修改提交到社區,而且應用這些尚未被接受的Patch。由於如今Hadoop生態圈中尚未出現一個相似Red  Hat地位的公司,咱們也不但願被鎖定在某個特定的發行版上,更重要的是Apache  Jira與Maillist依然是獲取Hadoop相關知識、解決Hadoop相關問題最好的地方(Cloudera爲CDH創建了私有的Jira,但人氣不足),因此沒有采用Cloudera或者Hortonworks的發行版。目前咱們正對Hadoop 2.1.0進行測試。github

在前期,咱們團隊的主要工做是ops+solution,如今DBA已接手了很大一部分ops的工做,咱們正在轉向solution+dev的工做。數據庫

咱們使用Puppet管理整個集羣,用Ganglia和Zabbix作監控與報警。瀏覽器

集羣搭建好,用戶便開始使用,面臨的第一個問題是須要任務級別的調度、報警和工做流服務。當用戶的任務出現異常或其餘狀況時,須要以郵件或者短信的方式通知用戶。並且用戶的任務間可能有複雜的依賴關係,須要工做流系統來描述任務間的依賴關係。咱們首先將目光投向開源項目Apache Oozie。Oozie是Apache開發的工做流引擎,以XML的方式描述任務及任務間的依賴,功能強大。但在測試後,發現Oozie並非一個很好的選擇。安全

Oozie採用XML做爲任務的配置,特別是對於MapReduce Job,須要在XML裏配置Map、Reduce類、輸入輸出路徑、Distributed Cache和各類參數。在運行時,先由Oozie提交一個Map only的Job,在這個Job的Map裏,再拼裝用戶的Job,經過JobClient提交給JobTracker。相對於Java編寫的Job Runner,這種XML的方式缺少靈活性,並且難以調試和維 護。先提交一個Job,再由這個Job提交真正Job的設計,我我的認爲至關不優雅。網絡

另外一個問題在於,公司內的不少用戶,但願調度系統不只能夠調度Hadoop任務,也能夠調度單機任務,甚至Spring容器裏的任務,而Oozie並不支持Hadoop集羣以外的任務。多線程

因此咱們轉而自行開發調度系統Taurus(https://github.com/dianping/taurus)。Taurus是一個調度系統,  經過時間依賴與任務依賴,觸發任務的執行,並經過任務間的依賴管理將任務組織成工做流;支持Hadoop/Hive  Job、Spring容器裏的任務及通常性任務的調度/監控。架構

圖1  Taurus的結構圖併發

圖1是Taurus的結構圖,Taurus的主節點稱爲Master,Web界面與Master在一塊兒。用戶在Web界面上建立任務後,寫入MySQL作持久化存儲,當Master判斷任務觸發的條件知足時,則從MySQL中讀出 任務信息,寫入ZooKeeper;Agent部署在用戶的機器上,觀察ZooKeeper上的變化,得到任務信息,啓動任務。Taurus在2012年 中上線。

另外一個迫切需求是數據交換系統。用戶須要將MySQL、MongoDB甚至文件中的數據導入到HDFS上進行分析。另一些用戶要將HDFS中生成的數據再導入MySQL做爲報表展示或者供在線系統使用。

咱們首先調研了Apache  Sqoop,它主要用於HDFS與關係型數據庫間的數據傳輸。通過測試,發現Sqoop的主要問題在於數據的一致性。Sqoop採用MapReduce Job進行數據庫的插入,而Hadoop自帶Task的重試機制,當一個Task失敗,會自動重啓這個Task。這是一個很好的特性,大大提升了Hadoop的容錯能力,但對於數據庫插入操做,卻帶來了麻煩。

考慮有10個Map,每一個Map插入十分之一的數據,若是有一個Map插入到一半時failed,再經過Task rerun執行成功,那麼fail那次插入的一半數據就重複了,這在不少應用場景下是不可接受的。 並且Sqoop不支持MongoDB和MySQL之間的數據交換,但公司內卻有這需求。最終咱們參考淘寶的DataX,於2011年末開始設計並開發了Wormhole。之因此採用自行開發而沒有直接使用DataX主要出於維護上的考慮,並且DataX並未造成良好的社區。

2012年大規模應用

2012年,出於成本、穩定性與源碼級別維護性的考慮,公司的Data  Warehouse系統由商業的OLAP數據庫轉向Hadoop/Hive。2012年初,Wormhole開發完成;以後Taurus也上線部署;大量應用接入到Hadoop平臺上。爲了保證數據的安全性,咱們開啓了Hadoop的Security特性。爲了提升數據的壓縮率,咱們將默認存儲格式替換爲RCFile,並開發了Hive Web供公司內部使用。2012年末,咱們開始調研HBase。

圖2  Wormhole的結構圖

Wormhole(https://github.com /dianping/wormhole)是一個結構化數據傳輸工具,用於解決多種異構數據源間的數據交換,具備高效、易擴展等特色,由Reader、Storage、Writer三部分組成(如圖2所示)。Reader是個線程池,能夠啓動多個Reader線程從數據源讀出數據,寫入Storage。Writer也是線程池,多線程的Writer不只用於提升吞吐量,還用於寫入多個目的地。Storage是個雙緩衝隊列,若是使用一讀多寫,則每一個目的地都擁有本身的Storage。

當寫入過程出錯時,將自動執行用戶配置的Rollback方法,消除錯誤狀態,從而保證數據的完整性。經過開發不一樣的Reader和Writer插件,如MySQL、MongoDB、Hive、HDFS、SFTP和Salesforce,咱們就能夠支持多種數據源間的數據交換。Wormhole在大衆點評內部獲得了大量使用,得到了普遍好評。

隨着愈來愈多的部門接入Hadoop,特別是數據倉庫(DW)部門接入後,咱們對數據的安全性需求變得更爲迫切。而Hadoop默認採用Simple的用戶認證模式,具備很大的安全風險。

默認的Simple認證模式,會在Hadoop的客戶端執行whoami命令,並以whoami命令的形式返回結果,做爲訪問Hadoop的用戶名(準確地說,是以whoami的形式返回結果,做爲Hadoop RPC的userGroupInformation參數發起RPC  Call)。這樣會產生如下三個問題。

(1)User  Authentication。假設有帳號A和帳號B,分別在Host1和Host2上。若是惡意用戶在Host2上創建了一個同名的帳號A,那麼經過RPC Call得到的UGI就和真正的帳號A相同,僞造了帳號A的身份。用這種方式,惡意用戶能夠訪問/修改其餘用戶的數據。

(2)Service  Authentication。Hadoop採用主從結構,如NameNode-DataNode、JobTracker-Tasktracker。Slave節點啓動時,主動鏈接Master節點。Slave到Master的鏈接過程,沒有通過認證。假設某個用戶在某臺非Hadoop機器上,錯誤地啓動了一個Slave實例,那麼也會鏈接到Master;Master會爲它分配任務/數據,可能會影響任務的執行。

(3)可管理性。任何能夠連到Master節點的機器,均可以請求集羣的服務,訪問HDFS,運行Hadoop Job,沒法對用戶的訪問進行控制。

從Hadoop 0.20.203開始,社區開發了Hadoop  Security,實現了基於Kerberos的Authentication。任何訪問Hadoop的用戶,都必須持有KDC(Key  Distribution Center)發佈的Ticket或者Keytab File(準確地說,是Ticket Granting  Ticket),才能調用Hadoop的服務。用戶經過密碼,獲取Ticket,Hadoop Client在發起RPC  Call時讀取Ticket的內容,使用其中的Principal字段,做爲RPC  Call的UserGroupInformation參數,解決了問題(1)。Hadoop的任何Daemon進程在啓動時,都須要使用Keytab  File作Authentication。由於Keytab  File的分發是由管理員控制的,因此解決了問題(2)。最後,不管是Ticket,仍是Keytab  File,都由KDC管理/生成,而KDC由管理員控制,解決了問題(3)。

在使用了Hadoop  Security以後,只有經過了身份認證的用戶才能訪問Hadoop,大大加強了數據的安全性和集羣的可管理性。以後咱們基於Hadoop  Secuirty,與DW部門一塊兒開發了ACL系統,用戶能夠自助申請Hive上表的權限。在申請經過審批工做流以後,就能夠訪問了。

JDBC是一種很經常使用的數據訪問接口,Hive自帶了Hive Server,能夠接受Hive JDBC Driver的鏈接。實際 上,Hive JDBC Driver是將JDBC的請求轉化爲Thrift Call發給Hive Server,再由Hive Server將Job啓動起來。但Hive自帶的Hive Server並不支持Security,默認會使用啓動Hive Server的用戶做爲Job的owner提交到Hadoop,形成安全漏洞。所以,咱們本身開發了Hive Server的Security,解決了這個問題。

但在Hive Server的使用過程當中,咱們發現Hive Server並不穩定,並且存在內存泄漏。更嚴重的是因爲Hive Server自身的設計缺陷,不能很好地應對併發訪問的狀況,因此咱們如今並不推薦使用Hive JDBC的訪問方式。

社區後來從新開發了Hive Server 2,解決了併發的問題,咱們正在對Hive Server 2進行測試。

有一些同事,特別是BI的同事,不熟悉以CLI的方式使用Hive,但願Hive能夠有個GUI界面。在上線Hive Server以後,咱們調研了開源的SQL GUI Client——Squirrel,惋惜使用Squirrel訪問Hive存在一些問題。

  • 辦公網與線上環境是隔離的,在辦公機器上運行的Squirrel沒法連到線上環境的Hive Server。
  • Hive會返回大量的數據,特別是當用戶對於Hive返回的數據量沒有預估的狀況下,Squirrel會吃掉大量的內存,而後Out of Memory掛掉。
  • Hive JDBC實現的JDBC不完整,致使Squirrel的GUI中只有一部分功能可用,用戶體驗很是差。

基於以上考慮,咱們本身開發了Hive Web,讓用戶經過瀏覽器就可使用Hive。Hive  Web最初是做爲大衆點評第一屆Hackathon的一個項目被開發出來的,技術上很簡單,但得到了良好的反響。如今Hive  Web已經發展成了一個RESTful的Service,稱爲Polestar(https://github.com/dianping /polestar)。

圖3  Polestar的結構

圖3是Polestar的結構圖。目前Hive Web只是一個GWT的前端,經過HAProxy將RESTfull  Call分發到執行引擎Worker執行。Worker將自身的狀態保存在MySQL,將數據保存在HDFS,並使用JSON返回數據或數據在HDFS的 路徑。咱們還將Shark與Hive Web集成到了一塊兒,用戶能夠選擇以Hive或者Shark執行Query。

一開始咱們使用LZO做爲存儲格式,使大文件能夠在MapReduce處理中被切分,提升並行度。但LZO的壓縮比不夠高,按照咱們的測試,Lzo壓縮的文件,壓縮比基本只有Gz的一半。

通過調研,咱們將默認存儲格式替換成RCFile,在RCFile內部再使用Gz壓縮,這樣既可保持文件可切分的特性,同時又可得到Gz的高壓縮比,並且因 爲RCFile是一種列存儲的格式,因此對於不須要的字段就不用從I/O讀入,從而提升了性能。圖4顯示了將Nginx數據分別用Lzo、RCFile+Gz、RCFfile+Lzo壓縮,再不斷增長Select的Column數,在Hive上消耗的CPU時間(越小越好)。

圖4  幾種壓縮方式在Hive上消耗的CPU時間

但RCFile的讀寫須要知道數據的Schema,並且須要熟悉Hive的Ser/De接口。爲了讓MapReduce Job能方便地訪問RCFile,咱們使用了Apache Hcatalog。

社區又針對Hive 0.11開發了ORCFile,咱們正在對ORCFile進行測試。

隨着Facebook、淘寶等大公司成功地在生產環境應用HBase,HBase愈來愈受到你們的關注,咱們也開始對HBase進行測試。經過測試咱們發現HBase很是依賴參數的調整,在默認配置下,HBase能得到很好的寫性能,但讀性能不是特別出色。經過調整HBase的參數,在5臺機器的HBase集羣上,對於1KB大小的數據,也能得到5萬左右的TPS。在HBase 0.94以後,HBase已經優化了默認配置。

原來咱們但願HBase集羣與主Hadoop集羣共享HDFS,這樣能夠簡化運維成本。但在測試中,發現即便主Hadoop集羣上沒有任何負載,HBase的性能也很糟糕。咱們認爲,這是因爲大量數據屬於遠程讀寫所引發的。因此咱們如今的HBase集羣都是單獨部署的。而且經過封裝HBase  Client與Master-Slave Replication,使用2套HBase集羣實現了HBase的HA,用來支撐線上業務。

2013年持續演進

在創建了公司主要的大數據架構後,咱們上線了HBase的應用,並引入Spark/Shark以提升Ad Hoc Query的執行時間,並調研分佈式日誌收集系統,來取代手工腳本作日誌導入。

如今HBase上線的應用主要有OpenAPI和手機團購推薦。OpenAPI相似於HBase的典型應用Click  Stream,將開放平臺開發者的訪問日誌記錄在HBase中,經過Scan操做,查詢開發者在一段時間內的Log,但這一功能目前尚未對外開放。手機 團購推薦是一個典型的KVDB用法,將用戶的歷史訪問行爲記錄在HBase中,當用戶使用手機端訪問時,從HBase得到用戶的歷史行爲數據,作團購推 薦。

當Hive大規模使用以後,特別是原來使用OLAP數據庫的BI部門的同事轉入後,一個愈來愈大的抱怨就是Hive的執行速度。對於離 線的ETL任務,Hadoop/Hive是一個良好的選擇,但動輒分鐘級的響應時間,使得Ad Hoc Query的用戶難以忍受。爲了提升Ad Hoc Query的響應時間,咱們將目光轉向了Spark/Shark。

Spark是美國加州大學伯克利分校AMPLab開發的分佈式計算系統,基於RDD(Resilient Distributed  Dataset),主要使用內存而不是硬盤,能夠很好地支持迭代計算。由於是一個基於Memory的系統,因此在數據量可以放進Memory的狀況下,能 夠大幅縮短響應時間。Shark相似於Hive,將SQL解析爲Spark任務,而且Shark複用了大量Hive的已有代碼。

在Shark接入以後,大大下降了Ad Hoc Query的執行時間。好比SQL語句:

select host, count(1) from HIPPOLOG where dt = '2013-08-28' group by host order by host desc;

在Hive執行的時間是352秒,而Shark只須要60~70秒。但對於Memory中放不下的大數據量,Shark反而會變慢。

目前用戶須要在Hive  Web中選擇使用Hive仍是Shark,將來咱們會在Hive中添加Semantic-AnalysisHook,經過解析用戶提交的Query,根據 數據量的大小,自動選擇Hive或者Shark。另外,由於咱們目前使用的是Hadoop  1,不支持YARN,因此咱們單獨部署了一個小集羣用於Shark任務的執行。

Wormhole解決告終構化數據的交換問題,但對於非結構化數據,例如各類日誌,並不適合。咱們一直採用腳本或用戶程序直接寫HDFS的方式將用戶的Log導入HDFS。缺點是,須要必定的開發和維護成本。咱們 但願使用Apache  Flume解決這個問題,但在測試了Flume以後,發現了Flume存在一些問題:Flume不能保證端到端的數據完整性,數據可能丟失,也可能重複。

例如,Flume的HDFSsink在數據寫入/讀出Channel時,都有Transcation的保證。當Transaction失敗時,會回滾,而後重試。但因爲HDFS不可修改文件的內容,假設有1萬行數據要寫入HDFS,而在寫入5000行時,網絡出現問題致使寫入失敗,Transaction回滾,而後重寫這10000條記錄成功,就會致使第一次寫入的5000行重複。咱們試圖修正Flume的這些問題,但因爲這些問題是設計上的,並不能經過簡單的Bugfix來解決,因此咱們轉而開發Blackhole系統將數據流導入HDFS。目前Blackhole正在開發中。

總結

圖5是各系統整體結構圖,深藍部分爲自行開發的系統。

圖5  大衆點評各系統整體結構圖

在這2年多的Hadoop實踐中,咱們獲得了一些寶貴經驗。

  • 建設一支強大的技術團隊是相當重要的。Hadoop的生態系統,還處在快速演化中,並且文檔至關匱乏。只有具有足夠強的技術實力,才能用好開源軟件,並在開源軟件不能知足需求時,自行開發解決問題。
  • 要立足於解決用戶的需求。用戶須要的東西,會很容易被用戶接受,並推廣開來;某些東西技術上很簡單,但能夠解決用戶的大問題。
  • 對用戶的培訓,很是重要。

做者房明,大衆點評網平臺架構組高級工程師,Apache Contributor。2011年加入點評網,目前負責大數據處理的基礎架構及全部Hadoop相關技術的研發。

 

出處:http://www.csdn.net/article/2013-12-18/2817838-big-data-practice-in-dianping

相關文章
相關標籤/搜索