第2章:MapReduce

MapReduce是一個數據處理的編程模型。這個模型很簡單,但也不是簡單到不可以支持一些有用的語言。Hadoop可以運行以多種語言寫成的MapReduce程序。在這一章中,咱們將看看怎樣用Java,Ruby,Python語言來寫同一個例子。更重要的是,MapReduce程序天生併發運行,這就至關於把可以進行大數據分析的工具交到了某個擁有足夠多機器的人手裏。java

氣候數據集

在咱們的例子中,將會寫一個程序來挖掘天氣數據。天氣傳感器每個小時都會在全球的許多地方收集數據,而且也收集了大量的日誌數據。這些數據很是適合於用MapReduce分析。由於咱們想要處理全部數據,而且這些數據是半結構化的和麪向記錄的。python

數據格式

咱們所使用的數據來自於國家氣候數據中心或稱爲NCDC。數據以行形式ASCII格式存儲,每一行一條記錄。這種格式支持豐富的氣象屬性集合,其中許多屬性是可選的,長度可變的。簡便起見,咱們僅僅關注基本的屬性,如溫度。溫度老是有值而且長度固定。
示例2-1顯示了一行記錄,而且將主要的屬性進行了註釋。這一行記錄被分紅了多行,每一個屬性一行。真實文件中,這些屬性都會被放進一行,而且沒有分隔符。數據庫

示例:2-1
0057
332130 # USAF 天氣基站標識
99999 # WBAN 天氣基站標識
19500101 # 觀察日期
0300 # 觀察時間
4
+51317 # 緯度 (角度 x 1000)
+028783 # 經度 (角度 x 1000)
FM-12
+0171 # 海拔 (米)
99999
V020
320 # 風向 (角度)
1 # 質量碼
N
0072
1
00450 # 天空最高高度 (米)
1 # 質量碼
C
N
010000 # 可見距離 (米)
1 # 質量碼
N
9
-0128 # 空氣溫度 (攝氏度 x 10)
1 # 質量碼
-0139 # 露點溫度 (攝氏度 x 10)
1 # 質量碼
10268 # 大氣壓 (百帕 x 10)
1 # 質量碼

數據文件按照日期和天氣基站整理。從1901到2001,每年都有一個目錄文件。每個目錄文件中包括每個天氣基站收集到的當年氣候數據的壓縮文件。例如1990年部分文件:apache

% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz

因爲有成千上萬個天氣基站,因此每年都由大量的相關小文件組成。一般處理少許的大文件更容易和有效。因此這些數據須要被預處理,使每年的全部記錄都被放到一個文件中(附錄C中有詳細的方法說明)。編程

使用Unix工具分析

如何獲取每年的全球最高溫度呢?咱們首先不使用Hadoop工具來回答這個問題。
這將會爲咱們提供一個性能基準線和檢查咱們日後的結果是否準確的方法。
經典的處理行結構數據的工具是awk。示例2-2向咱們展現瞭如何獲取每年全球最高溫度。ruby

示例2-2
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done

這個腳本循環處理已經壓縮的年文件,首先輸出年度值,而後使用awk處理每個文件。awk腳本從這些數據中提取出空氣溫度和質量碼。空氣溫度經過加0轉換成整數,下一步,判斷溫度(溫度9999在NCDC中表示沒檢測到溫度)和質量碼是否有效。質量碼錶示此溫度值是否準確或者錯誤。若是溫度值沒有問題,則與目前爲止最高溫度相比較,若是比目前最高溫度高,則更新最高溫度。當文件中全部行被處理以後,END塊被執行,打印出最高溫度。下面看看部分運行結果:
```
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
````
源文件中的溫度值被擴大了10倍,因此1901年的最高溫度是31.7攝氏度,因爲在20世紀初讀取到的氣候值很是有限,因此這個結果只能是近似真實。在硬件是單個超大型高CPU EC2實例計算中跑完整個世紀的數據花了42分鐘。bash

爲了提升處理速度,咱們須要並行運行部分程序。理論上,咱們很容易想到可使用計算機中全部可用的線程並行處理不一樣的年份數據。可是這樣仍然存在一些問題。網絡

首先,將整個處理工做進程等分爲相同的部分並不簡單或明顯。在這個例子中,不一樣的年份的文件大小不同,而且有的差異很大。全部一些處理進程將會完成地早一些,一些將會晚一些。即時完成早的進程再處理其它工做,整個運行時間仍然被最大的文件限制。一個更好的途徑是將輸入數據分紅大小相等的塊,而且處理每個數據塊。雖然這樣可能形成更多的工做量。併發

第二,將每個獨立的處理結果合併在一塊兒須要額外處理工做。在這個例子中,每年的處理結果都是相互獨立的。這些結果會被鏈接在一塊兒,而且按年排序。若是經過數據量大小數據塊途徑,合併將更加容易出錯。就這個例子而言,某一年的數據可能被分紅多個數據塊,每個數據塊都單獨處理,並獲得每一塊的最高溫度。最後,咱們還須要找到某年中這些塊中最高溫度中的最高溫度做爲這一年的最高溫度。app

第三,你仍然會被單個計算機的處理能力限制。若是用單個計算機中全部的處理器,最快的處理時間是20分鐘,那麼,你不可能更快。並且有的數據集超過單個計算機的處理能力。當使用多臺計算機一塊兒處理時,一些其它的因素又會影響性性能,主要有協調性和可靠性兩類。誰來執行全部的做業?咱們將怎麼處理失敗的進程?

因此,雖然並行處理是可行的,但倒是不那麼容易控制的,是複雜的。使用像Hadoop這樣的框架來處理這些問題極大地幫助了咱們。

使用Hadoop分析數據

爲了充分利用Hadoop提供的並行處理優點,咱們須要將咱們的查詢寫在一個MapReduce做業中。在本地的,小數據量地測試後,咱們將可以在集羣中運行它。

Map和Reduce

MapReduce將處理過程分紅兩階段,map階段和reduce階段。每階段將key-value鍵值對作爲輸入和輸出。開發者能夠選擇輸入輸出參數類型,也能指定兩個函數:map函數和reduce函數。

map階段的輸入數據是原始的NCDC數據。咱們選擇文本格式。文本中的每一行表示一條文本記錄。key值是行開頭距離當前文件開頭的位移,可是咱們不須要它,忽略便可。

map函數很簡單。由於咱們僅關心年份和溫度,因此獲取每行的年度和溫度便可,其它屬性不須要。這個例子中,僅僅是一個數據準備階段,以某種方法準備reduce函數可以處理的數據。map函數仍是一個丟棄壞記錄的地方,例如那些沒有測量到的,不許備的或錯誤的溫度。

爲了展示map怎麼樣工做的,選取少許的輸入數據進行說明(爲了適應頁面寬度,一些沒有使用到的列用省略號表示)
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
這些行以key-value的形式提供給map函數:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
關鍵值是行的位移,在map函數中咱們能夠忽略它。map函數僅僅須要獲取到年度和溫度值(以粗體表示的數據),而後輸出。輸出的時候將溫度值轉換成整數。
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)

map的輸出結果在被送往reduce函數以前被MapReduce框架按照關鍵字排序合併處理。因此在進行下一步以前,reduce函數會接收到以下數據:
(1949, [111, 78])
(1950, [0, 22, −11])
如上所示,每年的全部溫度值都合併到一個列表中。reduce函數所要作的就是遍歷每年的溫度,而後找到最高溫度。
(1949, 111)
(1950, 22)
以上就是最終的輸出:每年的最高溫度。
整個數據流程如圖2-1所示。在圖表底部是對應的Unix命令。它模擬整個MapReduce流程,咱們將會在這章節的後面Hadoop Streaming中看到。圖2-1 MapReduce邏輯數據流程圖

JAVA MapReduce

在知道了MapReduce程序怎麼樣工做了以後,下一步是用代碼實現它。咱們須要作三件事情:map函數,reduce函數,運行做業的代碼。map功能以Mapper抽象類表示
,它申明瞭一個map()抽象方法。示例2-3顯示了map函數的實現。

示例2-3
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
      private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
          String line = value.toString();
          String year = line.substring(15, 19);
          int airTemperature;
          if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
          } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
          }
          String quality = line.substring(92, 93);
          if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
         }
     }
}

Mapper類是一個泛型,有四個形參,分別表示輸入key,輸入值,輸出key,和map函數輸出值類型。就當前的例子來講,輸入key是一個長整型的位移,輸入值是一行文本,輸出key是年份,輸出會是是空氣溫度(整數)。Hadoop使用它本身的基本類型集而不使用JAVA內建的基本類型。由於Hadoop本身的基本類型對網絡序列化進行了優化。這些基本類型能夠在 org.apache.hadoop.io pack‐
age中找到。這裏咱們使用 LongWritable類型,它表示長文本類型,對應了Java的String類型,又使用了 IntWritable類型,對應於Java的Integer類型。

map函數被傳了一個key值和一個value值,咱們把包含輸入的一行文本轉換成Java String類型數據,並使用String的SubString方法取到咱們感興趣的列值。

map函數也提供了一個Context實例,以便將輸出結果寫入其中。在咱們的這個例子中,咱們把年份做爲文本類型Key值寫到Context中,把溫度封閉成IntWritable類型也寫入Context.而且只有溫度有效而且質量碼顯示當前溫度的獲取是正常的時候才寫入。

reduce功能相似地用Reduce抽象類表示,實例類見示例2-4

示例2-4
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
      int maxValue = Integer.MIN_VALUE;
      for (IntWritable value : values) {
        maxValue = Math.max(maxValue, value.get());
      }
      context.write(key, new IntWritable(maxValue));
    }
}

Reduce抽象類也是一個泛型類,也具備四個形參。reduce函數的輸入類型必須匹配map的輸出類型,即Text和IntWritable.此例子中,reduce函數的輸出是Text和IntWritable類型,分別表示年份與當前年份最高溫度。經過遍歷溫度值,將當前溫度值與最高溫度比較來找到當前年份的最高溫度。

第三部分是運行MapReduce做業的代碼,見示例2-5.

示例2-5
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
   public class MaxTemperature {
      public static void main(String[] args) throws Exception {
      if (args.length != 2) {
        System.err.println("Usage: MaxTemperature <input path> <output path>");
        System.exit(-1);
      }
      Job job = new Job();
      job.setJarByClass(MaxTemperature.class);
      job.setJobName("Max temperature");
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      job.setMapperClass(MaxTemperatureMapper.class);
      job.setReducerClass(MaxTemperatureReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Job對象指明運行一個做業所須要的全部設置以及讓你控制做業如何執行。當咱們在一個Hadoop集羣上運行這個做業的時候,咱們須要將代碼打包成JAR文件,Hadoop會把JAR文件在集羣中分發。咱們能夠經過setJarByClass方法指定類文件,而不須要顯示指明JAR文件的名字。Hadoop會搜索包含setJarByClass指定的類的相關JAR文件。

建立了一個實例Job後,指定輸入和輸出文件路徑。經過調用 FileInputFormat 的靜態方法addInputPath()指定輸入路徑,此路徑能夠是一個文件,也能夠是一個目錄。若是是一個目錄,輸入的數據包含此目錄下全部文件。還能夠是文件類型。就像方法名所表示的那樣,addInputPath()能夠被調用屢次以便添加多個輸入路徑。

輸出路徑經過FileOutputFormat 的靜態方法setOutputPath()指定。輸出路徑僅能夠指定一次。它指定了一個目錄。reduce會把它的輸出結果的文件放到這個目錄下。這個目錄在運行Hadoop以前不該該存在。由於若是存在Hadoop將會報錯並不會執行做業。這是爲了預防數據丟失。由於若是不當心覆蓋了同一目錄下其它做業的輸出結果將是很是使人懊惱的。

下一步使用 setMapperClass() 和setReducerClass()方法指定map和reduce類。setOutputKeyClass()和 setOutputValueClass()方法控制reduce函數輸出參數的類型。必須和Reduce抽象類中參數的一致。map輸出參數的類型默認是相同的類型。因此若是map和reduce函數有相同的輸出參數類型時就不須要特別指定了。就像咱們這個例子這樣。然而,若是它們不相同,就須要經過 setMapOutputKeyClass() 和setMapOutputValueClass()函數來指定map的輸出參數類型。

map函數的輸入參數類型經過輸入格式指定。咱們沒有顯示地設置,由於咱們使用了默認的TextInputFormat格式。

在指定了自定義的map和reduce函數以後,就能夠準備執行做業了。Job類的waitForCompletion()方法用於提交做業,並用等待做業完成。這個方法須要一個參數,用以表示是否將做業日誌詳細信息輸出到控制檯。若是爲true,就輸出。這個方法的返回值是一個布爾類型,用於表示做業的執行成功與否。成功返回true,失敗返回false。這裏咱們將成功與否轉換成了0或1。

這部分使用的Java MapReduce API以及這本書所使用的全部API被稱爲"New API"。  
它代替了功能相同的老的API。這兩種API區別請查看附錄D,而且附錄D有如何在這兩種API轉換的相關建議。固然你也能在這兒用舊的API完成相同功能的獲取每一年最高溫度的應用。

測試運行

在完成MapReduce做業編寫以後,正常狀況下使用少許數據集測試運行,方便當即檢測出代碼問題。首先以脫機模式安裝Hadoop(附錄A中有說明),這個模式下Hadoop使用本地文件生成本地做業運行。能夠在這本書的網站上找到安裝和編譯這個示例的說明。
讓咱們使用上面五行數據運行這個做業,輸出結果稍微調整了一下以便適應頁面,而且有一些行被刪除了。

% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output
14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your application
with ToolRunner to remedy this.
14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job:
job_local26392882_0001
14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job:
http://localhost:8080/
14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner:
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task attempt_local26392882_0001_r_000000_0
28  |  Chapter 2: MapReduce
is allowed to commit now
14/09/16 09:48:40 INFO output.FileOutputCommitter: Saved output of task
'attempt...local26392882_0001_r_000000_0' to file:/Users/tom/book-workspace/
hadoop-book/output/_temporary/0/task_local26392882_0001_r_000000
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce > reduce
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_r_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 running in uber
mode : false
14/09/16 09:48:41 INFO mapreduce.Job: map 100% reduce 100%
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 completed
successfully
14/09/16 09:48:41 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=377168
FILE: Number of bytes written=828464
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=61
Input split bytes=129
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=61
Reduce input records=5
Reduce output records=2
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=39
Total committed heap usage (bytes)=226754560
File Input Format Counters
Bytes Read=529
File Output Format Counters
Bytes Written=29

當咱們在hadoop命令第一個參數填寫一個類名的時候,會啓動一個JVM(JAVA虛擬機),並執行這個類。hadoop命令添加hadoop庫和庫所依賴的其它庫文件到Classpath變量,而且加載hadoop配置。爲了將應用中的類文件添加到classpath中,咱們定義了一個 HADOOP_CLASSPATH環境變量,來加載咱們所寫的hadoop腳本。

當以本地(脫機)模式運行時,這本書中全部程序都假設你已經以這種方法設置了 HADOOP_CLASSPATH環境變量。這條命令應該在示例代碼所在目錄運行。

做業運行日誌提供了一些有用的信息。例如,咱們能看到這個做業被給了一個做業ID:job_local26392882_0001.運行了一個map任務和一個reduce任務(ID分別是:attempt_local26392882_0001_m_000000_0 和attempt_local26392882_0001_r_000000_0)。知道做業和任務ID在調用MapReduce做業時將頗有用。

最後還有一部分名爲"Counters"的數據,這部分數據是Hadoop爲每個做業生成的統計信息。這些信息將對於檢查處理的數據與預期的數據是否同樣很是有用。例如,咱們能知道經過系統各部分的記錄數,5條map輸入記錄,5條map輸出記錄(能夠看出map對於每一條有效的輸入記錄都有對應的一條輸出記錄)。還能看出以key值分紅2組的5條reduce輸入記錄,以及2條輸出記錄。

輸出結果寫入輸出目錄。每個reduce函數生成一個輸出文件。這個做業只有一個reduce函數,因此只產生一個文件。名稱是part-r-00000:
% cat output/part-r-00000
1949 111
1950 22

這個結果跟以前手工計算的一致。這個結果表示1949年最高溫度是11.1攝氏度,1950是2.2度。

擴展

你已經知道了MapReduce怎麼樣處理少許數據。如今是時候全局看系統,而且對於大數據處理的數據流。簡單來講,到目前爲止,咱們所舉的例子都用的本地計算機的文件。更進一步,咱們將要在分佈計算機(特別是HDFS,咱們將在下一節中學到)中存儲文件數據。使用Hadoop的資源管理系統YARN(第4節),Hadoop會將MapReduce計算過程分發到各個計算機中計算,而這些計算機每一臺都保存着一部分數據。讓咱們來看看這些是如何發生的。

工做流

首先,MapReduce做業是客戶端須要去執行的工做單元。它包括輸入數據,MapReduce程序以及一些配置信息。Hadoop會把這個做業分紅多個任務步驟執行。有兩種類型:map任務和reduce任務。這些任務經過YARN計劃調度並在分佈式系統節點上運行。若是一個任務失敗了,YARN會把它放到另一個節點上從新運行。

Hadoop會把輸入數據化分紅大小相同的數據片段(被稱爲輸入片或均片),Hadoop會爲每個片建立一個map任務。map任務會一條條記錄地循環執行用戶自定義的map函數,直到這個片段中全部記錄處理完畢。

不少片段意味着處理每個片段的時間比一次處理整個輸入數據的時間少。因此當咱們併發地處理這些片段,而這些片段很小時,可以更好地負載均衡。因此一個性能好的機器比一個性能差些的機器可以相應在處理更多地片段。即便這些機器性能徹底同樣,失敗的處理進程或者同時運行的做業使負載均衡成爲可能(Even if the machines are identical, failed processes or other jobs running
concurrently make load balancing desirable)。而且當片段細粒度越高,負載均衡的質量也會越高。

別外一方面,若是片段過於小,管理片段和建立Map任務所花費的時候則會成爲整個做業執行時間的瓶頸。對於大多數做業來講,一個好的片段大小趨向於一個HDFS塊的大小,默認是128M。這個大小能夠被集羣(Cluster)改變(集羣的改成會影響在機羣中新建立的全部文件),或者文件新建時就指定。

Hadoop儘可能會在輸入數據存放的HDFS那個節點運行Map任務,由於這樣不會佔用寶貴的集羣帶寬資源。這被稱爲本地優化。而後,有時候擁有HDFS數據的節點上正運行着其它Map任務,做業調試器會嘗試着在當前集羣其它空閒的節點上建立一個Map任務。極少狀況下,會到其它集羣中的某個節點中建立一個Map任務,這樣就須要集羣間網絡傳輸。這三種可能性在圖表2-2中展現:圖2-2

如今清楚了爲何最優的片段大小是設置成HDFS塊大小。由於這樣作是數據能被存儲在一個節點上的最大數據量。若是一個片段跨兩個塊大小,任何一個HDFS節點都不太可能儲存兩個塊大小的數據量,這個勢必會形成片段的部分數據經過網絡傳輸到正在運行Map任務的節點上。這明顯的比直接在本地運行Map任務的性能差一些。

Map任務會將它的輸出結果寫入本地硬盤中,而不是HDFS,爲何要這樣作?由於Map的輸出只是中間的輸出,後續它將會被Reduce任務處理產生最終輸出結果。一旦做業完成了,Map的輸出結果能夠被丟棄,因此將Map的輸出結果複製到HDFS中沒必要要的。若是在Reduce利用Map的輸入結果前,節點運行失敗了。Hadoop將在自動的在另一個節點中從新執行這個Map任務,從新產生輸入結果。

Reduce任務沒有像Map任務那樣利用數據本地化的優點,一個Reduce任務的輸入每每來自全部Map任務的輸出。就拿目前的例子來講,咱們有一個Reduce任務,其輸入數據來自全部的Map任務。所以存儲的Map結果必須經過網絡傳輸到運行Reduce的節點上。以後這些傳過來的數據會被合併,並傳到用戶自定義的reduce函數中執行。Reduce的輸出結果正常都會存儲在HDFS中。就像第三節說明的,對於存儲Reduce輸出結果的每個HDFS塊,第一份複製的數據會存儲在本地,其它複製的數據會存儲在其它集羣可靠的HDFS塊中。所以存儲Reduce的輸出結果肯定須要消耗網絡帶寬,但也僅僅和一個正常的HDFS輸出通道消耗的同樣多。

擁有一個Reduce任務的數據流在圖表2-3中展現。虛線框表示節點,虛線箭頭表示節點內的數據傳輸。實線的箭頭表示節點間的數據傳輸。圖2-3 單個Reduce任務的MapReduce數據流

Reduce任務的個數不是由輸入數據量的大小決定,而是單獨指定的。在"默認的MapReduce做業"那一節,你將會看到對於給定的做業,如何選擇Reduce任務的個數。

當有多個reduce時,map任務會將它們的結果分區,每個map任務會爲每個reduce任務建立一個分區。每個分區裏能夠用不少個key和ke關聯的值,但某一個key的全部記錄必須在同一個分區裏。分區這個過程可以被用戶自定義的函數控制,但通常來說,默認的分區函數已經可以工做地很好了。它使用哈希函數來將key分類。

多個reduce的通常數據流程圖在圖表2-4顯示。這張圖表清楚地顯示了map和reduce之間的數據流爲何被通俗地叫作"洗牌"。"洗牌"的過程比這個圖表顯示的更復雜。你將會在"洗牌和排序"這一節中看到,調整它能夠對做業的運行時間有很大影響。圖2-4 多個reduce任務的MapReduce數據流

最後,也能夠有零個reduce任務。這種狀況發生在僅併發執行map任務就可以輸出結果的時候。此時數據的傳輸僅發生在map的輸出結果寫入HDFS的時候(如圖2-5)。
圖2-5 零reduce任務的MapReduce數據流

組合函數(Combiner Function)

許多MapReduce做業執行時間被集羣的帶寬資源限制。因此值得咱們去儘可能減小map與reduce之間傳輸的數據量。Hadoop容許用戶指定一個組合函數,以便在map輸出結果後執行。這個組合函數的輸出造成了reduce任務的輸入。因爲組合函數是優化函數,因此Hadoop不能確保爲每個map輸出記錄調用多少次組合函數。也就是說,零次,一次或屢次調用組合函數,reduce最終都應該輸出相同的結果。

組合函數的這種特性限制了它能被使用的業務情形。用一個例子能更好說明。假設最大的溫度,例如1950的,被兩個map任務處理,由於1950年數據分佈在不一樣的片段中。假如第一個map任務輸出以下結果:
(1950,0)
(1950,20)
(1950,10)
第二個map輸出以下結果:
(1950,25)
(1950,15)
Hadoop將會用以上全部值組成列表傳給reduce
(1950,[0,20,10,25,15])
輸出:
(1950,25)
既然25是當前列表最大的值。咱們就像使用reduce函數同樣用一個組合函數找出每個map結果中的最大溫度值。這樣的話,reduce獲得如下值:
(1950,[20,25])
而且產生與以前相同的結果。咱們能夠用一種更簡潔的方式表示上面的過程:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
然而,並非全部這樣的處理都是合適的,例如,咱們要計算平均溫度,就不能在組合函數中計算平均溫度,由於:mean(0, 20, 10, 25, 15) = 14,可是mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15。

組合函數不能代替Reduce函數(Reduce函數仍然須要用來處理來自不一樣map的含有相同key值的記錄),可是它能幫助減小在map與reduce之間傳遞的數據量。所以,在你的MapReduce做業中,老是值得咱們考慮是否使用組合函數。

指定組合函數

回到以前JAVA MapReduce程序,組合函數使用Reduce類定義,在這個應用中,它與Reduce功能同樣。咱們惟一要作的就是在做業中設定組合類(示例2-6)。

示例2-6
      public class MaxTemperatureWithCombiner {
       public static void main(String[] args) throws Exception {
          if (args.length != 2) {
             System.err.println("Usage: MaxTemperatureWithCombiner   
             <input path> " +"<output path>");
            System.exit(-1);
           }
       Job job = new Job();
       job.setJarByClass(MaxTemperatureWithCombiner.class);
       job.setJobName("Max temperature");
       FileInputFormat.addInputPath(job, new Path(args[0]));
       FileOutputFormat.setOutputPath(job, new Path(args[1]));
       job.setMapperClass(MaxTemperatureMapper.class);
       job.setCombinerClass(MaxTemperatureReducer.class);
       job.setReducerClass(MaxTemperatureReducer.class);
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(IntWritable.class);
       System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

運行一個分佈式的MapReduce做業

相同的程序將在全量數據庫執行。MapReduce特性是無形中擴大了能處理的數據量大小和硬件體積,運行在10個節點的EC2集羣上,這個程序跑了6分鐘。在第6節中咱們將會看看在集羣中運行程序具體的一些技術特性。

Hadoop Streaming

Hadoop給MapReduce提供了API容許你用除了JAVA語言以外的其它語言寫map和reduce函數。Hadoop流使用Unix系統標準流做業Hadoop和你的程序之間的接口,因此你能使用任意其它的可以讀取Unix系統標準流輸入數據並可以將數據寫到標準輸出的語言來寫MapReduce程序。

流天生地就適用於文本處理。Map的輸入數據經過標準的輸入流輸入到你自定義的map函數中。在map函數中,你將會一行一行的處理數據,而後將這些數據寫入到輸出流中。map會用Tab分隔key和value,並將它們作爲鍵值對單獨一行輸出。這些數據將會以相同的格式作爲reduce函數的輸入。在輸入之間,框架將會把它們按照鍵值排序,而後reduce會處理這些行,而後將結果輸出到標準的輸出流。

讓咱們以流的方式重寫查找每年最高溫度的MapReduce程序來講明。

Ruby

map函數以Ruby語言編寫,見示例2-7

示例2-7
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

在示例2-7代碼塊中,Ruby從標準全局IO常量類型STDIN中讀取輸入數據,而後遍歷每一行數據,找到行中相關的字段,若是有效,則輸出到標準的輸出流。

有必要看一下Streaming與Java MapReduce API之間的區別。 Java API會一條條記錄地調用map函數,而後若是使用Streaming形式,map函數能夠本身決定怎麼樣處理輸入數據,能夠多行一塊兒處理也能夠單行處理。JAVA map實現的函數是被推數據,可是它仍然能夠考慮經過將多條記錄放到一個實例變量中來實現一次處理多行的操做。這種狀況下,你須要實現cleanup()方法,以便知道最後一條記錄處理完的時候,可以結束處理。

因爲示例2-7基於標準的輸入輸出操做,能夠不經過Hadoop測試,直接經過Unix命令。

% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078

reduce函數稍微有點複雜,如示例2-8

#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
 if last_key && last_key != key
   puts "#{last_key}\t#{max_val}"
   last_key, max_val = key, val.to_i 
 else
   last_key, max_val = key, [max_val, val.to_i].max
 end
end
puts "#{last_key}\t#{max_val}" if last_key

同map函數同樣,reduce函數也會從標準輸入中遍歷行,但不同的是,當處理每個key組的時候,須要存儲某個狀態。在這個示例中,關鍵字是年,咱們存儲最後一次遍歷的key,並保存每個key組中最大的溫度。MapReduce框架會確保輸入數據會按照關鍵值排序,因此咱們知道若是當前key值不一樣於上一次遍歷的key值時,咱們就進入了新的key組。當使用JAVA API時,reduce函數輸入的數據就已經按照key值分好了組,而不像Streaming同樣須要人爲地去判斷key組邊界。

對於每一行,咱們取得key和value值,而後看看是否到達了一組的最後( last_key && last_key != key), 若是到達了,咱們記錄下這組的Key和最高溫度,以Tab製表符分隔,而後初始化最高溫度,若是沒有到達組的最後,則更新當前Key值的最高溫度。最後一行做用是確保最後一個Key組的最高溫度可以被記錄。

咱們如今可以用Unix命令來模擬整個的MapReduce傳輸通道(等效於圖2-1中所示的Unix通道)。

% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \
sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22

輸出結果與Java程序的同樣。下一步使用Hadoop來運行。
Hadoop命令不支持流選項,不過,你能夠在jar選項中指定Streaming JAR文件,而後指定輸入和輸出文件路徑,以及map和redeuce腳本文件,看起來以下:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

當在一個集羣中基於大數據執行時,咱們須要使用-combiner選項來指定組合函數。

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\
ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-input input/ncdc/all \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

注意咱們使用了-files選項,當咱們在集羣上運行流程序時,須要將map和reduce腳本文件複製到集羣中。

Python

流程序支持任意可以從標準輸入讀取數據並將數據寫入標準輸出的語言。因此使用讀者更熟悉的Python,再寫一遍以上例子。map腳本如示例2-9,reduce腳本如示例2-10.

示例2-9:map script
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)

示例:2-10 reduce script
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
  if last_key and last_key != key:
     print "%s\t%s" % (last_key, max_val)
     (last_key, max_val) = (key, int(val))
else:
     (last_key, max_val) = (key, max(max_val, int(val)))
 if last_key: 
 print "%s\t%s" % (last_key, max_val)

咱們能像Ruby中同樣以相同的方法來運行這個做業。

% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949 111
1950 22

本文是筆者翻譯自《OReilly.Hadoop.The.Definitive.Guide.4th.Edition》第一部分第2節,後續將繼續翻譯其它章節。雖盡力翻譯,但奈何水平有限,錯誤再所不免,若是有問題,請不吝指出!但願本文對你有所幫助。
本文轉自個人簡書博客

相關文章
相關標籤/搜索