Hadoop streaming相似於Unix管道數據流,從標準輸入(STDIN)輸入,輸出到標準輸出(STDOUT),數據必須是基於文本的,文本的每一行被認爲是一條記錄。這也是不少Unix命令,例如:awk的工做方式。在Hadoop streaming中整個數據流就像是一個管道(Pipe),依次通過mapper,mapper sorter,reducer,Unix僞代碼表示以下: java
cat [input_file] | [mapper] | sort | [reducer] > [output] 正則表達式
在streaming中使用Unix命令 shell
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar 數組
-input input/cite75_99.txt -output output -mapper 'cut -f 2 -d ,' -reducer 'uniq' app
streaming也可使用腳本,只要該腳本從STDIN輸入,輸出到STDOUT
dom
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar 函數
-input input/cite75_99.txt -output output -mapper ‘RandomSample.py 10’ oop
-file RandomSample.py -D mapred.reduce.tasks=1 spa
3. mapper的輸出默認以Tab鍵分割key/value,若沒有Tab,則整條記錄爲key,value爲空。Hadoop shuffling將依據key將key/value發送給不一樣的reducer。但mapper和reducer的工做依然是從STDIN處理一行一行的文本。 命令行
hadoop streaming提供參數-inputformat、-outputformat、-partitioner、-combiner、-numReduceTasks指定相應的hadoop job參數。其中-mapper、-reducer、-combiner既能夠指定爲類名,也能夠指定爲unix可執行程序。-file參數指定一個文件,將文件打包,和job一塊兒被髮送到各個執行節點。-files參數能夠指定多個文件,文件名之間採用逗號分隔。-D參數能夠指定鍵值對型的參數,例如:-D mapred.reduce.tasks=0,-D stream.map.output.field.separator=.,-D stream.num.map.output.key.fields=4,分別指定reduce數量爲0,map輸出字符串以.分割成字段,並設置前4個字段以前的字符串爲map輸出的key(即第四個.以前的字符串),第四個.以後的字符串爲value。
4. Hadoop Stream中常結合awk命令使用。
awk適用於處理字段型數據行,Pattern中能夠調用~和!~支持正則表達式匹配。
awk能夠從shell命令獲取輸入數據,例如命令「ls」| getline。
awk能夠在print和printf語句後使用>和>>將輸出重定向到文件,例如:print $1>"myfiles"。
awk中能夠直接調用shell命令,只需將shell命令置入system函數中,例如:system("ls > myfiles"),可是使用system函數時,awk沒法直接給shell命令傳遞參數,shell命令也沒法將輸出直接傳遞給awk。
awk的getline函數也可使用<運算符,從文件中逐行讀取記錄。
能夠將awk程序寫進一個腳本中,而後調用命令awk -f my.awk input.file執行,也能夠直接在shell命令行中執行awk命令,或者將awk命令寫進myawk.sh文件執行。
awk命令以單引號開始,以單引號結束,awk命令中的字符串一概用雙引號。
字段分隔符FS,是awk的內建變量,其默認是空白和\t,能夠在BEGIN pattern中指定:FS="[\t,:]+",表示使用\t、,和:分割字段。
awk提供close(filename)函數關閉打開的文件,用於確保輸出到文件的順序。
awk採用內建變量RS,默認爲\n分割多行數據。
awk提供函數split(原字符串,字符串數組,分隔字符),按照分隔字符將原字符串分割,保存在字符串數組中。
awk提供內建變量ARGC,表示出了-f、-c等參數以外,命令行參數的個數,經過ARGV,讀取命令行參數。
awk中指定ARGV[1] = "-",表示由鍵盤輸入數據。
awk可使用function關鍵字定義函數,函數中定義的變量在函數外也是可見的,所以在定義遞歸函數時,應特別注意。
awk正則表達式,置入//之間。
awk提供next指令,跳過當前數據行,處理下一行,提供exit指令,退出awk程序。
awk提供內建的字符串函數和數學函數,index(原字符串,找尋的字符串)返回找尋的字符串在原字符串中第一次出現的index,length(字符串)計算字符串的長度,match(原字符串,正則表達式)函數返回匹配結果,sprintf函數將格式化輸出到字符串,sub(正則表達式,將替換的新字符串,原字符串)將首次匹配正則表達式的部分替代,substr(原字符串,起始位置,長度)返回子字符串;數學函數包括int取整,sqrt求根號,exp,log,sin,cos,rand,srand。
5. hadoop streaming中使用awk注意事項
下面是一個awk命令實例:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-libjars parquet_test_1.0.jar \
-input /user/algo/lujianfeng/parquet-mz-input-test08 \
-output /user/algo/lujianfeng/parquet-mz/countfield10 \
-mapper "awk -F'[=\^]' '{for(i=1;i<=NF;i++){printf(\"%s\t1\n\",\$i);}}'" \
-reducer "awk -F'[\t]' 'BEGIN{key=\"\";num=0;}{if(key!=\$1 && length(key)>0) {printf(\"%s\t%d\n\",key,num);num=0;} key=\$1;num++;}'"
注意:1. awk命令中的$、「字符必須轉義
2. hadoop streaming中的reducer不一樣於寫java MR程序時的reduce函數,全部發送給一個Reducer的Map輸出結果,hadoop streaming中都由同一個awk腳本執行,不會按照key進行分組
3. hadoop streaming中,reducer處理的數據就是map的輸出結果,即一行文本