流式實時分佈式計算的設計

https://blog.csdn.net/anzhsoft/article/details/38168025java

1. 流式計算的背景和特色算法


如今不少公司天天都會產生數以TB級的大數據,如何對這些數據進行挖掘,分析成了很重要的課題。好比:數據庫

電子商務:須要處理而且挖掘用戶行爲產生的數據,產生推薦,從而帶來更多的流量和收益。最理想的推薦就是根據興趣推薦給用戶原本不須要的東西!而天天處理海量的用戶數據,須要一個低延時高可靠的實時流式分佈式計算系統。
新聞聚合:新聞時效性很是重要,若是在一個重大事情發生後可以實時的推薦給用戶,那麼確定能增大用戶粘性,帶來可觀的流量。
社交網站:你們天天都會去社交網站是爲了看看如今發生了什麼,周圍人在作什麼。流式計算能夠把用戶關注的熱點聚合,實時反饋給用戶,從而達到一個圈子的聚合效果。
交通監管部門:每一個城市的交通監管部門天天都要產生海量的視頻數據,這些視頻數據也是以流的形式源源不斷的輸系統中。實時流式計算系統須要以最快的速度來處理這些數據。
數據挖掘和機器學習:它們其實是互聯網公司內部使用的系統,主要爲線上服務提供數據支撐。它們能夠說是互聯網公司的最核心的平臺之一。系統的效率是挖掘的關鍵,理想條件下就是天天產生的海量數據都能獲得有效處理,對於原來的數據進行全量更新。
大型集羣的監控:自動化運維很重要,集羣監控的實時預警機制也很是重要,而流式系統對於日誌的實時處理,每每是監控系統的關鍵。
等等。
流式實時分佈式計算系統就是要解決上述問題的。這些系統的共同特徵是什麼?編程

很是方便的運行用戶編寫的計算邏輯:就如Hadoop定義了Map和Reduce的原語同樣,這些系統也須要讓用戶關注與數據處理的具體邏輯上,他們不該該也不須要去了解這些usder defined codes是如何在分佈式系統上運轉起來的。由於他們僅僅關注與數據處理的邏輯,所以能夠極大的提升效率。並且應該儘可能不要限制編程語言,畢竟不一樣的公司甚至同一公司的不一樣部門使用的語言多是千差萬別的。支持多語言無疑能夠搶佔更多的用戶。
Scale-out的設計:分佈式系統天生就是scale-out的。
無數據丟失:系統須要保證無數據丟失,這也是系統高可用性的保證。系統爲了無數據丟失,須要在數據處理失敗的時候選擇另外的執行路徑進行replay(系統不是簡單的從新提交運算,而是從新執行調度,不然按照來源的call stack有可能使得系統永遠都在相同的地方出一樣的錯誤)。
容錯透明:用戶不會也不須要關心容錯。系統會自動處理容錯,調度而且管理資源,而這些行爲對於運行於其上的應用來講都是透明的。
數據持久化:爲了保證高可用性和無數據丟失,數據持久化是沒法躲避的問題。的確,數據持久化可能在低延時的系統中比較影響性能,可是這沒法避免。固然了,若是考慮到出錯狀況比較少,在出錯的時候咱們可以忍受數據能夠從頭replay,那麼中間的運算能夠不進行持久化。注意,這隻有在持久化的成本要比計算的replay高的狀況下有效。通常來講,計算的結果須要replica,固然了,可使用將數據replica到其餘的節點的內存中去(這又會佔用集羣的網絡帶寬)。
超時設置:超時之因此在在這裏被提出來,由於超時時間的大小設置須要重視,若是過短能夠會誤殺正常運行的計算,若是太長則不能快速的檢測錯誤。還有就是對於錯誤的快速發現能夠這類系統的一個設計要點,畢竟,超時了才發現錯誤不少時候在時效性上是不可接受的。網絡


2. 原語設計架構


        Hadoop定義了Map和Reduce,使得應用者只須要實現MR就能夠實現數據處理。而流式系統的特色,容許它們能夠進行更加具體一些的原語設計。流式的數據的特色就是數據時源源不斷進入系統的,而這些數據的處理通常都須要幾個階段。拿普通的日誌處理來講,咱們可能僅僅關注Error的日誌,那麼系統的第一個計算邏輯就是進行filer。接下來可能須要對這個日誌進行分段,分段後可能交給不一樣的規則處理器進行處理。所以,數據處理通常是分階段的,能夠說是一個有向無環圖,或者說是一個拓撲。實際上,Spark抽象出的運算邏輯就是由RDD(Resilient Distributed Dataset)構成DAG(Directed Acyclic Graph),而Storm則有Spout和Blot構成Topology(拓撲)。併發

 

2.1 Spark的設計
       Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據,每一段數據都轉換成Spark中的RDD,而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加,或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。框架

 

 

WordCount的例子:運維

// Create the context and set up a network input stream to receive from a host:port
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream(args(1), args(2).toInt)

// Split the lines into words, count them, and print some of the counts on the master
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
這個例子使用Scala寫的,一個簡單優雅的函數式編程語言,同時也是基於JVM的後Java類語言。機器學習


2.2 Storm的設計
        Storm將計算邏輯成爲Topology,其中Spout是Topology的數據源,這個數據源多是文件系統的某個日誌,也多是MessageQueue的某個消息隊列,也有多是數據庫的某個表等等;Bolt負責數據的護理。Bolt有可能由另外兩個Bolt的join而來。

       而Storm最核心的抽象Streaming就是鏈接Spout,Bolt以及Bolt與Bolt之間的數據流。而數據流的組成單位就是Tuple(元組),這個Tuple可能由多個Fields構成,每一個Field的含義都在Bolt的定義的時候制定。也就是說,對於一個Bolt來講,Tuple的格式是定義好的。

 

 

2.3 原語設計的要點
流式系統的原語設計,要關注一下幾點:

如何定義計算拓撲:要方便算法開發者開發算法與策略。最好的實現是定義一個算法與框架的交互方式,定義好算法的輸入結構和算法的輸出結構。而後拓撲可以組合不一樣的算法來爲用戶提供一個統一的服務。計算平臺最大的意義在於算法開發者不須要了解程序的運行,併發的處理,高可用性的實現,只須要提供算法與計算邏輯便可以快速可靠的處理海量的數據。
拓撲的加載與啓動:對於每一個節點來講,啓動時須要加載拓撲,節點須要其餘的信息,好比上游的數據來源與下游的數據輸出。固然了下游的數據輸出的拓撲信息能夠存儲到Tuple中,對於數據須要放到那裏去拓撲自己是無狀態的。這就取決於具體的設計了。
拓撲的在線更新:對於每一個算法邏輯來講,更新是不可避免的,如何在不中止服務的狀況下進行更新是必要的。因爲實現了架構與算法的剝離,所以算法能夠以一個單獨的個體進行更新。能夠操做以下:Master將算法實體保存到一個Worker可見的地方,好比HDFS或者是NFS或者ZK,而後經過心跳發送命令到拓撲,拓撲會暫時中止處理數據而加載新的算法實體,加載以後從新開始處理數據。數據通常都會放到buffer中,這個buffer多是一個queue。可是從外界看來,拓撲其實是一直處於服務狀態的。
數據如何流動:流式系統最重要的抽象就是Streaming了。那麼Steaming如何流動?實際上涉及到消息的傳遞和分發,數據如何從一個節點傳遞到另一個節點,這是拓撲定義的,具體實現能夠參照第三小節。
計算的終點及結果處理:流式計算的特色就是計算一直在進行,流是源源不斷的流入到系統中的。可是對於每一個數據單位來講它的處理結果是肯定的,這個結果通常是須要返回調用者或者須要持久化的。好比處理一個時間段的交通違章,那麼輸入的數據是一段時間的視頻監控,輸出這是違章的信息,好比車牌,還有違章時刻的抓拍的圖片。這個數據要麼返回調用者,由調用者負責數據的處理,包括持久化等。或者是拓撲最後的節點將這些信息進行持久化。系統須要對這些常見的case進行指導性的說明,須要在Programmer Guide的sample中給出使用例子。


3. 消息傳遞和分發
       對於實現的邏輯來講,它們都是有向無環圖的一個節點,那麼如何設計它們之間的消息傳遞呢?或者說數據如何流動的?由於對於分佈式系統來講,咱們不能假定整個運算都是在同一個節點上(事實上,對於閉源軟件來講,這是能夠的,好比就是知足一個特定運算下的計算,計算平臺也不須要作的那麼通用,那麼對於一個運算邏輯讓他在一個節點完成也是能夠了,畢竟節省了調度和網絡傳輸的開銷)。或者說,對於一個通用的計算平臺來講,咱們不能假定任何事情。

      消息傳遞和分發是取決於系統的具體實現的。經過對比Storm和Spark,你就明白我爲何這麼說了。

 

3.1 Spark的消息傳遞
對於Spark來講,數據流是在經過將用戶定義的一系列的RDD轉化成DAG圖,而後DAG Scheduler把這個DAG轉化成一個TaskSet,而這個TaskSet就能夠向集羣申請計算資源,集羣把這個TaskSet部署到Worker中去運算了。固然了,對於開發者來講,他的任務是定義一些RDD,在RDD上作相應的轉化動做,最後系統會將這一系列的RDD投放到Spark的集羣中去運行。

 

3.2 Storm的消息傳遞      
對於Storm來講,他的消息分發機制是在定義Topology的時候就顯式定義好的。也就是說,應用程序的開發者須要清楚的定義各個Bolts之間的關係,下游的Bolt是以什麼樣的方式獲取上游的Bolt發出的Tuple。Storm有六種消息分發模式:

Shuffle Grouping: 隨機分組,Storm會盡可能把數據平均分發到下游Bolt中。
Fields Grouping:按字段分組, 好比按userid來分組, 具備一樣userid的tuple會被分到相同的Bolt。這個對於相似於WordCount這種應用很是有幫助。
All Grouping: 廣播, 對於每個Tuple, 全部的Bolts都會收到。這種分發模式要慎用,會形成資源的極大浪費。
Global Grouping: 全局分組, 這個Tuple被分配到storm中的一個bolt的其中一個task。這個對於實現事務性的Topology很是有用。
Non Grouping: 不分組, 這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是同樣的效果, 有一點不一樣的是storm會把這個bolt放到這個bolt的訂閱者同一個線程裏面去執行。
Direct Grouping: 直接分組,  這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪一個task處理這個消息。
3.3 消息傳遞要點
消息隊列如今是模塊之間通訊的很是通用的解決方案了。消息隊列使得進程間的通訊能夠跨越物理機,這對於分佈式系統尤其重要,畢竟咱們不能假定進程到底是部署在同一臺物理機上仍是部署到不一樣的物理機上。RabbitMQ是應用比較普遍的MQ,關於RabbitMQ能夠看個人一個專欄:RabbitMQ

提到MQ,不得不提的是ZeroMQ。ZeroMQ封裝了Socket,引用官方的說法: 「ZMQ (如下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架同樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ 的明確目標是「成爲標準網絡協議棧的一部分,以後進入 Linux 內核」。如今還未看到它們的成功。可是,它無疑是極具前景的、而且是人們更加須要的「傳統」BSD 套接字之上的一層封裝。ZMQ 讓編寫高性能網絡應用程序極爲簡單和有趣。」

所以, ZeroMQ不是傳統意義上的MQ。它比較適用於節點之間和節點與Master之間的通訊。Storm在0.8以前的Worker之間的通訊就是經過ZeroMQ。可是爲何0.9就是用Netty替代了ZeroMQ呢?說替代不大合適,只是0.9的默認的Worker之間的通訊是使用了Netty,ZeroMQ仍是支持的。Storm官方認爲ZeroMQ有如下缺點:

不容易部署,尤爲是在雲環境下:覺得ZMQ是以C寫的,所以它仍是緊依賴於操做系統環境的。
沒法限制其內存。經過JVM能夠很容易的限制java所佔用的內存。可是ZMQ對於Storm來講是個黑盒似得存在。
Storm沒法從ZMQ獲取信息。好比Storm沒法知道當前buffer中有多少數據爲發送。
固然了還有所謂的性能問題,具體能夠訪問Netty做者的blog。結論就是Netty的性能比ZMQ(在默認配置下)好兩倍。不知道所謂的ZMQ的默認配置是什麼。反正我對這個結果挺驚訝。固然了,Netty使用Java實現的確方便了在Worker之間的通訊加上受權和認證機制。這個使用ZMQ的確是不太好作。

 

4. 高可用性
HA是分佈式系統的必要屬性。若是沒有HA,其實系統是不可用的。那麼若是實現HA?對於Storm來講,它認爲Master節點Nimbus是無狀態的,無狀態意味着能夠快速恢復,所以Nimbus並無實現HA(不知道之後的Nimbus是否會實現HA,實際上使用ZooKeeper實現節點的HA是開源領域的通用作法)。爲何說Nimbus是無狀態的呢?由於集羣全部的元數據都保存到了ZooKeeper(ZK)中。Nimbus定時從ZK獲取信息,而且經過向ZK寫信息來控制Worker。Worker也是經過從ZK中獲取信息,經過這種方式,Worker執行從Nimbus傳遞過來的命令。

Storm的這種使用ZK的方式仍是很值得借鑑的。

Spark是如何實現HA的?個人另一篇文章分析過Spark的Master是怎麼實現HA的:Spark技術內幕:Master基於ZooKeeper的High Availability(HA)源碼實現 。

也是經過ZK的leader 選舉實現的。Spark使用了百行代碼的級別實現了Master的HA,因而可知ZK的功力。

 

除了這些Master的HA,還有每一個Worker的HA。或者說Worker的HA說法不太準確,所以對於集羣裏的工做節點來講,它能夠很是容易失敗的。這裏的HA能夠說是如何讓Worker失敗後快速重啓,從新提供服務。實現方式也能夠由不少種。一個簡單的方法就是使用一個容器(Container)啓動Worker而且監控Worker的狀態,若是Worker異常退出,那麼就從新啓動它。這個方法很簡單也頗有效。

若是是節點宕機呢?上述方法確定是不能用的。這種狀況下Master會檢測到Worker的心跳超時,那麼就會從資源池中把這個節點刪除。回到正題,宕機後的節點重啓涉及到了運維方面的知識。對於一個集羣來講,硬件宕機這種狀況應該須要統一的管理,也就是集羣也能夠由一個Master,維持每一個節點的心跳來肯定硬件的狀態。若是節點宕機,那麼集羣首先是重啓它。若是啓動失敗可能會經過電話或者短信或者郵件通知運維人員。所以運維人員爲了保證集羣的高可用性付出了不少的努力,尤爲是大型互聯網公司的運維人員,很是值得點贊。固然了這個已經不是Storm或者Spark所能涵蓋的了。

 

5. 存儲模型與數據不丟失
其實,數據不丟失有時候和處理速度是矛盾的。爲了數據不丟失就要進行數據持久化,數據持久化意味着要寫硬盤,在固態硬盤尚未成爲標配的今天,硬盤的IO速度永遠是系統的痛點。固然了能夠在另外節點的內存上進行備份,可是這涉及到了集羣的兩個稀缺資源:內存和網絡。若是由於備份而佔用了大量的網絡帶寬的話,那必將影響系統的性能,吞吐量。

固然了,可使用日誌的方式。可是日誌的話對於錯誤恢復的時間又是不太能接受的。流式計算系統的特色就是要快,若是錯誤恢復時間太長,那麼可能不如直接replay來的快,並且系統設計還更爲簡單。

其實若是不是爲了追求100%的數據丟失,可使用checkpoint的機制,容許一個時間窗口內的數據丟失。

回到系統設計自己,實際上流式計算系統主要是爲了離線和近線的機器學習和數據挖掘,所以確定要保證數據的處理速度:至少系統能夠處理一天的新增數據,不然數據堆積愈來愈大。所以即便有的數據處理丟失了數據,可讓源頭從新發送數據。

 

還有另一個話題,就是系統的元數據信心如何保存,由於系統的路由信息等須要是全局可見的,須要保存相似的這些數據以供集羣查詢。固然了Master節點保持了和全部節點的心跳,它徹底能夠保存這些數據,而且在心跳中能夠返回這些數據。實際上HDFS的NameNode就是這麼作的。HDFS的NN這種設計很是合理,爲何這麼說?HDFS的元數據包含了很是多的數據:

目錄文件樹結構和文件與數據塊的對應關係:會持久化到物理存儲中,文件名叫作fsimage。
DN與數據塊的對應關係,即數據塊存儲在哪些DN中:在DN啓動時會上報到NN它所維護的數據塊。這個是動態創建的,不會持久化。所以,集羣的啓動可能須要比較長的時間。

那麼對於流式計算系統這種算得上輕量級的元數據來講,Master處理這些元數據實際上要簡單的多,固然了,Master須要實現服務的HA和數據的HA。這些不是一個輕鬆的事情。實際上,能夠採用ZooKeeper來保存系統的元數據。ZooKeeper使用一個目錄樹的結構來保存集羣的元數據。節點能夠監控感興趣的數據,若是數據有變化,那麼節點會收到通知,而後就保證了系統級別的數據一致性。這點對於系統比較重要,由於節點都是不穩定的,所以系統的其餘服務可能都會由於節點失效而發生變化,這些都須要通知相關的節點更新器服務列表,保證了部分節點的失效並不會影響系統的總體的服務,從而也就實現了故障對於用戶的透明性。--------------------- 做者:anzhsoft 來源:CSDN 原文:https://blog.csdn.net/anzhsoft/article/details/38168025 版權聲明:本文爲博主原創文章,轉載請附上博文連接!

相關文章
相關標籤/搜索