淺談分佈式計算的開發與實現

介紹

分佈式計算簡單來講,是把一個大計算任務拆分紅多個小計算任務分佈到若干臺機器上去計算,而後再進行結果彙總。 目的在於分析計算海量的數據,從雷達監測的海量歷史信號中分析異常信號(外星文明),淘寶雙十一實時計算各地區的消費習慣等。html

海量計算最開始的方案是提升單機計算性能,如大型機,後來因爲數據的爆發式增加、單機性能卻跟不上,纔有分佈式計算這種妥協方案。 由於計算一旦拆分,問題會變得很是複雜,像一致性、數據完整、通訊、容災、任務調度等問題也都來了。mysql

舉個例子,產品要求從數據庫中100G的用戶購買數據,分析出各地域的消費習慣金額等。 若是沒什麼時間要求,程序員小明就寫個對應的業務處理服務程序,部署到服務器上,讓它慢慢跑就是了,小明預計10個小時能處理完。 後面產品嫌太慢,讓小明想辦法加快到3個小時。
日常開發中相似的需求也不少,總結出來就是,數據量大、單機計算慢。 若是上Hadoop、storm之類成本較高、並且有點大才小用。 固然讓老闆買更好的服務器配置也是一種辦法。程序員

利用分片算法

小明做爲一個有追求有理想的程序員,決定用介於單機計算和成熟計算框架的過分解決方案,這樣成本和需求都能知足了。 分佈式計算的核心在於計算任務拆分,若是數據能以水平拆分的方式,分佈到5臺機器上,每臺機器只計算自身的1/5數據,這樣即能在3小時內完成產品需求了。算法

如上所述,小明須要把這些數據按照必定維度進行劃分。 按需求來看以用戶ID劃分最好,因爲用戶之間沒有狀態上的關聯,因此也不須要事務性及二次迭代計算。 小明用簡單的hash取模對id進行劃分。sql

f(memberid) % 5 = ServerN

這樣程序能夠分別部署到5臺機器上,而後程序按照配置只取對應餘數的用戶id,計算出結果併入庫。 這種方式多機之間毫無關聯,不須要進行通訊,能夠避免不少問題。 機器上的程序自己也不具有分佈式的特性,它和單機同樣,只計算自身獲取到的數據便可,因此若是某臺機器上程序崩潰的話,處理方式和單機同樣,好比記錄下處理進度,下次從當前進度繼續進行後續計算。數據庫

利用消息隊列

使用分片方式相對比較簡單,但有以下不足之處。編程

  • 它不具備負載均衡的能力,若是某臺機器配置稍好點,它可能最早計算完,而後空閒等待着。也有多是某些用戶行爲數據比較少,致使計算比較快完成。
  • 還有一個弊端就是每臺機器上須要手動更改對應的配置, 這樣的話多臺機器上的程序不是徹底同樣的,這樣能夠用遠程配置動態修改的辦法來解決。

小明這種方式引入了個第三方,消息隊列。 小明先用一個單獨的程序把用戶信息推送到消息隊列裏去,而後各臺機器分別取消費這個隊列。 因而就有了3個角色:服務器

  • 推送消息的,簡稱Master。
  • 消息隊列,這裏以Rabbitmq爲例。
  • 各個處理程序,簡稱Worker或Slave都行。

雖然僅僅引入了個第三方,但它已經具有了分佈式計算的不少特性。網絡

  1. 計算任務分發。 Master把須要計算的用戶數據,不斷的推送消息隊列。
  2. 程序一致性。 Worker訂閱相同的消息隊列便可,無需更改程序代碼。
  3. 任意擴容。 因爲程序徹底同樣,意味着若是想要加快速度,重複部署一份程序到新機器便可。 固然這是理論上的,實際當中會受限於消息隊列、數據庫存儲等。
  4. 容災性。 若是5臺中某一臺程序掛了也不影響,利用Rabbitmq的消息確認機制,機器崩潰時正在計算的那一條數據會在超時,在其餘節點上進行消費處理。

Hadoop簡介

Hadoop介紹已經至關多了,這裏簡述下好比:"Hadoop是一套海量數據計算存儲的基礎平臺架構",分析下這句話。架構

  • 其中計算指的是MapReduce,這是作分佈式計算用的。
  • 存儲指的是HDFS,基於此上層的有HBase、Hive,用來作數據存儲用的。
  • 平臺,指能夠給多個用戶使用,好比小明有一計算需求,他只須要按照對應的接口編寫業務邏輯便可,而後把程序以包的形式發佈到平臺上,平臺進行分配調度計算等。 而上面小明的分佈式計算設計只能給本身使用,若是另外有小華要使用就須要從新寫一份,而後單獨部署,申請機器等。Hadoop最大的優點之一就在於提供了一套這樣的完整解決方案。

下面找了介紹Hadoop的概覽圖,跟小明的設計作對比下:

  • 圖中「大數據計算任務」 對應小明的100G用戶數據的計算任務。
  • 」任務劃分「 對應Master和消息隊列。
  • 「子任務」 對應Worker的業務邏輯。
  • 」結果合併「 對應把每一個worker的計算結果入庫。
  • 「計算結果」 對應入庫的用戶消費習慣數據。

PS:爲了方便描述,把小明設計的分佈式計算,叫作小和尚。

MapReduce

因爲MapReduce計算輸入和輸出都是基於HDFS文件,因此大多數公司的作法是把mysql或sqlserver的數據導入到HDFS,計算完後再導出到常規的數據庫中,這是MapReduce不夠靈活的地方之一。 MapReduce優點在於提供了比較簡單的分佈式計算編程模型,使開發此類程序變得很是簡單,像以前的MPI編程就至關複雜。

狹隘的來說,MapReduce是把計算任務給規範化了,它能夠等同於小和尚中Worker的業務邏輯部分。 MapReduce把業務邏輯給拆分紅2個大部分,Map和Reduce,能夠先在Map部分把任務計算一半後,扔給Reduce部分繼續後面的計算。 固然在Map部分把計算任務全作完也是能夠的。 關於Mapreduce實現細節部分很少解釋,有興趣的同窗能夠查相關資料或看下樓主以前的C#模擬實現的博客【探索C#之微型MapReduce

若是把小明產品經理的需求放到Hadoop來作,其處理流程大體以下:

  1. 把100G數據導入到HDFS
  2. 按照Mapreduce的接口編寫處理邏輯,分Map、Reduce兩部分。
  3. 把程序包提交到Mapreduce平臺上,存儲在HDFS裏。
  4. 平臺中有個叫Jobtracker進程的角色進行分發任務。 這個相似小和尚的Master負載調度管理。
  5. 若是有5臺機器進行計算的話,就會提早運行5個叫TaskTracker的slave進程。 這相似小和尚worker的分離版,平臺把程序和業務邏輯進行分離了, 簡單來講就是在機器上運行個獨立進程,它能動態加載、執行jar或dll的業務邏輯代碼。
  6. Jobtracker把任務分發到TaskTracker後,TaskTracker把開始動態加載jar包,建立個獨立進程執行Map部分,而後把結果寫入到HDFS上。
  7. 若是有Reduce部分,TaskTracker會建立個獨立進程把Map輸出的HDFS文件,經過RPC方式遠程拉取到本地,拉取成功後,Reduce開始計算後續任務。
  8. Reduce再把結果寫入到HDFS中
  9. 從HDFS中把結果導出。

這樣一看好像是把簡單的計算任務給複雜化了,其實若是隻有幾臺計算任務的話,使用Mapreduce確實是殺雞用牛刀了。 若是有TB、PB級別的數據、跑在成百上千臺計算節點上,Mapreduce的優點纔會體現出來。 其計算框架圖架構以下: 

離線計算

一般稱Mapreduce及小和尚這種計算爲離線計算,由於它對已經持久化的文件數據進行計算,不能實時響應。 還有個緣由就是它的處理速度比較慢,它的輸入和輸出源都是基於HDFS設計,若是數據不是一開始就寫入到HDFS上,就會涉及到數據導入導出,這部分相對耗費時間。 並且它的數據流動是基於文件系統的,Map部分輸出的數據不是直接傳送到Reduce部分,而是先寫入HDFS再進行傳送。

處理速度慢也是Mapreduce的不足之處,促使了後面實時計算的誕生。
另外個缺點是Mapreduce的計算任務流比較單一,它只有Map、Reduce兩部分。 簡單的能夠只寫一部分邏輯來解決,若是想拆分紅多個部分,如邏輯A、邏輯B、邏輯C等, 並且一部分計算邏輯依賴上一次計算結果的話,MapReduce處理起來就比較困難了。 像storm框架解決此類問題的方案,也稱爲流式計算,下一章繼續補充。

 

實時計算

接上篇,離線計算是對已經入庫的數據進行計算,在查詢時對批量數據進行檢索、磁盤讀取展現。 而實時計算是在數據產生時就對其進行計算,而後實時展現結果,通常是秒級。 舉個例子來講,若是有個大型網站,要實時統計用戶的搜索內容,這樣就能計算出熱點新聞及突發事件了。 按照之前離線計算的作法是不能知足的,須要使用到實時計算。

小明做爲有理想、有追求的程序員開始設計其解決方案了,主要分三部分。

  • 每當搜索內容的數據產生時,先把數據收集到消息隊列,因爲其數據量較大,以使用kafka爲例。 這個收集過程是一直持續的,數據不斷產生而後不斷流入到kafka中。
  • 要有一個能持續計算的框架,一旦收集到數據,計算系統能實時收到數據,根據業務邏輯開始計算,而後不斷產生須要的結果,這裏以storm爲例。
  • 根據結果進行實時展現併入庫, 能夠一邊展現一邊入庫,對外提供實時查詢的服務。這裏的入庫能夠是基於內存的Redis、MongoDB,也但是基於磁盤的HBase、Mysql、SqlServer等。

其流程圖以下: 

storm簡介

一般都介紹Storm是一個分佈式的、高容錯的實時計算系統。 「分佈式」是把數據分佈到多臺上進行計算,「高容錯」下面談,這裏主要細節介紹下「實時計算」的實現。

storm有個角色叫topology,它相似mapreduce的job,是一個完整的業務計算任務抽象。 上章談到hadoop的缺點在於數據源單一依賴HDFS,storm中Spout角色的出現解決了這個問題。 在Spout內部咱們能夠讀取任意數據源的數據,好比Redis、消息隊列、數據庫等等。 並且spout能夠是多個,這樣更好的分類,好比能夠SpoutA讀取kafka,SpoutB讀取Redis。 示例以下:

複製代碼

public class CalcPriceSpout : BaseRichSpout
{
    private SpoutCollector Collector;
    public override void NexData()
    {
        //讀取各類數據源,Redis、消息隊列、數據庫等
        Collector.emit("消息")
    }
}

複製代碼

代碼中NexData是storm的核心方法,它一直被storm循環調用着, 在方法裏咱們實時讀取kafka的消息,而後把消息經過Collector組件發射到各個計算節點裏,它相似小和尚中的Master。 這樣應用每產生一條數據,會實時收集到kafka,而後被NextData消費,發射到節點開始計算。 NextData讀取的消息時在內存中,而後直接經過網絡流動到節點機器上的內存中開始計算,不會持久化到磁盤上。

由於速度比較快,因此叫實時計算,也有叫持續計算,意思是能夠很是快的一直進行計算,至於叫什麼均可以。

流式計算

主流的流式計算有S四、StreamBase、Borealis,其storm也具備流式計算的特性。 流式計算是指「數據能像液體水同樣不斷的在各個節點間流動,每一個節點均可以對「數據(液體水)」進行計算,而後產生新的數據,繼續像水同樣流動」。如圖: 

圖中Spout就是水龍頭,它不斷的經過NextData產生數據,而後流動各個Bolt中。 Bolt是各個計算節點上的計算邏輯,它拿到數據後開始計算,完成後流向另一個,直到完成。 其Bolt也能夠是任意個,這比Mapreduce只能分紅Map、Reduce兩部分好多了。 這樣能夠在BlotA中計算中間值,而後經過這個中間值去任意數據源拉取數據後,在流動到下一步處理邏輯中, 這個中間值直接在內存中,經過網絡流動BlotB上。 其大大增長了其適用範圍和靈活度,Spout和bolt的數據流動構成了一個有向無環圖。 Bolt示例代碼以下。

複製代碼

public class CalcProductPriceBolt : BaseRichBolt
{
    private BoltCollector Collector;
    public override void Execute(Tuple<string, string> input)
    {
        //Result=計算計算計算。
        //Collector.Emit("Reulst"); 流動到另一個節點
    }
}

複製代碼

數據流動圖: 

概括總結

結合上篇,發現Hadoop離線計算的計算要求是把業務邏輯包上傳到平臺上,數據導入到HDFS上,這樣才能進行計算。 其產生的結果數據是展現以前就計算好的,另外它的計算是按批次來的,好比不少公司的報表,都是天天凌晨開始計算前一天的數據,以便於展現。 其數據是不動的,計算邏輯也是不動的。

Storm的流式計算一樣是把計算邏輯包上傳到平臺上,由平臺調度,計算邏輯是不動的。 但數據能夠是任意來源的,不斷在計算節點進行流動。 也便是說在數據產生的時刻,就開始進行流動計算,它展現的結果數據是實時變化的。 其數據是流動的,計算邏輯是不動的。storm把產生的每條數據當成一個消息來處理,其內部也是經過消息隊列組件zeromq來完成的。

高容錯性

storm提供了各級別的可靠性保證,一消息從Spout流動到boltA,在流動boltB, 那storm會經過惟一值不斷異或的設計去監測這個消息的完成狀況,這個監測是一個和業務邏輯相似的bolt,不過它是有storm自身實現的,叫Acker,它的任務就是接收各個消息任務的完成狀態,而後告訴Spout這個消息是否已經徹底處理。下面是幾種異常處理狀況:

  • BoltB所在的節點掛了或消息異常,那麼這條消息就沒有處理完,Spout可在超時後從新發射該數據便可。
  • Acker所在節點掛了後,即當前節點監控的消息徹底狀況,會所有丟失,Spout會在消息超時作後續處理。
  • 若是Spout所在節點掛了,那Spout發射的數據也會所有丟失, 這時可在消息隊列中設置超時時間,若是沒有一直沒對消息進行Ack的話,那麼這條消息會從新讓其餘的Spout從新接收到。這部分須要單獨在消息隊列中配置,另外storm消息的Ack確認對性能有必定影響,可根據消息的重要性是否要開啓它。
  • 若是storm平臺級別的組件掛了,平臺會嘗試重啓失敗的組件,storm除nimbus組件外都是多節點點部署,掛了某一節點,不會對任務計算有所影響。

下篇寫消息保證機制及改造小和尚的設計。

做者:蘑菇先生 出處: http://mushroom.cnblogs.com/

相關文章
相關標籤/搜索