什麼是實時流計算?算法
一、實時流計算背景 數據庫
二、實時計算應用場景編程
三、實時計算處理流程緩存
四、實時計算框架服務器
什麼是實時流計算?網絡
所謂實時流計算,就是近幾年因爲數據獲得普遍應用以後,在數據持久性建模不知足現狀的狀況下,急需數據流的瞬時建模或者計算處理。這種實時計算的應用實例有金融服務、網絡監控、電信數據管理、 Web 應用、生產製造、傳感檢測,等等。在這種數據流模型中,單獨的數據單元多是相關的元組(Tuple),如網絡測量、呼叫記錄、網頁訪問等產生的數據。可是,這些數據以大量、快速、時變(多是不可預知)的數據流持續到達,由此產生了一些基礎性的新的研究問題——實時計算。實時計算的一個重要方向就是實時流計算。
架構
實時流計算背景 併發
數據的價值隨着時間的流逝而下降,因此事件出現後必須儘快對它們進行處理,最好事件出現時便馬上對其進行處理,發生一個事件進行一次處理,而不是緩存起來成一批處理。例如商用搜索引擎,像 Google、 Bing 和 Yahoo! 等,一般在用戶查詢響應中提供結構化的Web 結果,同時也插入基於流量的點擊付費模式的文本廣告。爲了在頁面上的最佳位置展示最相關的廣告,經過一些算法來動態估算給定上下文中一個廣告被點擊的可能性。上下文可能包括用戶偏好、地理位置、歷史查詢、歷史點擊等信息。一個主搜索引擎可能每秒鐘處理成千上萬次查詢,每一個頁面均可能會包含多個廣告。爲了及時處理用戶反饋,須要一個低延遲、可擴展、高可靠的處理引擎。負載均衡
對於這些實時性要求很高的應用,若把持續到達的數據簡單地放到傳統數據庫管理系統DBMS)中,並在其中進行操做,是不切實際的。傳統的 DBMS 並非爲快速連續地存放單的數據單元而設計的,並且也不支持「持續處理」,而「持續處理」是數據流應用的典型特徵。另外,如今人們都認識到,「近似性」和「自適應性」是對數據流進行快速查詢和其處理(如數據分析和數據採集)的關鍵要素,而傳統 DBMS 的主要目標偏偏與之相反:通穩定的查詢設計,獲得精確的答案。框架
另一些方案是採用 MapReduce 來處理實時數據流。可是,儘管 MapReduce 作了實時性改進,也很難穩定地知足應用需求。這是由於 Hadoop MapReduce 框架爲批處理作了高度優化,典型的是經過調度批量任務來操做靜態數據,任務不是常駐服務,數據也不是實時流入;而數據流計算的典型範式之一是不肯定數據速率的事件流流入系統,系統處理能力必須與事件流量匹配。
實時計算應用
互聯網領域的實時流計算通常都是針對海量數據進行的,除了非實時計算的需求(如計算結果準確)之外,實時計算最重要的一個需求是可以實時響應計算結果,通常要求爲秒級。我的理解,互聯網行業的實時計算能夠分爲如下兩種應用場景。
1)數據源是實時的、不間斷的,要求對用戶的響應時間也是實時的。主要用於互聯網流式數據處理。所謂流式數據,是指將數據看做數據流的形式來處理。數據流則是在時間分佈和數量上無限的一系列數據記錄的集合體;數據記錄是數據流的最小組成單元。例如,對於大型網站,活躍的流式數據很是常見,這些數據包括網站的訪問 PV/UV、用戶訪問的內容、搜索的內容等。實時的數據計算和分析能夠動態實時地刷新用戶訪問數據,展現網站實時流量的變化狀況,分析天天各小時的流量和用戶分佈狀況,這對於大型網站來講具備重要的實際意義。
2)數據量大且沒法或不必預算,但要求對用戶的響應時間是實時的。主要用於特定場合下的數據分析處理。當數據量很大,同時發現沒法窮舉全部可能條件的查詢組合或者大量窮舉出來的條件組合無用時,實時計算就能夠發揮做用,將計算過程推遲到查詢階段進行,但須要爲用戶提供實時響應。
實時計算處理流程
互聯網上海量數據(通常爲日誌流)的實時計算過程能夠劃分爲 3 個階段:數據的產生與收集階段、傳輸與分析處理階段、存儲對對外提供服務階段,如圖 1-1 所示。下面分別進行簡單介紹。
圖 1 實時計算處理流程
(1)數據實時採集
需求:功能上保證能夠完整地收集到全部日誌數據,爲實時應用提供實時數據;響應時間上要保證明時性、低延遲(在 1s 左右);配置簡單,部署容易;系統穩定可靠等。
目前,互聯網企業的海量數據採集工具備 Facebook 開源的 Scribe、 LinkedIn 開源的Kafka、 Cloudera 開源的 Flume,淘寶開源的 TimeTunnel、 Hadoop 的 Chukwa 等,它們都可以知足每秒數百 MB 的日誌數據採集和傳輸需求。
(2)數據實時計算
傳統的數據操做,首先將數據採集並存儲在 DBMS 中,而後經過查詢和 DBMS 進行交互,獲得用戶想要的答案。在整個過程當中,用戶是主動的,而 DBMS 系統是被動的,過程操做如圖 1-2 所示。
圖 2 傳統的數據操做流程
可是,對於如今大量存在的實時數據,如股票交易的數據,這類數據實時性強,數據量大,沒有止境,傳統的架構並不合適。流計算就是專門針對這種數據類型準備的。在流數據不斷變化的運動過程當中實時地進行分析,捕捉到可能對用戶有用的信息,並把結果發送出去。在整個過程當中,數據分析處理系統是主動的,而用戶卻處於被動接收的狀態,處理流程如圖 3 所示。
圖 3 流計算處理過程
需求:適應流式數據、不間斷查詢;系統穩定可靠、可擴展性好、可維護性好等。有關計算的一些注意點:分佈式計算、並行計算(節點間的並行、節點內的並行)、熱點數據的緩存策略、服務端計算。
(3)實時查詢服務
全內存:直接提供數據讀取服務,按期轉存到磁盤或數據庫進行持久化。
半內存:使用 Redis、 Memcache、 MongoDB、 BerkeleyDB 等內存數據庫提供數據實時查詢服務,由這些系統進行持久化操做。
全磁盤:使用 HBase 等以分佈式文件系統(HDFS)爲基礎的 NoSQL 數據庫,對於KeyValue 內存引擎,關鍵是設計好 Key 的分佈。
實時計算框架
最近這幾年隨着實時計算的流行,相繼出現瞭如下實時計算的框架。
一、 IBM 的 StreamBase
StreamBase 是 IBM 開發的一款商業流式計算系統,在金融行業和政府部門使用,其自己是商業應用軟件,但提供了開發版。相對於付費使用的企業版,開發版的功能更少,但這並不妨礙咱們從外部使用 API 接口來對 StreamBase 自己進行分析。
StreamBase 使用 Java 開發, IDE 是基於 Eclipse 進行二次開發,功能很是強大。 StreamBase也提供了至關多的 Operator、 Functor 以及其餘組件來幫助構建應用程序。用戶只須要經過 IDE拖拉控件,而後關聯,設置好傳輸的 Schema 而且設置控件計算過程,就能夠編譯出一個高效處理的流式應用程序。同時, StreamBase 還提供了類 SQL 來描述計算過程。 StreamBase 的架構如圖 1-4 所示。
StreamBase Server 是節點上啓動的管理進程,它負責管理節點上 Container 的實例,每一個 Container 通 過 Adapter 獲 得 輸 入, 交 給 應 用 邏 輯 計 算, 然 後 通 過 Adapter 輸 出。 各 個Container 相互鏈接,造成一個計算流圖。
圖4 StreamBase 架構圖
Adapter 負責與異構輸入或輸出交互,源或目的地可能包括 CSV 文件、 JDBC、 JMS、Simulation( StreamBase 提供的流產生模擬器)或用戶定製。
每一個 StreamBase Server 上面都會有一個 System Container,主要是產生系統監控信息的流式數據。
HA Container 用於容錯恢復,能夠看出它實際包含兩個部分: Heartbeat 和 HA Events,其中 Heartbeat 也是 Tuple 在 Container 之間傳輸。在 HA 方案下, HA Container 監控 PrimaryServer 的活動狀況,而後將這些信息轉換成爲 HA Events 交給 StreamBase Monitor 來處理。
Monitor 就是從 System Container 和 HA Container 中獲取數據並進行處理。 StreamBase認爲 HA 問題應該經過 CEP 方式處理,也就是說出現問題的部件確定會反映在 SystemContainer 和 HA Container 的輸出流上面, Monitor 若是經過復瑣事件處理這些 Tuples 就可以檢測到機器故障等問題,並作出相應處理。
二、Yahoo 的 S42
Yahoo! S4(Simple Scalable Streaming System)是一個通用的、分佈式的、可擴展的、分區容錯的、可插拔的流式系統 。基於 S4 框架,開發者能夠容易地開發面向持續流數據處理的應用。 S4 的最新版本是 v0.6.0,是 Apache 孵化項目,其設計特色有如下幾個方面。
Actor 計算模型:爲了能在普通機型構成的集羣上進行分佈式處理,而且在集羣內部不使用共享內存, S4 架構採用了 Actor 模式,這種模式提供了封裝和地址透明語義,所以在容許應用大規模併發的同時,提供了簡單的編程接口。 S4 系統經過處理單元(Processing Elements, PEs)進行計算,消息在處理單元間以數據事件的形式傳送,PE 消費事件,發出一個或多個可能被其餘 PE 處理的事件,或者直接發佈結果。每一個PE 的狀態對於其餘 PE 不可見, PE 之間惟一的交互模式就是發出事件和消費事件。
對等集羣架構: S4 採用對等架構,集羣中的全部處理節點都是等同的,沒有中心控制節點,這使得集羣的擴展性很好,處理節點的總數理論上無上限;同時, S4 沒有單點容錯的問題。
可插拔體系架構: S4 系統使用 Java 語言開發,採用了極富層次的模塊化編程,每一個通用功能點都儘可能抽象出來做爲通用模塊,並且儘量地讓各模塊實現可定製化。
支持部分容錯:基於 ZooKeeper 服務的集羣管理層會自動路由事件從失效節點到其餘節點。除非顯式保存到持久性存儲,不然節點故障時,節點上處理事件的狀態會丟失。
S4 的重要應用場景是預估點擊經過率(CTR)。 CTR 是廣告點擊數除以展示數獲得的比率,擁有足夠歷史的展示和點擊數據後, CTR是用戶點擊廣告可能性的一個很好的估算,精確的來源點擊對於個性化和搜索排名來講都價值無限。據 S4 的開發者稱,在線流量上的實驗顯示基於S4系統的新CTR計算框架能夠在不影響收入的前提下將 CTR 值提升 3%,這主要是經過快速檢測低質量的廣告並把它們過濾出去而得到的收益。 S4 系統提供的低延遲處理可以使得商務廣告部門獲益,可是潛在的風險也不能忽視,那就是事件流的速率快到必定程度後,S4可能沒法處理, 會致使事件的丟失, 如圖4所示。
圖 5 S4 在流量壓力測試下的事件丟失狀況
三、Twitter 的 Storm
Twitter 的 Storm : Storm 是一個分佈式的、容錯的實時計算系統。 Storm 的用途:可用於處理消息和更新數據庫(流處理),在數據流上進行持續查詢,以流的形式返回結果到客戶端(持續計算),並行化一個相似實時查詢的熱點查詢(分佈式的 RPC)。
Storm 爲分佈式實時計算提供了一組通用原語,可被用於「流處理」中,實時處理消息並更新數據庫。這是管理隊列及工做者集羣的另外一種方式。 Storm 也可用於「連續計算」
( continuous computation),對數據流作連續查詢,在計算時將結果以流的形式輸出給用戶。它還用於「分佈式 RPC」,以並行的方式運行昂貴的運算。
Storm 的主要特色以下:
簡單的編程模型。相似於 MapReduce 下降了並行批處理複雜性, Storm 下降了進行實時處理的複雜性。
可使用各類編程語言。能夠在 Storm 上使用各類編程語言。默認支持 Clojure、Java、 Ruby 和 Python。要增長對其餘語言的支持,只需實現一個簡單的 Storm 通訊協議便可。
容錯性。 Storm 會管理工做進程和節點的故障。
水平擴展。計算是在多個線程、進程和服務器之間並行進行的。
可靠的消息處理。 Storm 保證每一個消息至少能獲得一次完整處理。當任務失敗時,它會負責從消息源重試消息。
快速。系統的設計保證了消息能獲得快速的處理,使用 ZeroMQ 做爲其底層消息隊列。
本地模式。 Storm 有一個「本地模式」,能夠在處理過程當中徹底模擬 Storm 集羣。這可使用戶快速進行開發和單元測試。
四、Twitter 的 Rainbird
Rainbird 是一款分佈式實時統計系統,能夠用於實時數據的統計。
1)統計網站中每個頁面,域名的點擊次數。
2)內部系統的運行監控(統計被監控服務器的運行狀態)。
3)記錄最大值和最小值。
Rainbird 構建在 Cassandra 上,使用 Scala 編寫,依賴於 ZooKeeper、 Scribe 和 Thrift。每秒能夠寫入 10 萬個事件,並且都帶有層次結構,或者進行各類查詢,延遲小於 100ms。目前 Twitter 已經在 Promoted Tweets、微博中的連接、短地址、每一個用戶的微博交互等生產環境使用了 Rainbird。其主要組件的功能以下。
ZooKeeper:是 Hadoop 子項目中的一款分佈式協調系統,用於控制分佈式系統中各個組件的一致性。
Cassandra :是 NoSQL 中一款很是出色的產品,集合了 Dynamo 和 BigTable 特性的分佈式存儲系統,用於存儲須要統計的數據,並提供客戶端查詢統計數據(須要使用分佈式 Counter 補丁 CASSANDRA-1072)。
Scribe :是 Facebook 開源的一款分佈式日誌收集系統,用於在系統中將各個須要統計的數據源收集到 Cassandra 中。
Thrift :是 Facebook 開源的一款跨語言 C/S 網絡通訊框架,開發人員基於該框架能夠輕鬆地開發 C/S 應用。
五、Facebook 的 Puma
Puma 是 Facebook 的數據流處理系統,早期的處理系統如圖 1-6 所示,即二代 Puma。PTail 將數據以流的方式傳遞給 Puma 2, Puma 2 每秒須要處理百萬級的消息,處理多爲Aggregation 方式的操做,遵循時間序列,涉及的複雜 Aggregation 操做諸如獨立訪次、最頻繁事件,等等。
圖 6 Puma 2 系統數據處理通路
對於每條消息, Puma 2 發送「Increment」操做到 HBase。考慮到自動負載均衡、自動容錯和寫入吞吐等因素, Puma 選擇 HBase 而不是 MySQL 做爲其存儲引擎。 Puma 2的服務器都是對等的,即同時可能有多個 Puma 2 服務器向 HBase 中修改同一行數據。所以,Facebook 爲 HBase 增長了新的功能,支持一條 Increment 操做修改同行數據的多列。
Puma 2的架構很是簡單而且易於維護,其涉及的狀態僅僅是 PTail 的 Checkpoint,即上游數據位置週期性地存儲在 HBase中。因爲是對稱結構,集羣擴容和機器故障的處理都很是方便。不過, Puma 2的缺點也很突出,首先,HBase的Increment操做是很是昂貴的,由於它涉及讀和寫,而HBase的隨機讀效率比較差;另外,複雜 Aggregation 操做也很差支持,須要在 HBase上寫不少用戶代碼;再者,Puma 2在故障時會產生少許重複數據,由於 HBase的 Increment 和 PTail 的 Checkpoint 並非一個原子操做。
但值得一提的是, Puma 並無開源出來,用戶能夠了解和借鑑其實現原理。
六、阿里的 JStorm
JStorm 是一個 Alibaba 開源的分佈式實時計算引擎,能夠認爲是 Twitter Storm 的 Java版本,用戶按照指定的接口實現一個任務,而後將這個任務遞交給 JStorm 系統, JStorm 會啓動後臺服務進程 7×24 小時運行,一旦某個 Worker 發生故障,調度器當即分配一個新的Worker 替換這個失效的 Worker。
JStorm 處理數據的方式是基於消息的流水線處理,所以特別適合無狀態計算,也就是計算單元依賴的數據所有能夠在接受的消息中找到,而且最好一個數據流不依賴另一個數據流。所以, JStorm 適用於下面的場景:
日誌分析。從日誌中分析出特定的數據,並將結果存入外部存儲器,如數據庫。
管道系統。將數據從一個系統傳輸到另一個系統,如將數據庫同步到 Hadoop。
消息轉化器。將接收到的消息按照某種格式轉化,存儲到另一個系統,如消息中間件中。
統計分析器。從日誌或消息中提煉出某個字段,而後進行 COUNT 或 SUM 計算,最後將統計值存入外部存儲器。
可是, JStorm 的活躍度並不高,截至本章書寫時,整個 JStorm 項目共提交過 36 次,而且只有 1 個 Committer,相比 Twitter Storm,無論是活躍度,仍是承認度都還不是一個數量級的產品。
七、其餘實時計算系統
(1) HStreaming
HStreaming 是創建在 Hadoop 上的可擴展的、可持續的數據分析系統。它能夠分析、可視化並處理大量連續數據,如一個金融交易系統實時展現數據圖。 HStreaming 由 Jana Uhlig與 Volkmar Uhlig 聯合創立,該公司沒有提供相關產品的開源版本,從官網信息來看,只提供相關的解決方案。
HStreaming 公司嘗試爲 Hadoop 環境添加一個實時的組件,當數據提交到系統,在存儲到磁盤以前會進行數據處理,相似開源的 Storm 和 Kafka。目前 HStreaming 已經創建了一個完整的系統,該系統可以利用實時的引擎來處理視頻、服務器、傳感器以及其餘機器上生成的數據流,並且徹底兼容 Hadoop 做爲一個歸檔和批量處理系統。
(2) Esper
Esper 是 EsperTech 公司使用 Java 開發的事件流處理(Event Stream Processing, ESP)和復瑣事件處理(Complex Event Processing, CEP)引擎。 CEP 是一種實時事件處理並從大量事件數據流中挖掘複雜模式的技術。 ESP 是一種從大量事件數據流中過濾、分析有意義的事件,並可以實時取得這些有意義的信息的技術。該引擎可應用於網絡入侵探測、 SLA 監測、RFID 讀取、航空運輸調控、金融(風險管理、欺詐探測)等領域。 Esper 能夠用在股票系統、風險監控系統等實時性要求比較高的系統中。
(3) Borealis
Borealis 是由 Brandeis University、 Brown University 和 MIT 合做開發的一個分佈式流式系統,由以前的流式系統 Aurora、 Medusa 演化而來,是學術研究的一個產品, 2008 年已經中止維護。
Borealis 具備豐富的論文、完整的用戶 / 開發者文檔,系統是用 C++ 實現的,運行於x86-based Linux 平臺。系統是開源的,同時使用了較多的第三方開源組件,包括用於查詢語言翻譯的 ANTLR、 C++ 的網絡編程框架庫 NMSTL 等。
Borealis 系統的流式模型和其餘流式系統基本一致:接受多元的數據流和輸出流,爲了容錯,採用肯定性計算,對於容錯性要求高的系統,會對輸入流使用算子進行定序。
八、框架對比
實時數據流計算是近年來分佈式、並行計算領域研究和實踐的重點,不管是工業界,仍是學術界,都誕生了多個具備表明性的數據流計算系統,用於解決實際生產問題和進行學術研究。不一樣的系統知足不一樣應用的需求,系統並沒有好壞之分,關鍵在於服務的對象是誰。圖 1-7 從開發語言、高可用機制、支持精確恢復、主從架構、資源利用率、恢復時間、支持狀態持久化及支持去重等幾個方面比較了典型的 3 個數據流計算系統 Puma、 Storm 和 S4。由於 StreamBase 是廠商發行商用版本, HStreaming 只提供解決方案,而 JStorm 和 Storm 很是類似,因此這幾種產品並無羅列在圖 7 中。
圖7 Puma、 Storm 和 S4 三種數據流計算系統對比
能夠看到,爲了高效開發,兩個系統使用 Java,另外一種系統使用函數式編程語言Clojure ;高可用方案,有兩個系統使用 Primary Standby 方式,系統恢復時間可控,但系統複雜度增長,資源使用率也較低,由於須要一些機器來當備機;而 Storm 選擇了更簡單可行的上游回放方式, 資源使用率高,就是恢復時間可能稍長些; Puma 和 S4 都支持狀態持久化,但 S4 目前不支持數據去重,將來可能會實現;三個系統都作不到精確恢復,即恢復後的執行結果和無端障發生時保持一致,由於即便是 Primary Standby 方式,也只是按期Checkpoint,並無跟蹤每條消息的執行。商用的 StreamBase 支持精確恢復,這主要應用於金融領域。