Storm 系列(二)實時平臺介紹

Storm 系列(二)實時平臺介紹

本章中的實時平臺是指針對大數據進行實時分析的一整套系統,包括數據的收集、處理、存儲等。通常而言,大數據有 4 個特色: Volumn(大量)、 Velocity(高速)、 Variety(多樣)、 Value(價值),所以針對大數據的實時平臺有如下特色。html

  • 延退 :高延遲意味着實時性的缺失。
  • 分佈式 :互聯網時代,大多數的系統都是部署在一套由多臺廉價 Linux 服務器組成的集羣上。
  • 高性能 :高速產生的大量數據,經過計算分析獲取其中的價值,這須要高性能可靠的處理模型。
  • 高擴展性 :整個系統要有較強的擴展性,數據井噴時可以經過快速部署解決系統的實時需求。而事實上,隨着業務的增加,數據量、計算量會呈指數級增加,因此係統的高擴展性是必須的。
  • 容錯性 :整個系統須要有較強的容錯性,一個節點宕機不影響業務。

同時,對於應用開發者而言,平臺上運行的應用程序容易開發和維護。各處理邏輯的分佈、消息的分發以及消息分發的可靠性對於應用開發者是透明的。對於運維而言,平臺還須要是可監控的。前端

結合互聯網大數據應用的特色,咱們基於 Storm 構建了實時平臺。node

2.1 實時平臺架構介紹

當網站或者 APP 到達必定的用戶量後,通常須要一套 Tracker 系統(如圖 2-1 所示),收集用戶行爲(如用戶 IP 地址、頁面來源、城市名、瀏覽器版本、按鈕位置等)、頁面訪問性能、異常出錯等信息,而後根據必定的策略上報到日誌服務器。搜索、推薦、廣告選品中心等開發團隊分析這些日誌,能夠調整和開發各類功能;產品經理、高級管理人員等經過這些日誌及時優化營運並進行正確決策;運維和應用開發人員根據這些日誌進行排錯和迭代產品等。 Tracker 系統在一個成熟的應用中扮演着重要的角色,隨着業務的發展,對它的實時性要求也愈來愈高。數據庫

圖2.1 Tracker系統

Tracker 系統通常採用 Javascript 語言開發,支持自動打點字段、自動擴展字段等,在網站或者應用的各個頁面的事件中嵌入 Tracker 系統的 API,設置必定的策略發送到日誌服務器,而後再同步到 Kafka 等消息隊列。對於須要實時日誌的應用,通常經過 Storm 等流式計算框架從消息隊列中拉取消息,完成相關的過濾和計算,最後存到 Hbase、 MYSQL 等數據庫中;對於實時性要求不高的應用,消息隊列中的日誌消息經過 Cloudera 的 Flume 系統 Sink 到 HDFS 中,而後後通常經過 ETL、Hive 或者批處理的 Hadoop 做業等抽取到 Hbase、MYSQL 等數據庫中。如圖 2-2 所示,日誌服務器的數據也能夠經過 Flume 系統 Sink 到 Kafka 等消息隊列中,供 Storm 實時時處理消息。apache

圖2.2 Flume過程

2.2 Kafka 架構

在 Kafka 的官方介紹中, Kafka 定義爲一個設計獨特的消息系統。相比於通常的消息隊列, Kafka 提供了一些獨特的特性,很是高的吞吐能力,以及強大的擴展性。本小節將簡單介紹 Kafka。後端

2.2.1 Kafka 的基本術語和概念

Kafka 中有如下一些概念。瀏覽器

  • Broker :任何正在運行中的 Kafka 示例都稱爲 Broker。
  • Topic :Topic 其實就是一個傳統意義上的消息隊列。
  • Partition:即分區。一個 Topic 將由多個分區組成,每一個分區將存在獨立的持久化文件,任何一個 Consumer 在分區上的消費必定是順序的;當一個 Consumer 同時在多個分區上消費時, Kafka 不能保證整體上的強順序性(對於強順序性的一個實現是 Exclusive Consumer,即獨佔消費,一個隊列列同時只能被一個 Consumer 消費而且從該消費開始消費某個消息到其確認纔算消費完成,在此期間任何 Consumer 不能再消費)。
  • Producer :消息的生產者。
  • Consumer :消息的消費者。
  • Consumer Group :即消費組。一個消費組是由一個或者多個 Consumer 組成的,對於同一個 Topic,不一樣的消費組都將能消費到全量的消息,而同一個消費組中的 Consumer 將競爭每一個消息(在多個 Consumer消 費同一個 Topic 時, Topic 的任何個分區將同時只能被一個 Consumer 消費)。

如圖 2-3 所示,在 Kafka 中,消息將被生產者「推」(push)到 Kafka 中, Consumer 會不停地輪詢從 Kafka 中「拉」(pull)數據。緩存

圖2.3 Karaf中消息的讀寫過程

2.2.2 Kafka 在實時平臺中的應用

在工做環境中,流式計算平臺架構如圖 2-4 所示。服務器

圖2.4 流式計算平臺

用戶訪問會源源不斷地產生數據,數據要麼存儲在本地並在須要時發送到相關的應用,要麼存儲到一個統一的中央存儲區中。產生的數據會被 Storm 中的 Spout 抓取、過濾並進行相關處理(例如應用之間協議解析、格式分析、數據據校驗等),而後發送到 Bolt 中進行數據分析,最終造成可用數據並存儲到持久化介質(如 DB)中,供其餘應用獲取。微信

數據暫存區的意義在於,首先數據是隨着用戶的訪問而產生的,通常的平臺在數據產生後要向其餘分析程序「推」數據,而 Storm 是主動抓取數據並進行分析處理,是「拉」;其次即使在 Storm 中實現一個可以接受「推」的模型(如在 Spout 中增長內存隊列等),當數據源忽然增長時有可能致使 Storm 上應用併發度不足而引發其餘情況,此時至關於對 Storm 發起一次 DoS 攻擊。所以,去掉數據暫存區對 Storm 的維護、整個平臺的運維而言都不是很是好的選擇。

不少大數據實時平臺的數據暫存區選用了 Kafka,是基於 Kaka 的如下優勢。

  • 高性能 :每秒鐘能處理數以千計生產者生產的消息,詳盡的數據請參考官網的壓力測試結果。
  • 高擴展性 :當 Kafka 容量不夠時能夠經過簡單增長服務器橫向擴展 Kafka 集羣的容量。
  • 分佈式 :消息來自數以千計的服務,數據量比較巨大,單機顯然不能處理這個量級的數據,爲解決容量不足、性能不夠等情況,分佈式是必需的。
  • 持久性 : Kafka 會將數據持久化到硬盤上,以防止數據的丟失。
  • Kafka相對比較活躍 : 在 Storm 0.9.2 中, Kafka 已是 Storm Spout 中的可選 Spout。

本節將簡單描述 Kafka,關於其更詳盡的信息請直接參考 Kafka 官方文檔: http://Kafka.apache.org/documentation.html

Kafka 是由 LinkedIn 開源的高效的持久化的日誌型消息隊列,利用用磁盤高效的順序讀寫特性使得在不少場景下,瓶頸甚至不在於磁盤讀寫而在於網絡的傳輸上。與 Amazon 的 Dynamo 引領了一批 NOSQL 相似, Kafka 的設計哲學很值得借鑑,在國內不少公司內部的消息隊列中均可以看到 Kafka 的身影,如 makfa、 metaq、 queue 等。如下將簡單介紹 Kafka,關於 Kafka 更多的內容詳見附錄 A 或者請查閱官方文檔。

2.2.3 消息的持久化和順序讀寫

Kafka 沒有使用內存做爲緩存,而是直接將數據順序地持久化到硬盤上(事實上數據是以塊的方式持久化的),同時 Kafka 中的每一個隊列能夠包含多個區並分別持久化到不一樣的文件中。關於順序讀寫的分析,在 Kafka 的官方介紹中有這樣的描述:「在一個 6 * 7200rpm SATA RAIL-5 的磁盤陣列上線性寫的速度大概是 300MB/S,可是隨機寫的速度只有50KB/s。」

2.2.4 sendfile 系統調用和零複製

在數據發送端, Kafka 使用 sendfile 調用減小了數據從硬盤讀取到發送之間內核態和用戶態之間的數據複製。

傳統上,當用戶須要讀取磁盤上的數據併發送到客戶端時,會經歷這樣的步驟:打開文件磁盤上的文件準備讀取,建立遠端套接字(socket)的鏈接,循環從磁盤上讀取數據將讀取到的數據發送出去,發送完成後關閉文件和遠端鏈接。仔細分析其中的步驟,咱們會發現,在這個過程當中,一份數據的發送須要屢次複製。首先,經過 read 調用每次從磁盤上讀取文件,數據會被從磁盤上覆制到內核空間,而後再被複制到讀取進程所在的用戶空間。其次,經過 write 調用將數據從進程所在的用戶空間發送出去時,數據會被從用戶空間複製到內核空間,再被複制到對應的網卡緩衝區,最終發送出去。期間數據經歷了屢次複製以及在用戶態和內核態之間的屢次轉換,每一次都將產生一個很是昂貴的上下文切換,當有大量的數據僅僅須要從文件讀出並被髮送時代價會很是大。

sendfile 系統調用優化了上述流程:數據將首先從磁盤複製到內核空間,再從內核空間複製到發送緩衝區,最終被髮送出去。在 Linux 系統中, sendfi1e 能夠支持將數據發送到文件、網絡設備(網卡)或者其餘設備上。 sendfile 是 Kernel2.2 提供的新特性(從 glibc 2.1 開始提供頭文件 <Sys/sendfile.h>)

圖 2-5 中簡單對比了使用通常 read/write 和使用 sendfile 將數據從硬盤中讀出併發送的過程。

圖2.5 read/write和sendfile系統調用對比 圖2.5 read/write和sendfile系統調用對比

經過分析,咱們能夠發現,經過簡單 的read/ rite 讀取併發送數據,須要 4 次系統調用以及 4 次數據複製;而使用 sendfile 只產生 2 次系統調用及數據複製。因爲每一次空間切換內核將產生中斷、保護現場(堆棧、寄存器的值須要保護以備執行完成後切換回來)等動做,每一次數據複製消耗大量 CPU。 sendfile 對這兩個優化帶來的變化是數據發送吞吐量提升,同時減小了對CPU 資源的消耗。當存在大量須要從硬盤上發送的數據時,其優點將很是明顯。也正所以,不少涉及文件下載、發送的服務都支持直接 sendfile 調用,如 Apache httpd、 Nginx、 Lighttpd 等。

2.2.5 Kafka的的客戶端

Kafka 目前支持的客戶端有 CC++、Java、.NET、 Python、Ruby、Perl、 Clojure、 Erlang、Scala 等,甚至還提供了 HTTP REST 的訪問接口。

在消息生產端,能夠預約義消息的投放規則,如某些消息該向哪一個 Partition 發送(如能夠按照消息中的某個字段,如用戶字段,進行哈希,使得全部該用戶的消息都發送到同個 Partition 上)。

在消息的消費端,客戶端會將消息消費的偏移量記錄到 Zookeeper 中。若是須要事務性的支持,能夠將偏移量的存儲放在事務中進行:除非消息被消費並被處理完成,不然事務的回滾將知足再次消費的目的。

2.2.6 Kafka 的擴展

Kafka依賴於 Zookeeper,集羣的擴容很是方便,直接啓動一個新的節點便可。對於已存在的消息隊列, Kafka 提供了相關的工具(kafka-reassign partitions.sh)將數據遷移到新節點上。在 0.8.1 版本中,該工具尚不能在保證遷移的同時保證負載均衡。

2.3 大衆點評

實時平臺大衆點評網於 2003 年 4 月成立於上海。大衆點評網是中國領先的城市生活消費平臺,也是全球最先創建的獨立第三方消費點評網站。大衆點評不只爲用戶提供商戶信息消費點評及消費優惠等信息服務,同時亦提供團購、餐廳預訂、外賣及電子會員卡等 O2O( Online to Offline)交易服務。大衆點評網是國內最先開發本地生活移動應用的企業業,目前己經成長爲一家領先的移動互聯網公司,大衆點評移動客戶端已成爲本地生活必備工具。

### 2.3.1 相關數據

截止到 2014 年第三季度,大衆點評網月活躍用戶數超過 1.7 億,點評數量超過 4200 萬條,收錄商戶數量超過 1000 萬家,覆蓋全國 2300 多個城市及美國、日本、法國、澳大利韓國、新加坡、泰國、越南、馬來西亞、印度尼西亞、柬埔寨、馬爾代夫、毛里求斯等近百個熱門旅遊國家和地區。

截止到 2014 年第三季度,大衆點評月綜合瀏覽量(網站及移動設備)超過 8 0億,其中移動客戶端的瀏覽量超過 80%,移動客戶端累計獨立用戶數超過 1.8 億。目前,除上海總部以外,大衆點評已經在北京、廣州、天津、杭州、南京等130座城市設立分支機構。

2.3.2 實時平臺簡介

目前大衆點評的實時數據平臺通過一段時間的搭建已經基本成型。平臺包括了一系列的工具和系統,大部分系統是在原有系統的基礎上適當增長功能來完成。主要部分包括了日誌打點和收集系統、數據傳輸和計算平臺、持久化數據服務以及在線數據服務等部分。

(1) 日誌的傳輸和收集,主要依賴 Blackhole 和 Puma 來完成。 Blackhole 是一個大衆評本身開發的相似於 Kafka 的分佈式消息系統,收集了除 MYSQL 日誌之外的全部數據源的日誌,並以流的形式提供了批量和實時兩種數據消費方式。2.3.3 節將具體介紹 Blackhole。Puma 是以 MYSQL 從節點(slave node)的方式運行,接收 MYSQL 的 binlog 解析 binlog,而後以MQ 的形式提供數據服務。

日誌打點和收集系統包括瞭如下幾個日誌數括源。

  • 瀏覽器自助打點服務,供產品經理和運營人員,數據分析人員在頁面上配置打點配置完成後,系統自動將須要打點的地方推送到前端網頁上,用戶瀏覽網頁時候的點擊行爲以及鼠標懸停等就會觸發相應的日誌數據,實時傳回後端的日誌服務器。

  • 在大衆點評的 3 個主要 APP(大衆點評、大衆點評團和周邊快查)的框架中內含了全部頁面的按鈕、頁面滑動以及頁面切換等的埋點,只要用戶有相應的操做,就會記錄日誌,批量發送到日誌服務器。

  • 此外,同其餘平臺的合做(如微信、QQ 空間等)也有相應的埋點,記錄對應的日誌。

以上全部的用戶瀏覽日誌數據加上後端應用的日誌、 Nginx 日誌和數據庫的增刪改志等,一併統統日誌收集系統實時地傳輸到日誌的消費方(主要是 Storm 中的 Topology)。其餘的數據源還包括 MQ 系統,由應用在執行過程當中產生。

(2) Storm 是實時平臺的核心組成部分,目前在 Storm 上運行了幾十個業務 Topology,日處理數據量在百億級,峯值的數據 TPS 在 10 萬左右。隨着大衆點評業務的發展,數據處理量仍在快速增長。

(3) Topology 中 Bolt 計算的結果數據和中間交換數據根據業務需求存放在 Redis、 Hbase 或者 MYSQL 中。

(4) 數據持久化到相應的數據庫中後,由 RPC 服務器提供對外統一的訪問服務,用戶不用關心數據存儲的細節、位置和容錯,直接獲取數據。

整個平臺的系統架構如圖 2-6 所示

圖2.6 平臺的系統架構

2.3.3 Blackhole

Blackhole 是相似於 Kafka 的的一個流式系統,是大衆點評的數據收集和訂閱消費的平臺。數據倉庫的全部日誌數據都是由 Blackhole 來完成收集並存入HDFS 中的。 Blackhole 天天收集超過 2TB 的日誌數據。 Blackhole 的 Agent 同其餘平臺工具一塊兒部署在全部的幾千臺線上機器中,批量日誌收集保證數據無丟失,實時數據保證高實效性和高性能。

Blackhole 具備良好的水平擴展性和容錯能力。內部基於行爲(actor-based)的分佈式系統實現系統的高性能:採用 Kafka 相似的提交日誌(commit log)保證數據完整性。在 Blackhole 中,分爲 4 類角色,即 Supervisor、 Broker、 Agent 和 Consumer。

  • Supervisor: Supervisor 是管理者,負責全部的調度以及元數據管理。 Agent、 Broker 、 Consumer 都和 Supervisor 維持了心跳信息,若是某個 Broker 失敗了, Supervisor 會讓這個 Broker 鏈接的 Agent 和 Consumer 轉移到其餘 Broker 節點上。進行相應的動態擴容之後, Supervisor 會發起 rebalance 操做,保持負載均衡。
  • Broker: Broker 是數據的管理者。 Agent 向 Broker 上報數據, Broker 會在本地磁盤緩存數據,用於可靠性保障。 Consumer 向 Broker 發送數據所在文件位置的偏移量,獲取對應具體的數據。同一個數據源的數據會發送到多個 Broker 中以達到負載均衡的效果。同時 Broker 會批量地將日誌文件上傳到 HDFS 中,用於後續的做業和各類數據分析。
  • Agent: Agent 監聽相應的日誌文件,是數據的生產者,它將日誌發送到 Broker。
  • Consumer: Consumer 實時地從 Broker 中獲取日誌數據。一般將 Storm 的 Spout 做爲具體的 Consumer 來消費數據。

Blackhole 體系架構如圖 2-7 所示。

圖2.7 Blackhole體系架構

2.4 1號店實時平臺

1號店於 2008 年 7 月成立於上海,開創了中國電子商務行業「網上超市」的先河。至 2013 年末,覆蓋了食品飲料、生鮮、進口食品、美容護理、服飾鞋靴、廚衛清潔用品母嬰用品、數碼手機、家居居用品、家電、保健器械、電腦辦公、箱包珠寶手錶、運動戶外禮品等等 14 個品類。1 號店是中國第一家自營生鮮的綜合性電商;在食品飲料尤爲進口食品方面,緊緊佔據中國 B2C 電商行業第一的市場份額;進口牛奶的銷量佔到全國海關進口總額的 37.29%;在洗護髮、沐浴、女性護理、口腔護理產品等細分品類保持了中國 B2C 電商行業第一的市場份額;手機在線銷售的市場份額躋身中國B2C電商行業前三名。

1號店擁有 9000 萬註冊用戶,800多萬的SKU,2013年實現了1154億元的銷售業績,數據平臺處理3億多的獨立用戶ID(未登陸用戶和登陸用戶),100T 的數據量。

2013 年規劃1號店實時平臺時,主要的應用爲個性化推薦、反爬蟲、反欺詐分析、商鋪訂單、流量實時分析和BI實時報表統計。平臺搭建之初,已上線的應用中天天須要實時分析的數據量峯值在450GB左右,秒級別延遲。基於 Storm 的流計算也一樣適用於搜索實時索引、移動端流量分析、廣告曝光數據分析、風險控制和移動端訪問數據分析等應用場景。

和全部互聯網公司大數據分析服務同樣,1 號店的數據服務包括數據的採集、收集、分析、持久化、應用引擎、推送和展現等。數據的收集主要來自基於 Javascript 的內部實現的服務(如 Tracker、基於開源的 Haproxy 的日誌等),數據收集後,部分要求準實時的服務會暫時持久化到硬盤上,後經過 Flume(這裏使用的 Flume 是 Flume-ng 版本 1.4,如下再也不贅述)、 syslogd 等推送到 Kafka 中, Storm 上的實時應用實時獲取 Kafka 中的數據進行分析,並將結果持久化供相關業務使用和展現。

1 號店實時處理平臺架構如圖 2-8 所示。

圖2.8 平臺架構

整個平臺用於處理用戶訪問產生的數據,包括行爲數據、 HALog、廣告曝光數據和流量數據等,數據會在產生的第一時間被收集併發送到日誌轉發服務(如 Scribe、 Flume)上,而後由日誌轉發服務將其推到到 Kafka 對應的 Topic 中。若是須要經過 Hadoop 計算全量,也會推送到 HDFS 中。運行在 Storm 中的應用會讀取 Kafka 中的數據進行分析,並將分析結果持久化到持久化層中。推送引擎主動獲取持久化層中的數據,將處理結果推送到對應的業務系統並最終展現給用戶。在整個平臺中,使用 Flume 做爲數據推送組件是基於如下幾點考慮。

  • Flume 可以接收多種數據源,包括獲取控制檯輸出、tail、 syslogd、exec 等,支持 TCP 和 UDP 協議。
  • Flume 支持基於內存、文件等通道,數據在轉發到相關服務以前暫時存放於通道內。
  • Flume 支打持多種數據推送,如將數據推送到 HDFS、 MYSQL、 Hbase、 Mongodb 中。
  • Fume 有着很是優雅的實現,經過編寫相應的 plugin,可以輕易擴展支持其餘類型的數據源和推送
  • Flume 具備高性能

使用 Kaka 做爲數據的緩存主要是基於如下幾點考慮。

  • 某些數據會被多種業務使用,如訪問日誌,既用於反爬蟲分析也用於反欺詐、反注入分析,一個一樣的數據會被消費屢次,而 Kafka 可以知足該需求。
  • 從實時平臺而言, Storm 中 Spout 的消息消費類型屬於「拉」模式,而數據產生服務屬於「推」模式(有訪問就有數據),中間須要同時支持「推」和「拉」的消息平臺。
  • Kafka 在單臺 6 塊硬盤的服務器上實測峯值可以達到 600Mbit/s,數據的產生和消費是準實時的,性能上是能夠接受的。對於互聯網應用而言,數據的高峯多是間歇性、井噴性的,如「大促」、「週年慶」、「雙11」等日時段的流量多是平時的 5 倍甚至 10 倍。從就成本而言,與其維打個容量爲平時流量 10 倍的集羣倒不如維護一個容量爲平時 2~3 倍容量而數據井噴時容許必定的延遲的集羣更划算些。

天天用心記錄一點點。內容也許不重要,但習慣很重要!

本文來自 《Strom 技術內幕與大數據實踐》 一書。

相關文章
相關標籤/搜索