Apache Crunch:用於簡化MapReduce編程的Java庫

Apache Crunch(孵化器項目)是基於Google的FlumeJava庫編寫的Java庫,用於建立MapReduce流水線。與其餘用來建立MapReduce做業的高層工具(如Apache Hive、Apache Pig和Cascading等)相似,Crunch提供了用於實現如鏈接數據、執行聚合和排序記錄等常見任務的模式庫。而與其餘工具不一樣的是,Crunch並不強制全部輸入遵循同一數據類型。相反,Crunch使用了一種定製的類型系統,很是靈活,可以直接處理複雜數據類型,如時間序列、HDF5文件、Apache HBase表和序列化對象(像protocol buffer或Avro記錄)等。 html

Crunch並不想阻止開發者以MapReduce方式思考,而是嘗試使之簡化。儘管MapReduce有諸多優勢,但對不少問題而言,並不是正確的抽象級別:大部分有意思的計算都是由多個MapReduce做業組成的,狀況每每是這樣——出於性能考慮,咱們須要將邏輯上獨立的操做(如數據過濾、數據投影和數據變換)組合爲一個物理上的MapReduce做業。 git

本質上,Crunch設計爲MapReduce之上的一個薄層,但願在不犧牲MapReduce力量(或者說不影響開發者使用MapReduce API)的前提下,更容易在正確的抽象級別解決手頭問題。 github

儘管Crunch會讓人想起歷史悠久的Cascading API,可是它們各自的數據模型有很大不一樣:按照常識簡單總結一下,能夠認爲把問題看作數據流的人會偏心Crunch和Pig,而考慮SQL風格鏈接的人會偏心Cascading和Hive。 apache

Crunch的理念 設計模式

PCollection和PTable<K, V>是Crunch的核心抽象,前者表明一個分佈式、不可變的對象集合,後者是Pcollection的一個子接口,其中包含了處理鍵值對的額外方法。這兩個核心類支持以下四個基本操做: app

  1. parallelDo:將用戶定義函數應用於給定PCollection,返回一個新的PCollection做爲結果。
  2. groupByKey:將一個PTable中的元素按照鍵值排序並分組(等同於MapReduce做業中的shuffle階段)
  3. combineValues:執行一個關聯操做來聚合來自groupByKey操做的值。
  4. union:將兩個或多個Pcollection看作一個虛擬的PCollection。

Crunch的全部高階操做(joins、cogroups和set operations等)都是經過這些基本原語實現的。Crunch的做業計劃器(job planner)接收流水線開發者定義的操做圖,將操做分解爲一系列相關的MapReduce做業,而後在Hadoop集羣上執行。Crunch也支持內存執行引擎,可用於本地數據上流水線的測試與調試。 框架

有些問題能夠從可以操做定製數據類型的大量用戶定義函數受益,而Crunch就是爲這種問題設計的。Crunch中的用戶定義函數設計爲輕量級的,爲知足應用程序的須要,仍然提供了完整的訪問底層MapReduce API的功能。Crunch開發者也可使用Crunch原語來定義API,爲客戶提供涉及一系列複雜MapReduce做業的高級ETL、機器學習和科學計算功能。 機器學習

Crunch起步 maven

能夠從Crunch的網站下載最新版本的源代碼或二進制文件,或者使用在Maven Central發佈的dependencies分佈式

源代碼中有不少示例應用。下面是Crunch中WordCount應用的源代碼:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.type.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
	for (String word : line.split("\\s+")) {
	  emitter.emit(word);
	}
      }
    }, Writables.strings()); // Indicates the serialization format

    // The count method applies a series of Crunch primitives and returns
    // a map of the top 20 unique words in the input PCollection to their counts.
    // We then read the results of the MapReduce jobs that performed the
    // computations into the client and write them to stdout.
     for (Pair<String, Long> wordCount : words.count().top(20).materialize()) {
      System.out.println(wordCount);
     }
   }
}

Crunch優化方案

Crunch優化器的目標是儘量減小運行的MapReduce做業數。大多數MapReduce做業都是 IO密集型的,所以訪問數據的次數越少越好。公平地說,每種優化器(Hive、Pig、Cascading和Crunch)的工做方式本質上是相同的。但與其餘框架不一樣的是,Crunch把優化器原語暴露給了客戶開發人員,對於像構造ETL流水線或構建並評估一組隨機森林模型這樣的任務而言,構造可複用的高階操做更容易。

結論

Crunch目前仍處於Apache的孵化器階段,咱們很是歡迎社區貢獻(參見項目主頁)讓這個庫更好。特別的是,咱們正在尋求更高效的MapReduce編譯思想(包括基於成本考慮的優化)、新的MapReduce設計模式,還但願支持更多的數據源和目標,如HCatalog、Solr和ElasticSearch等。還有不少把Crunch帶向如ScalaClojure等其餘JVM語言的項目,也有不少使用Crunch以R語言來建立MapReduce流水線的工具。

相關文章
相關標籤/搜索