如何用Spark來實現已有的MapReduce程序

假定咱們須要計算大文本中每一行的長度,而且報告每一個長度的行數。在HadoopMapReduce中,咱們首先使用一個Mapper,生成爲以行的長度做爲key,1做爲value的鍵值對。

public class  LineLengthMapper extends
    Mapper<LongWritable, Text,  IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber,  Text line, Context context)
      throws IOException,  InterruptedException {
    context.write(new  IntWritable(line.getLength()), new IntWritable(1));
  }
}

值得注意的是Mappers和Reducers只對鍵值對進行操做。因此由TextInputFormat提供輸入給LineLengthMapper,實際上也是以文本中位置爲key(不多這麼用,可是老是須要有東西做爲Key),文本行爲值的鍵值對。

與之對應的Spark實現:

lines.map(line => (line.length, 1))

Spark中,輸入只是String構成的RDD,而不是鍵值對。Spark中對鍵值對的表示是一個Scala的元組,用(A,B)這樣的語法來建立。上面的map操做的結果是(Int,Int)元組的RDD。當一個RDD包含不少元組,它得到了多個方法,如reduceByKey,這對再現MapReduce行爲將是相當重要的。

Reduce
reduce()與reduceBykey()
統計行的長度的鍵值對,須要在Reducer中對每種長度做爲key,計算其行數的總和做爲value。

public class  LineLengthReducer extends
    Reducer<IntWritable, IntWritable,  IntWritable, IntWritable> {
  @Override
  protected void reduce(IntWritable length,  Iterable<IntWritable> counts,
      Context context) throws IOException,  InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new  IntWritable(sum));
  }
}

Spark中與上述Mapper,Reducer對應的實現只要一行代碼:

val lengthCounts = lines.map(line =>  (line.length, 1)).reduceByKey(_ + _)

Spark的RDD API有個reduce方法,可是它會將全部鍵值對reduce爲單個value。這並非Hadoop MapReduce的行爲,Spark中與之對應的是ReduceByKey。

另外,Reducer的Reduce方法接收多值流,併產生0,1或多個結果。而reduceByKey,它接受的是一個將兩個值轉化爲一個值的函數,在這裏,就是把兩個數字映射到它們的和的簡單加法函數。此關聯函數能夠被調用者用來reduce多個值到一個值。與Reducer方法相比,他是一個根據Key來Reduce Value的更簡單而更精確的API。

Mapper
map() 與 flatMap()
如今,考慮一個統計以大寫字母開頭的單詞的個數的算法。對於每行輸入文本,Mapper可能產生0個,1個或多個鍵值對。

public class  CountUppercaseMapper extends
    Mapper<LongWritable, Text, Text,  IntWritable> {
  @Override
  protected void map(LongWritable lineNumber,  Text line, Context context)
      throws IOException,  InterruptedException {
    for (String word :  line.toString().split(" ")) {
      if  (Character.isUpperCase(word.charAt(0))) {
        context.write(new Text(word), new  IntWritable(1));
      }
    }
  }
}

Spark對應的寫法:

lines.flatMap(
_.split("  ").filter(word => Character.isUpperCase(word(0))).map(word =>  (word,1))
)

簡單的Spark map函數不適用於這種場景,由於map對於每一個輸入只能產生單個輸出,但這個例子中一行須要產生多個輸出。因此,和MapperAPI支持的相比,Spark的map函數語義更簡單,應用範圍更窄。

Spark的解決方案是首先將每行映射爲一組輸出值,這組值可能爲空值或多值。隨後會經過flatMap函數被扁平化。數組中的詞會被過濾並被轉化爲函數中的元組。這個例子中,真正模仿Mapper行爲的是flatMap,而不是map。

groupByKey()
寫一個統計次數的reducer是簡單的,在Spark中,reduceByKey能夠被用來統計每一個單詞的總數。好比出於某種緣由要求輸出文件中每一個單詞都要顯示爲大寫字母和其數量,在MapReduce中,實現以下:

public class  CountUppercaseReducer extends
    Reducer<Text, IntWritable, Text,  IntWritable> {
  @Override
  protected void reduce(Text word,  Iterable<IntWritable> counts, Context context)
      throws IOException,  InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context
        .write(new  Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
}

可是redeceByKey不能單獨在Spark中工做,由於他保留了原來的key。爲了在Spark中模擬,咱們須要一些更像Reducer API的操做。咱們知道Reducer的reduce方法接受一個key和一組值,而後完成一組轉換。groupByKey和一個連續的map操做可以達到這樣的目標:

groupByKey().map {  case (word,ones) => (word.toUpperCase, ones.sum) }

groupByKey只是將某一個key的全部值收集在一塊兒,而且不提供reduce功能。以此爲基礎,任何轉換均可以做用在key和一系列值上。此處,將key轉變爲大寫字母,將values直接求和。
相關文章
相關標籤/搜索