序:終於開始接觸hadoop了,從wordcount開始ios
1. 採用hadoop streamming模式web
優勢:支持C++ pathon shell 等多種語言,學習成本較低,不須要了解hadoop內部結構shell
調試方便:cat input | ./map | sort | ./reduce > outputbash
hadoop 就是提供了一個分佈式平臺實現了上述腳本的功能,這是一次mapreduce的過程app
一個例子:分佈式
1 #!/bin/bash 2 source build.env 3 $hadoop_bin fs -rmr $env_root 4 $hadoop_bin fs -mkdir $env_root 5 $hadoop_bin fs -copyFromLocal ./txt $env_root/txt 6 $hadoop_bin streaming \ 7 -jobconf mapred.job.name="word count fuck you man~!" \ 8 -input $env_root/txt \ //map程序的輸入:cat input | ./map 9 -output $env_root/outputxt \ //reduce程序的輸出 : ./reduce > output 10 -mapper "./wordcount_map"\ 11 -reducer "./wordcount_reducer"\ 12 -file ./wordcount_map\ 13 -file ./wordcount_reducer\ 14 -jobconf mapred.job.map.capacity=1000 \ 15 -jobconf mapred.job.reduce.capacity=1000 \ 16 -jobconf mapred.child.ulimit=20000000 \ 17 -jobconf mapred.job.queue.name=ns-webgis \ 18 -jobconf mapred.job.priority=HIGH \ 19 -jobconf mapred.map.tasks.speculative.execution=false \ 20 -jobconf mapred.map.tasks=10 \ 21 -jobconf mapred.reduce.tasks=2 22 if [ $? -ne 0 ] 23 then 24 echo "error~~~~" >&2 25 exit -1 26 fi 27 $hadoop_bin fs -get $env_root/outputxt .
2. map :cat input | ./map >> tempide
1)hadoop平臺作了什麼:oop
a.切分文件:把input文件按照必定的策略切割分紅若干個小文件學習
b.將若干個小文件分別分發到不一樣節點上ui
c. 每一個節點上都有一個map程序,而後將任務分發到不一樣的節點上
2)本身要寫的wordcount_map要作什麼:
wordcount_map從input中按行進行讀取,而後按照業務邏輯將讀取到的內容拼成 key \t value的形式 ,這個輸出將會做爲reduce程序的輸入
在這裏輸出的是 word 1 此處 word是key 1是value
注意:此處是標準輸出、輸入 std::cout std::cin in C++
key與value之間用\t分割,第一個\t以前的值做爲key,以後的都做爲value 注意:這個key會被hadoop平臺用到 平臺不關注value值
1 #include<iostream> 2 #include<string> 3 #include<vector> 4 using namespace std; 5 void split(string src,vector<string>& dest,string separator) 6 { 7 string str = src; 8 string substring; 9 string::size_type start = 0, index; 10 11 do 12 { 13 index = str.find_first_of(separator,start); 14 if (index != string::npos) 15 { 16 substring = str.substr(start,index-start); 17 dest.push_back(substring); 18 start = str.find_first_not_of(separator,index); 19 if (start == string::npos) return; } 20 }while(index != string::npos); 21 substring = str.substr(start); 22 dest.push_back(substring); 23 } 24 void map() 25 { 26 string line; 27 vector<string> vec(2); 28 while(cin>>line) 29 { 30 vec.clear(); 31 split(line,vec," "); 32 vector<string>::iterator it=vec.begin(); 33 for(;it!=vec.end();++it) 34 { 35 cout<<*it<<"\t"<<"1"<<"\t"<<"fuck"<<endl; 36 } 37 } 38 } 39 int main() 40 { 41 map(); 42 }
3. reduce: sort | ./reduce > output
等到全部的map任務都結束:
1)hadoop平臺作了這些事情
a.將全部的map程序的輸出結果中key相同的key value pair 放到相同的節點上,這點很重要,這是保證最終輸出結果正確的保證,後面會按照key進行hash , 而且相同 key之間不會有其餘的key,實際上是按照key值作了一個排序
注意:相同的key必定在一個節點上,可是一個節點上不止有一個個key
b 而後在各個節點上開始reduce任務
2)本身寫的wordcount_map作了什麼
a. 讀取這些具備相同key的鍵值對,處理本身的業務邏輯,此處就是將統計相同的key值的鍵值對一共出現了幾回,而後將結果輸出,此處也是標準輸入和標準輸出
#include<vector> #include<map> #include<string> #include<iostream> using namespace std; void reduce() { string key; string value; string value1; //vector<string> vec(2); map<string,int> mapTemp; while(cin>>key>>value>>value1) { if(mapTemp.find(key)!=mapTemp.end()) mapTemp[key]+=1; else mapTemp[key]=1; } map<string,int>::iterator it = mapTemp.begin(); for(;it!=mapTemp.end();++it) { cout<<it->first<<"\t"<<it->second<<endl; } } int main() { reduce(); }