關於Cascading

 Cascading是一個開源的Java庫和應用程序編程接口(API),它爲MapReduce提供了一個抽象層。它容許開發者構建出能在Hadoop集羣上運行的複雜的、關鍵任務的數據處理應用。html

  Cascading項目始於2007年夏天。它的第一個公開版本,即版本0.1,發佈於2008年1月。版本1.0發佈於2009年1月。從該項目的主頁http://www.cascading.org/能夠下載二進制版本,源代碼以及一些加載項模塊。前端

  map和reduce操做提供了強大的原語操做。然而,在建立複雜的、能夠被不一樣開發者共享的合成性高的代碼時,它們粒度級別彷佛不合適。再者,許多開發者發現當他們面對實際問題的時候,很難用MapReduce的模式來思考問題。java

  爲了解決第一個問題,Cascading用簡單字段名和一個數據元組模型值來替代MapReduce使用的鍵和值,而該模型的元組是由值的列表構成的。對第二個問題,Cascading直接從Map和Reduce操做分離出來,引入了更高層次的抽象:Function,Filter,Aggregator和Buffer。數據庫

  其餘一些可選擇的方案在該項目初始版本公開發布的同時基本上也出現了,但Cascading的設計初衷是對它們進行補充和完善。主要是考慮到大部分可選的架構都是對系統強加一些前置和後置條件或有其餘方面的要求而已。express

  例如,在其餘幾種MapReduce工具裏,運行應用程序以前,你必須對數據進行預格式化處理、過濾或把數據導入HDFS(Hadoop分佈式文件系統)。數據準備步驟必須在系統的程序設計抽象以外完成。相反,Cascading提供方法實現把數據準備和管理做爲系統程序設計抽象的組成部分。編程

  該實例研究將首先介紹Cascading的主要概念,最後歸納介紹ShareThis如何在本身的基礎框架上使用Cascading。數組

  若是但願進一步瞭解Cascading處理模型,請參見項目主頁上的「Cascading用戶手冊」。瀏覽器

  字段、元組和管道服務器

  MapReduce模型使用鍵和值的形式把輸入數據和Map函數,Map函數和Reduce函數以及Reduce函數和輸出數據聯繫起來。網絡

  但據咱們所知,實際的Hadoop應用程序一般會將多個MapReduce做業鏈在一塊兒。看一下用MapReduce模型實現的一個典型的字數統計例子。若是須要根據統計出來的數值進行降序排列,這是一個可能的要求,它將須要啓動另外一個MapReduce做業來進行這項工做。

  所以,理論上來講,鍵和值的模式不只把Map和 Reduce綁定到一塊兒,它也把Reduce和下一次的Map綁定了,這樣一直進行下去(圖16-11)。即鍵/值對源自輸入文件,流過Map和Reduce操做造成的鏈,而且最後終止到一個輸出文件。實現足夠多這樣連接的MapReduce應用程序,便能看出一系列定義良好的鍵/值操做,它們被一遍一遍地用來修改鍵/值數據流的內容。

  圖16-11. 基於MapReduce的計數和排序

  Cascading系統經過使用具備相應字段名的元組(與關係型數據庫中的表名和列名相似)來替代鍵/值模式的方法簡化了這一處理流程。在處理過程當中,由這些字段和元組組成的流數據在它們經過用戶定義的、由管道(pipe)連接在一塊兒的操做時得以處理(圖16-12)。

  所以,MapReduce的鍵和值被簡化成以下形式。

  字段

  字段是一個String(字符串)類型的名稱集合(如「first_name」)、表示位置信息的數值(如2和-1分別是第三和最後一個位置)或是二者混合使用的集合,與列名很是像。所以字段用來聲明元組裏值的名稱和經過名稱在元組中選出對應的值。後者就像執行SQL的select語句。

  圖16-12. 由字段和元組連接的管道

  元組

  元組就是由java.lang.Comparable類對象組成的數組。元組與數據庫中的行或記錄相似。

  Map和Reduce操做都被抽象隱藏到一個或多個管道實例以後(圖16-13)。

  Each

  Each管道一次只處理一個單獨的輸入元組。它能夠對輸入元組執行一個Function或一個Filter操做(後文立刻要介紹)。

  GroupBy

  GroupBy管道在分組字段上對元組進行分組。該操做相似於SQL的group by語句。若是元組的字段名相同,它也能把多個輸入元組數據流合併成一個元組數據流。

  CoGroup

  CoGroup管道既能夠實現元組在相同的字段名上鍊接,也能夠實現基於相同字段的分組。全部的標準鏈接類型(內鏈接—inner join,外鏈接—outer join等)以及自定義鏈接均可以用於兩個或多個元組數據流。

  圖16-13. 管道類型

  Every

  Every管道每次只處理元組的一個單獨分組的數據,分組數據能夠由GroupBy或CoGroup管道產生。Every管道能夠對分組數據應用Aggregator或Buffer操做。

  SubAssembly

  SubAssembly管道容許在一個單獨的管道內部進行循環嵌套流水線處理,或反過來,一個管道也能夠被嵌入更加複雜的流水線處理中。

  全部這些管道被開發者連接在一塊兒造成「管道流水線處理流程」,這裏每一個流水線能夠有不少輸入元組流(源數據,source)和不少輸出元組流(目標數據,sink)(見圖16-14)。

  圖16-14. 簡單的管道流水線

  從表面上看來,這可能比傳統的MapReduce模型更復雜。而且,不能否認,相較於Map,Reduce,Key和Value,這裏涉及的概念更多。但實際上,咱們引入了更多的概念,它們必須都工做協助提供不一樣的功能。

  例如,若是一個開發者想對reducer的輸出值提供「輔助排序」功能,她將須要實現Map、Reduce,一個「合成」Key(嵌套在父Key中的兩個Key),值,partitioner、一個用於「輸出值分組」的comparator和一個「輸出鍵」的comparator,全部這些概念以各類方式結合協做使用,而且在後續的應用中幾乎不可重用。

  在Cascading裏,這項工做只對應一行代碼:new GroupBy(, , ),其中previous是數據源管道。

  操做

  如前所述,Cascading經過引入一些替換性操做脫離了MapReduce模式,這些操做或應用於單個元組,或應用於元組分組(圖16-15)。

  Function

  Function做用於單個的輸入元組,對每一個輸入,它可能返回0或多個輸出元組。Function操做供Each類型的管道使用。

  圖16-15. 操做類型

  Filter

  Filter是一種特殊的函數,它的返回值是boolean(布爾)值,用於指示是否把當前的元組從元組流中刪除。雖然定義一個函數也能實現這一目的,可是Filter是爲實現這一目的而優化過的操做,而且不少過濾器可以經過邏輯運算符(如And、Or、Xor和Not)分組,能夠快速建立更復雜的過濾操做。

  Aggregator

  Aggregator對一組元組執行某種操做,這些分組元組是經過一組共同字段分組獲得的。好比,字段「last-name」值相同的元組。常見的Aggregator方法是Sum(求和)、Count(計數),Average(均值)、Max(最大)和Min(最小)。

  Buffer

  Buffer和Aggregator操做相似,不一樣的是,它被優化用來充當一個「滑動窗口」掃描一個惟一分組中全部的元組。當開發者須要有效地爲一組排序的元組插入遺漏的值時,或計算動態均值的時候,這個操做很是有用。一般,處理元組分組數據的時候,Aggregator也是一個可選的操做,由於不少Aggregator可以有效地連接起來工做,但有時,Buffer纔是處理這種做業的最佳工具。

  管道流水線建立的時候,這些操做便綁定到各管道(圖16-16)。

  Each和Every類型的管道提供了一種簡單的元組選擇機制,它們能夠選擇一些或全部的輸入元組,而後把這些選擇的數據傳送給它的子操做。而且咱們有一個簡單的機制把這些操做的結果和原來的輸入元組進行合併,而後產生輸出元組。這裏並不詳細說明機制,它使得每一個操做只關心參數指定的元組值和字段,而不是當前輸入元組的整個字段集。其次,操做在不一樣應用程序之間重用,這點和Jave方法重用的方式相同。

  圖16-16. 操做流程

  例如,在Java中,聲明一個方法concatenate(String first, Stringsecond),比直接定義concatenate(Person person)更抽象。第二個方法的定義,concatenate()函數必須「瞭解」Person對象;而第一個方法的定義並不清楚數據來自哪裏。Cascading操做展示了一樣的抽象能力。

  Tap類、Scheme對象和Flow對象

  在前面的幾個圖中,咱們屢次提到源數據(source)和目標數據(sink)。在Cascading系統中,全部的數據都是讀自或寫入Tab類實例,可是它們是經過Scheme對象被轉換成或取自元組實例對象。

  Tap

  Tap類負責如何訪問數據以及從哪一個位置訪問數據。例如,判斷數據是存於HDFS仍是存於本地?在Amazon S3中,仍是跨HTTP協議進行訪問?

  Scheme

  Scheme類負責讀取原始數據並把它們轉換成元組格式/或把元組數據寫入原始數據格式文件,這裏的原始數據能夠是文本行、Hadoop二進制的順序文件或是一些專用格式數據。

  注意,Tap類對象不是管道處理流程的一部分,所以它們不是Pipe類型。

  可是當Tap對象在集羣上變得可執行的時候,它們就和管道組件關聯到一塊兒。當一個管道處理流程與必要的幾個源和目標數據Tap實例關聯一塊兒後,咱們就獲得一個Flow對象。Flow對象是在管道處理流程與指定數量的源及目標數據Tap關聯時建立的,而Tap對象的功能是輸出或獲取管道流程指望的字段名。就是說,若是Tap對象輸出一個具備字段名「line」的元組(經過讀取HDFS上的文件數據),那麼這個管道流程頭部必須也但願字段名是「line」。不然,鏈接管道處理流程和Tap的處理程序會馬上失敗並報錯。

  所以,管道處理流程實際上就是數據處理定義,而且它們自己不是「可執行」的。在它們能夠在集羣上運行以前,必須鏈接到源和目標Tap對象。這種把Tap和管道處理流程分開處理的特性使Cascading系統很是強大。

  若是認爲管道處理流程和Java類類似,那麼Flow就像Java對象實例(圖16-17)。也就是說,在同一個應用程序裏面,一樣的管道處理流程能夠被實例化不少次從而造成新的Flow,不用擔憂它們之間會有任何干擾。如此一來,管道處理流程就能夠像標準Java庫同樣建立和共享。

  圖16-17. 流水線處理過程

  Cascading實戰

  如今咱們知道Cascading是什麼,清楚地瞭解它是如何工做的,可是用Cascading寫的應用程序是什麼樣子呢?咱們來看看例16-2。

  例16-2. 字數統計和排序

  Scheme sourceScheme =

  new TextLine(new Fields("line")); ?

  Tap source =

  new Hfs(sourceScheme, inputPath); ?

  Scheme sinkScheme = new TextLine(); ?

  Tap sink =

  new Hfs(sinkScheme, outputPath, SinkMode.REPLACE); ?

  Pipe assembly = new Pipe("wordcount"); ?

  String regexString = "(?

  Function regex = new RegexGenerator(new Fields("word"), regexString);

  assembly =

  new Each(assembly, new Fields("line"), regex); ?

  assembly =

  new GroupBy(assembly, new Fields("word")); ?

  Aggregator count = new Count(new Fields("count"));

  assembly = new Every(assembly, count); ?

  assembly =

  new GroupBy(assembly, new Fields("count"), new Fields("word")); ?

  FlowConnector flowConnector = new FlowConnector();

  Flow flow =

  flowConnector.connect("word-count", source, sink, assembly);

  flow.complete();

  ? 建立一個新的Scheme對象讀取簡單的文本文件,爲每一行名爲「line」字段(被Fields對象聲明)輸出一個新的Tuple對象。

  ? 建立一個新的Scheme對象用於寫簡單文本文件,而且它指望輸出的是一個具備任意多個字段/值的Tuple對象。假若有多個值要輸出,這些值在輸出文件裏將以製表符分隔。

  ? 建立源和目標Tap實例分別指向輸入文件和

  ? 輸出目錄。目標Tap對象輸出數據時將覆蓋目錄下現有的全部文件。

  ? 構建管道處理流程的頭,並把它命名爲「wordcount」。這個名稱用於綁定源及目標數據到這個管道處理流程。多個頭或尾要求必須有本身惟一的名稱。

  ? 構建具備一個函數的Each類型管道,它將解析line字段裏的每一個詞,把解析結果放入一個新的Tuple對象。

  ? 構建GroupBy管道,它將建立一個新的Tuple組,實現基於word字段的分組。

  ? 構建一個具備Aggregator操做的Every類型管道,它將對基於不一樣詞的分組Tuple對象分別進行字數統計。統計結果存於count的字段裏。

  ? 構建GroupBy類型管道,它將根據數值對count字段進行分組,造成新的Tuple分組,而後對word字段值進行輔助排序。結果是一組基於count字段值升序排列的count字段值和word字段的值列表。

  用Flow對象把管道處理流程和數據源及目標聯繫起來,而後

  在集羣上執行這個Flow。

  在這個例子裏,咱們統計輸入文件中的不一樣單詞的數量,並根據它們的天然序(升序)進行排序。假若有些詞的統計值相同,這些詞就根據它們的天然順序(字母序)排序。

  這個例子有一個明顯的問題,即有些詞可能會有大寫字母;例如,「the」和「The」,當它出如今句首的時候就是「The」。所以咱們能夠插入一個新的操做來強制全部單詞都轉換爲小寫形式,可是咱們意識到那些須要從文檔中解析詞語的全部未來的應用都必須作一樣的操做,所以咱們決定建立一個可重用的管道SubAssembly,如同咱們在傳統應用程序中建立一個子程序同樣(參見例16-3)。

  例16-3. 建立一個SubAssembly

  public class ParseWordsAssembly extends SubAssembly ?

  {

  public ParseWordsAssembly(Pipe previous)

  {

  String regexString = "(?

  Function regex = new RegexGenerator(new Fields("word"), regexString);

  previous = new Each(previous, new Fields("line"), regex);

  String exprString = "word.toLowerCase()";

  Function expression =

  new ExpressionFunction(new Fields("word"), exprString,String.class); ?

  previous = new Each(previous, new Fields("word"), expression);

  setTails(previous); ?

  }

  }

  ? 聲明SubAssembly是子類,它自己是一種管道類型。

  ? 建立一個Java的表達式函數,它將調用toLowerCase()方法來處理「word」字段對應的字符串類型值。咱們要傳入表達式函數指望的「word」字段的Java類型,這裏是String類型。後臺用Janino(http://www.janino.net/)來編譯。

  ? 咱們必須告知SubAssembly的父類這個管道子組件在哪裏結束。

  首先,咱們新建一個SubAssembly類,它管理咱們的「解析詞」管道組件。由於這是一個Java類,因此可用於其餘任何應用程序,固然這要求它們處理的數據中有word字段(例16-4)。注意,也有辦法可使這個函數更加通用,這些方法在「Cascading用戶手冊」中都有介紹。

  例16-4. 用一個SubAssembly擴展單詞計數和排序

  Scheme sourceScheme = new TextLine(new Fields("line"));

  Tap source = new Hfs(sourceScheme, inputPath);

  Scheme sinkScheme = new TextLine(new Fields("word", "count"));

  Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);

  Pipe assembly = new Pipe("wordcount");

  assembly =

  new ParseWordsAssembly(assembly); ?

  assembly = new GroupBy(assembly, new Fields("word"));

  Aggregator count = new Count(new Fields("count"));

  assembly = new Every(assembly, count);

  assembly = new GroupBy(assembly, new Fields("count"), new Fields("word"));

  FlowConnector flowConnector = new FlowConnector();

  Flow flow = flowConnector.connect("word-count", source, sink, assembly);

  flow.complete();

  ? 咱們用ParseWordsAssembly管道組件替換了以前例子中的Each類型管道。最後,咱們只是用新的SubAssembly類型子管道替代了前面Every類型管道和單詞解析函數。有必要的話,還能夠繼續進行更深刻的嵌套處理。

  靈活性

  後退一步,讓咱們來看看這個新的模型給咱們帶來了什麼好處,或更妙的是,消除了哪些不足。

  能夠看出,咱們沒必要再用MapReduce做業模式來考慮問題,或考慮Mapper和Reducer接口的實現問題,後續的MapReduce做業和前面的MapReduce做業如何綁定或連接。在運行的時候,Cascading「規劃器」(planner)會算出最優的方法把管道處理流程切分紅MapReduce做業,並管理做業之間的連接(圖16-18)。

  圖16-18. 怎麼把Flow翻譯成鏈式MapReduce做業

  所以,開發者能夠以任何粒度來構造本身的應用程序。它們能夠一開始就只是一個很小的作日誌文件過濾處理的應用程序,可是後來能夠根據須要不斷增添新的功能。

  Cascading是一個API而不是相似SQL的字符串句法,所以它更靈活。首先,開發者能用他們熟悉的語言建立特定領域語言(domain-specific language,DSL),像Groovy,JRuby,Jython,Scala等(示例參見項目網站)。其次,開發者能對Cascading不一樣的部分進行擴展,像容許自定義Thrift或JSON對象使其能讀寫,而且容許它們以元組數據流的形式傳送。

  Hadoop和Cascading在ShareThis的應用

  ShareThis是一個方便用戶共享在線內容的共享網絡。經過單擊網頁上或瀏覽器插件上的一個按鈕,ShareThis容許用戶無縫地訪問他們的任何在線聯繫人及在線網絡,而且容許他們經過電子郵件,IM,Facebook,Digg,手機SMS等方式共享它們的內容,而這一過程的執行甚至不要求他們離開當前的訪問網頁。發佈者能配置他們的ShareThis按鈕來標記服務的全球共享能力,如此推進網絡流量,刺激傳播活動,追蹤在線內容的共享。經過減小網頁不須要的內容及提供經過社會網絡、隸屬組和社區實時的內容發佈功能,ShareThis還簡化了社區媒體服務。

  ShareThis用戶經過在線窗口共享網頁和信息時,一個連續的事件數據流就進入ShareThis網絡。這些事件首先要過濾和處理,而後傳送給各類後臺系統,包括 AsterData,Hypertable和Katta。

  這些事件的數據量能達到很大數量級,數據量太大以至於傳統的系統沒法處理。這種數據的「污染」(dirty)也很嚴重,主要歸咎於流氓軟件系統的「注入式攻擊」、網頁缺陷或錯誤窗口。所以,ShareThis選擇爲後臺系統部署Hadoop做爲預處理和處理協調管理(orchestration)前臺。他們也選擇使用Amazon Web服務(基於彈性雲計算平臺EC2)來託管其服務器,而且使用Amazon S3(簡單服務存儲服務)提供長期的存儲功能,目的是利用其彈性的MapReduce模式(Elastic MapReduce,EMR)。

  這裏着重介紹「日誌處理管道」(圖16-19)。日誌處理管道只是簡單地從S3文件夾(bucket)裏讀取數據,進行處理(稍後介紹),而後把結果存入另外一個文件夾。簡單消息隊列服務(Simple Queue Service,SQS)用於協調各類事件的處理,用它來標記數據處理執行程序的開始和完成狀態。下行數據流是一些其餘的處理程序,它們用於拖動數據裝載AsterData數據倉庫,如從Hypertable系統獲取URL列表做爲網絡爬取工具的下載源,或把下載的網頁推入Katta系統來建立Lucene索引。注意,Hadoop系統是ShareThis整個架構的中心組件。它用於協調架構組件之間的數據處理和數據移動工做。

  有了Hadoop系統做爲前端處理系統,在全部事件日誌文件被加載到AsterData集羣或被其餘組件使用以前,它會基於一系列規則基於一系列規則對數據進行解析、過濾、清理和組織。AsterData是一個集羣化數據倉庫系統,它能支持大數據存儲,並容許使用標準的SQL語法發出複雜的即時查詢請求。ShareThis選擇Hadoop集羣來進行數據清理和準備工做,而後它把數據加載到AsterData集羣實現即時分析和報告處理。儘管使用AsterData也有可能達到咱們的目的,可是在處理流程的第一階段使用Hadoop系統來抵消主數據倉庫的負載具備重要意義。

  爲了簡化開發過程,制定不一樣架構組件間的數據協調規則以及爲這些組件提供面向開發者的接口,Cascading被選爲主要的數據處理API。這顯示出它和「傳統的」Hadoop用例的差異,它們主要是用「Hadoop」來實現對存儲數據的查詢處理。

  圖16-19. ShareThis日誌處理管道

  相反的,Cascading和Hadoop的結合使用爲端到端的完整解決方案提供了一個更好、更簡單的結構,所以對用戶來講更有價值。

  對於開發者來講,Cascading的學習過程很簡單,它從一個簡單的文本解析單元測試(經過建立cascading.ClusterTestCase類的子類)開始,而後把這個單元程序放入有更多規則要求的處理層,而且在整個過程當中,與系統維護相關的應用邏輯組織不變。Cascading用如下幾種方法幫助保持這種邏輯組織的不變性。首先,獨立的操做(Function,Filter等)均可以進行獨立寫和測試。其次,應用程序被分紅不一樣的處理階段:一個階段是解析,一個階段是根據規則要求進行處理,最後一個階段是封裝/整理數據,全部這些處理都是經過前述的SubAssembly基礎類實現的。

  ShareThis的日誌文件數據看起來很是像Apache日誌文件,它們有日期/時間戳、共享URL、引用頁URL和一些元數據。爲了讓分析下行數據流使用這些數據,這些URL必須先解壓(解析查詢字符串數據和域名等)。所以須要建立一個高層的SubAssembly對象來封裝解析工做,而且,若是字段解析很複雜,SubAssembly子對象就可被嵌入來解析一些特定字段。

  咱們使用一樣的方式來應用處理規則。當每一個Tuple對象經過SubAssembly對象實例的時候,若是有任何規則被觸發,該對象就會被標記上標籤「壞」(bad)。具備「壞」字標籤的Tuple對象,會被附上被標記的緣由用於後來的審查工做。

  最後,建立一個切分SubAssembly對象來作兩件事。第一,用於對元組數據流進行分流處理,一個數據流針對標記「好」(good)的數據,另外一個針對標記「壞」的數據。第二件是,切分器把數據切分紅片,如以小時爲單位。爲了實現這一動做,只須要兩個操做:第一個是根據已有數據流的timestamp(時間戳)建立區間段;第二個是使用interval(區間)和good/bad元數據來建立目錄路徑(例如,「05/good/」中「05」是早上5點,「good」是通過全部規則驗證的數據)。這個路徑而後被Cascading TemplateTap使用,這是一個特殊的Tap類型,它能夠根據Tuple對象值把元組數據流動態輸出到不一樣的路徑位置。

  本例中,「path」值被TemplateTap用來建立最終輸出路徑。

  開發者也建立了第四個SubAssembly類型對象——它用於在單元測試時應用Cascading Assertion(斷言)類。這些斷言用來複查規則組件和解析SubAssembly作的工做。

  在例16-5的單元測試中,咱們看到partitioner沒有被檢測,可是它被放入另一個這裏沒有展現的集成測試中了。

  例16-5. Flow單元測試

  public void testLogParsing() throws IOException

  {

  Hfs source = new Hfs(new TextLine(new Fields("line")), sampleData);

  Hfs sink =

  new Hfs(new TextLine(), outputPath + "/parser", SinkMode.REPLACE);

  Pipe pipe = new Pipe("parser");

  // split "line" on tabs

  pipe = new Each(pipe, new Fields("line"), new RegexSplitter("\t"));

  pipe = new LogParser(pipe);

  pipe = new LogRules(pipe);

  // testing only assertions

  pipe = new ParserAssertions(pipe);

  Flow flow = new FlowConnector().connect(source, sink, pipe);

  flow.complete(); // run the test flow

  // verify there are 98 tuples, 2 fields, and matches the regex pattern

  // for TextLine schemes the tuples are { "offset", "line }

  validateLength(flow, 98, 2, Pattern.compile("^[0-9]+(\\t[^\\t]*){19}$"));

  }

  針對集成和部署,許多Cascading內置屬性均可以使該系統和外部系統更容易集成,並進行更大規模的處理工做。

  在生產環境中運行時,全部的SubAssembly對象都鏈接起來並規劃到一個Flow對象裏,可是除了有源和目標Tap對象以外,咱們也設計了trap(捕捉)Tap類型(圖16-20)。一般,當遠程的Mapper或Reducer任務的操做拋出一個異常的時候,Flow對象就會失敗並殺死它管理的全部MapReduce做業。當一個Flow有trap的時候,全部的異常都會被捕捉而且形成異常的數據信息會被保存到當前這個捕捉程序對應的Tap對象裏。而後能夠在不終止當前Flow的狀況下,繼續處理下一個Tuple對象。有時你想讓程序在出現錯誤的時候就中止,但在這裏,ShareThis開發者知道在生產系統運行的時候,他們能同時回覽並查看「失敗」的數據,而後更新其單元測試。丟失幾個小時的處理時間比丟失幾個壞記錄數據更糟糕。

  使用Cascading的事件監聽器,Amazon SQS可被集成進來。當一個Flow結束的時候,系統就發送一條消息來通知其餘系統它們已經能夠從 Amazon S3上獲取準備好的數據了。當Flow處理失敗的時候,會有不一樣的消息發送,向其餘的進程報警。

  其他的位於不一樣的獨立集羣的下行數據流進程將在中斷的日誌處理管道位置處開始處理。如今日誌處理管道一天運行一次,所以沒有必要讓100個節點的集羣閒着運轉23個小時。所以咱們是每24小時執行一次終止和啓用操做。

  未來,在小型的集羣上根據業務需求,增長運行間歇期到每6個小時一次或1小時一次都是很是簡單的。其餘的集羣系統能夠獨立地根據各自負責的業務須要以不一樣的間隔期啓用或關閉。例如,網絡數據爬取組件(使用Bixo,它是EMI和ShareThis開發的基於Cascading的網絡數據爬取工具)能夠在一個小型集羣上與Hypertable集羣協做連續運轉。這種隨需應變的模型在Hadoop上運行良好,每一個集羣都能把工做負載調節到它指望處理的數量級。

  圖16-20. ShareThis日誌處理Flow

  總結

  對於處理和協調跨不一樣架構組件的數據的移動這個問題,Hadoop是一個很是強大的平臺。它惟一的缺點是它的主要計算模型是MapReduce。

  Cascading的目標是(不用MapRedue模式來考慮設計方案的狀況下)幫助開發者經過使用一個邏輯定義良好的API來快速而簡單地創建強大的應用程序,而同時又把提升數據分佈、複製、分佈式處理管理的性能和程序活性的工做都留給了Hadoop。

相關文章
相關標籤/搜索