Hadoop Streaming 使用及參數設置

1. MapReduce 與 HDFS 簡介html

  什麼是 Hadoop ?java

  Google 爲本身的業務須要提出了編程模型 MapReduce 和分佈式文件系統 Google File System,併發布了相關論文(可在 Google Research 的網站上得到:GFS、MapReduce)。Doug Cutting 和 Mike Cafarella 在開發搜索引擎 Nutch 時對這兩篇論文進行了本身的實現,即同名的 MapReduce 和 HDFS,合起來就是 Hadoop。python

  MapReduce 的 Data Flow 以下圖所示,原始數據通過 mapper 處理,再進行 partition 和 sort,到達 reducer,輸出最後結果。web

2. Hadoop Streaming 原理shell

  Hadoop 自己是用 Java 開發的,程序也須要用 Java 編寫,可是經過 Hadoop Streaming,咱們可使用任意語言來編寫程序,讓 Hadoop 運行。apache

  Hadoop Streaming 就是經過將其餘語言編寫的 mapper 和 reducer 經過參數傳給一個事先寫好的 Java 程序(Hadoop 自帶的 *-streaming.jar),這個 Java 程序會負責建立 MR 做業,另開一個進程來運行 mapper,將獲得的輸入經過 stdin 傳給它,再將 mapper 處理後輸出到 stdout 的數據交給 Hadoop,通過 partition 和 sort 以後,再另開進程運行 reducer,一樣經過 stdin/stdout 獲得最終結果。所以,咱們只須要在其餘語言編寫的程序中,經過 stdin 接收數據,再將處理過的數據輸出到 stdout,Hadoop Streaming 就能經過這個 Java 的 wrapper 幫咱們解決中間繁瑣的步驟,運行分佈式程序。編程

  原理上只要是可以處理 stdio 的語言都能用來寫 mapper 和 reducer,也能夠指定 mapper 或 reducer 爲 Linux 下的程序(如 awk、grep、cat)或者按照必定格式寫好的 java class。所以,mapper 和 reducer 也沒必要是同一類的程序。網絡

  1. Hadoop Streaming 的優缺點併發

    優勢:app

      1. 可使用本身喜歡的語言來編寫 MapReduce 程序(沒必要非得使用 Java)

      2. 不須要像寫 Java 的 MR 程序那樣 import 一大堆褲,在代碼裏作不少配置,不少東西都抽象到了 stdio 上,代碼量顯著減小。

      3. 由於沒有庫的依賴,調試方便,而且能夠脫離 Hadoop 先在本地用管道模擬調試。

    缺點:

      1. 只能經過命令行參數來控制 MapReduce 框架,不像 Java 的程序那樣能夠在代碼裏使用 API,控制力比較弱。

      2. 由於中間隔着一層處理,效率會比較慢。

      3. 因此 Hadoop Streaming 比較適合作一些簡單的任務,好比用 Python 寫只有一兩百行的腳本。若是項目比較複雜,或者須要進行比較細緻的優化,使用 Streaming 就容易出現一些束手束腳的地方。

  2. 用 Python 編寫簡單的 Hadoop Streaming 程序

    使用 Python 編寫 Hadoop Streaming 程序有幾點須要注意:

      1. 在能使用 iterator 的狀況下,儘可能使用 iterator,避免將 stdin 的輸入大量儲存在內存裏,不然會嚴重下降性能。

      2. Streaming 不會幫你分割 key 和 value 傳進來,傳進來的只是一個個字符串而已,須要你本身在代碼裏手動調用 split()。

      3. 從 stdin 獲得的每一行數據末尾彷佛會有 '\n' ,保險起見通常都須要用 rstrip() 來去掉。

      4. 在想得到 key-value list 而不是一個個處理 key-value pair 時,可使用 groupby 配合 itemgetter 將 key 相同的 key-value pair 組成一個個 group,獲得相似 Java 編寫的 reduce 能夠直接獲取一個 Text 類型的 key 和一個 iterable 做爲 value 的效果。注意 itemgetter 的效率比 lambda 表達式的效率要高,因此用 itemgetter 比較好。

    編寫 Hadoop Streaming 程序的基本模版:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Some description here...
"""
import sys
from operator import itemgetter
from itertools import groupby

def read_input(file):
"""Read input and split."""
    for line in file:
    yield line.rstrip().split('\t')

def main():
    data = read_input(sys.stdin)
    for key, kviter in groupby(data, itemgetter(0)):
        # some code here..

if __name__ == "__main__":
    main()    

  若是對輸入輸出格式有不一樣於默認的控制,主要會在 read_input() 裏調整。

  3. 本地調試

    本地調試用於 Hadoop Streaming 的 Python 程序的基本模式是:

$ cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>

    這裏有幾點須要注意:

      1. Hadoop 默認按照 tab 來分割 key 和 value,以第一個分割出的部分爲 key,按 key 進行排序,所以這裏使用 sort -t $'\t' -k1,1 來模擬。若是有其餘需求,在交給 Hadoop Streaming 執行時能夠經過命令行參數設置,本地調試也能夠進行相應的調整,主要是調整 sort 的參數。

      2. 若是在 Python 腳本里加上了 shebang,而且爲它們添加了執行權限,也能夠用相似於 ./mapper.py (會根據 shebang 自動調用指定的解釋器來執行文件)來代替 python mapper.py。

  4. 在集羣上運行與監控

    1. 察看文檔

      首先須要知道用於 Streaming 的 Java 程序在哪裏。在 1.0.x 的版本中,應該都在 $HADOOP_HOME/contrib/streaming/ 下:

$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar

      經過執行 Hadoop 命令

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info

就會看到一系列 Streaming 自帶的幫助,帶有各類參數的說明和使用樣例。

  5. 運行命令

    用 Hadoop Streaming 執行 Python 程序的通常步驟是:

    1. 將輸入文件放到 HDFS 上,建議使用 copyFromLocal 而不是 put 命令。參見Difference between hadoop fs -put and hadoop fs -copyFromLocal

      1. 通常能夠新建一個文件夾用於存放輸入文件,假設叫 input

$ hadoop fs -mkdir input

而後用

$ hadoop fs -ls

查看目錄,能夠看到出現了一個 /user/hadoop/input 文件夾。/user/hadoop 是默認的用戶文件夾,至關於本地文件系統中的 /home/hadoop。

      2. 再使用

$ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(s)> input/

將本地文件放到 input 文件夾下。

    2. 開始 MapReduce 做業,假設你如今正在放有 mapper 和 reducer 兩個腳本的目錄下,並且它們恰好就叫 mapper.py 和 reducer.py,在不須要作其餘配置的狀況下,執行

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
-mapper mapper.py \
-file mapper.py \
-reducer reducer.py \
-file reducer.py \
-input input/* \
-output output

    第一行是告訴 Hadoop 運行 Streaming 的 Java 程序,接下來的是參數:

    這裏的 mapper.py 和 reducer.py 是 mapper 所對應 python 程序的路徑。爲了讓 Hadoop 將程序分發給其餘機器,須要再加一個 -file 參數用於指明要分發的程序放在哪裏。

    注意這樣寫的前提是這個 Python 程序裏有 Shebang 並且添加了執行權限。若是沒有的話能夠改爲

-mapper 'python mapper.py'

    加上解釋器命令,用引號擴住(注意在參數中傳入解釋器命令,再也不是用`符擴住,而是'符)。準確來講,mapper 後面跟的騎士應該是一個命令而不是文件名。

    假如你執行的程序不放在當前目錄下,好比說在當前目錄的 src 文件夾下,能夠這樣寫

-mapper 'python mapper.py' -file src/mapper.py \
-reducer 'python reducer.py' -file src/reducer.py \

    也就是說,-mapper 和 -reducer 後面跟的文件名不須要帶上路徑,而 -file 後的參數須要。注意若是你在 mapper 後的命令用了引號,加上路徑名反而會報錯說找不到這個程序。(由於 -file 選項會將對應的本地參數文件上傳至 Hadoop Streaming 的工做路徑下,因此再執行 -mapper 對應的參數命令能直接找到對應的文件。

    -input 和 -output 後面跟的是 HDFS 上的路徑名,這裏的 input/* 指的是"input 文件夾下的全部文件",注意 -output 後面跟着的須要是一個不存在於 HDFS 上的路徑,在產生輸出的時候 Hadoop 會幫你建立這個文件夾,若是已經存在的話就會產生衝突。(所以每次執行 Hadoop Streaming 前能夠經過腳本命令 hadoop fs -rmr 清除輸出路徑)。

    有時候 Shebang 不必定能用,尤爲是在執行環境比較複雜的時候,最保險的作法是:

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
-mapper 'python mapper.py' -file mapper.py \
-reducer 'python reducer.py' -file reducer.py \
-input input/* -output output

    這樣寫還有一個好處,就是能夠在引號裏寫上提供給 python 程序的命令行參數,甚至作目錄的變動以及環境變量的初始化等一系列 shell 命令。

    因爲 mapper 和 reducer 參數跟的其實是命令,因此若是每臺機器上 python 的環境配置不同的話,會用每臺機器本身的配置去執行 python 程序。

  6. 獲得結果

    成功執行完這個任務以後,使用 output 參數在 HDFS 上指定的輸出文件夾裏就會多出幾個文件:一個空白文件 _SUCCESS,表面 job 運行成功,這個文件可讓其餘程序只要查看一下 HDFS 就能判斷此次 job 是否運行成功,從而進行相關處理。

    一個 _logs 文件夾,裝着任務日誌。

    part-00000,.....,part-xxxxx 文件,有多少個 reducer 後面的數字就會有多大,對應每一個 reducer 的輸出結果。

    假如你的輸出不多,好比是一個只有幾行的計數,你能夠用

$ hadoop fs -cat <PATH ON HDFS>

直接將輸出打印到終端查看。

    假如你的輸出不少,則須要拷貝到本地文件系統來查看。可使用 copyToLocal 來獲取整個文件夾。若是你不須要 _SUCCESS 和 _logs,而且想要將全部 reducer 的輸出合併,可使用 getmerge 命令。

$ hadoop fs -getmerge output ./

    上述命令將 output 下的 part-xxxxx 合併,放到當前目錄的一個叫 output 的文件裏。

  7. 如何串聯多趟 MapReduce

    若是有屢次任務要執行,下一步須要用上一步的任務作輸入,解決辦法很簡單。假設上一步在 HDFS 的輸出文件夾是 output1,那麼在下一步的運行命令中,指明

-input output1/part-*

    即指定上一次的全部輸出爲本次任務的輸入便可。

  8. 使用額外的文件

    假如 MapReduce 的 job 除了輸入之外還須要一些額外的文件,有兩種選擇:

    1. 大文件

      所謂的大文件就是大小大於設置的 local.cache.size 的文件,默認是10GB。這個時候能夠用 -file 來分發。除此以外代碼自己也能夠用 file 來分發。

      格式:假如我要加多一個 sideData.txt 給 python 腳本使用:

$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input inputDir \
-output outputDir \
-mapper mapper.py \
-file mapper.py \
-reducer reducer.py \
-file reducer.py \
-file sideData.txt

      這樣 -file 選項的參數文件都會被上傳至 MapReduce 的工做目錄下,因此 mapper 和 reducer 代碼均可以經過文件名直接訪問到文件。在 python 腳本中,只要把這個文件當成本身同一目錄下的本地文件來打開就能夠了。好比:

f = open('sideData.txt')

      注意這個 file 是隻讀的,不能夠寫。

    2. 小文件

      若是是比較小的文件,想要提升讀寫速度能夠將它放在 distributed cache 裏(也就是每臺機器都有本身的一份 copy,不須要網絡 IO 就能夠拿到數據)。這裏要用到的參數是 -cachefile,寫法和用法與上一個同樣,就是將 -file 改爲 -cachefile 而已。

    3. 若是上傳目錄或者多個目錄時使用 -files 選項

      -files dir1,dir2 #多個目錄用','隔開,且不能有空格

      上傳目錄後,能夠直接訪問當前目錄

    4. 上傳 HDFS 上的文件或者目錄

      只能 -files 命令上傳 HDFS 路徑下的文件或目錄,而後就能夠像訪問本地文件同樣訪問 HDFS 文件。

      好比:

hdfs_file="hdfs://webboss-10-166-133-95:9100/user/hive/conf/part-00000"

input=/user/hive/input/*
output=/user/hive/output
mapper_script=mapper.py
reducer_script=reducer.py
map_file=./mapper.py
reduce_file=./reducer.py

hadoop fs -rmr $output
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar \
-D mapred.reduce.tasks=0 \
-files $hdfs_file \
-input $input \
-output $output \
-mapper $mapper_script \
-file $map_file \
-reducer $reducer_script \
-file $reduce_file            

    而後 map 腳本中就能夠直接讀取名爲 part-00000 的文件。詳情參考:http://www.cnblogs.com/zhengrunjian/p/4536572.html

  9. 控制 partitioner

    partitioning 指的是數據通過 mapper 處理後,被分發到 reducer 上的過程。partitioner 控制的,就是「怎樣的 mapper 輸出會被分發到哪個 reducer 上」。

    Hadoop 有幾個自帶的 partitoner,解釋能夠看這裏。默認的是 HashPartitioner,也就是把第一個 '\t' 前的 key 作 hash 以後用於分配 partition。寫 Hadoop Streaming 程序是能夠選擇其餘 partitioner 的,你能夠選擇自帶的其餘幾種裏的一種,也能夠本身寫一個繼承 Partitioner 的 java 類而後編譯成 jar,在運行參數裏指定爲你用的 partitioner。

    官方自帶的 partionner 裏最經常使用的是 KeyFieldBasedPartitioner。它會按照 key 的一部分來作 partition,而不是用整個 key 來作 partition。

    在學會用 KeyFieldBasedPartitioner 以前,必然要先學怎麼控制 key-value 的分割。分割 key 的步驟能夠分紅兩步,用 python 來描述一下大約是

fields = output.split(separator)
key = fields[:numKeyfields]

    1. 選擇用什麼符號來分割 key,也就是選擇 separator

      map.output.key.field.separator 能夠指定用於分割 key 的符號。好比指定爲一點的話,就要加上參數。

-D stream.map.output.field.separator=.

      假設你的 mapper 輸出是

11.22.33.44

      這時會用 '.' 進行分割,看準 [11, 22, 33, 44] 這裏的其中一個或幾個做爲 key。

    2. 選擇 key 的範圍,也就是選擇 numKeyfields

      控制 key 的範圍的參數是這個,假設要設置被分割出的前 2 個元素爲 key:

-D stream.num.map.output.key.fields=2

      那麼 key 就是上面的 1122。值得注意的是假如這個數字設置到覆蓋整個輸出,在這個例子裏是4的話,那麼整一行都會變成 key。

      上面分割出 key 以後,KeyFieldBasedPartitioner 還須要知道你想要用 key 裏的哪部分做爲 partition 的依據。它進行配置的過程能夠看源代碼來理解。

      假設在上一步咱們經過使用

-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 

      將 11.22.33.44 的整個字符串都設置成了 key,下一步就是在這個 key 的內部再進行一次分割。map.output.key.field.separator 能夠用來設置第二次分割用的分割符,mapred.text.key.partitioner.options 能夠接受參數來劃分被分割出來的 partition key,好比:

-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2    

      指的就是在 key 的內部裏,將第1到第2個被點分割的元素做爲 partition key,這個例子裏也就是 1122。這裏的值 -ki,j 表示從 i 到 j 個元素(inclusive)會做爲 partition key。若是終點省略不寫,像 -ki 的話,那麼 i 和 i 以後的元素都會做爲 partition key。

      partition key 相同的輸出會保證分到同一個 reducer 上,也就是全部 11.22.xx.xx 的輸出都會到同一個 partitioner,11.22 換成其餘各類組合也是同樣。

      實例說明一下,就是這樣的:

      1. mapper 的輸出是

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

      2. 指定前 4 個元素作 key,key 裏的前兩個元素作 partition key,分紅 3 個 partition 的話,就會被分紅

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

      3. 下一步 reducer 會對本身獲得的每一個 partition 內進行排序,結果就是

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

      Streaming 命令格式以下:

$ hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=4 \
-D mapred.text.key.partitioner.options=-k1,2 \
-input inputDir \
-output outputDir \
-mapper mapper.py -file mapper.py \
-reducer reducer.py -file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

      注意:

        Hadoop 執行命令時的選項是有順序的,順序是 bin/hadoop command [genericOptions] [commandOptions].

        對於 Streaming,-D 屬於 genericOptions,即 hadoop 的通用選項,因此必須寫在前面。

        Streaming 的全部選項可參考:

          hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -info

    3. 控制 comparator 與自定義排序

      上面說到 mapper 的輸出被 partition 到各個 reducer 以後,會有一步排序。這個排序的標準也是能夠經過設置 comparator 控制的。和上面同樣,要先設置分割出 key 用的分割符、key 的範圍,key 內部分隔用的分割符

-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=.

      這裏要控制的就是 key 內部的哪些元素用來作排序依據,是排字典序仍是數字序,倒敘仍是正序。用來控制的參數是 mapred.text.key.comparator.options,接受的值格式相似於 unix sort。好比我要按第二個元素的數字序(默認字典序)+倒序來排元素的話,就用 -D mapred.text.key.comparator.options=-k2,2nr

      n表示數字序,r表示倒序。這樣一來

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

      就會被排成

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1

    

參考:http://www.uml.org.cn/sjjm/201512111.asp

相關文章
相關標籤/搜索