從wordcount 開始 mapreduce (C++\hadoop streaming模式)

序:終於開始接觸hadoop了,從wordcount開始ios

1. 採用hadoop streamming模式web

 優勢:支持C++ pathon shell 等多種語言,學習成本較低,不須要了解hadoop內部結構shell

 調試方便:cat input | ./map | sort | ./reduce > outputbash

 hadoop 就是提供了一個分佈式平臺實現了上述腳本的功能,這是一次mapreduce的過程app

 一個例子:分佈式

 1 #!/bin/bash
 2 source build.env
 3 $hadoop_bin fs -rmr $env_root    
 4 $hadoop_bin fs -mkdir $env_root 
 5 $hadoop_bin fs -copyFromLocal ./txt  $env_root/txt
 6 $hadoop_bin streaming \
 7         -jobconf mapred.job.name="word count fuck you man~!" \
 8         -input  $env_root/txt \       //map程序的輸入:cat input | ./map
 9         -output $env_root/outputxt \  //reduce程序的輸出 : ./reduce > output
10         -mapper "./wordcount_map"\
11         -reducer "./wordcount_reducer"\
12         -file    ./wordcount_map\
13         -file    ./wordcount_reducer\
14         -jobconf mapred.job.map.capacity=1000 \
15         -jobconf mapred.job.reduce.capacity=1000 \
16         -jobconf mapred.child.ulimit=20000000 \
17         -jobconf mapred.job.queue.name=ns-webgis \
18         -jobconf mapred.job.priority=HIGH \
19         -jobconf mapred.map.tasks.speculative.execution=false \
20         -jobconf mapred.map.tasks=10 \
21         -jobconf mapred.reduce.tasks=2 
22 if [ $? -ne 0 ]
23 then
24     echo "error~~~~" >&2
25     exit -1
26 fi
27 $hadoop_bin fs -get $env_root/outputxt .

2. map :cat input | ./map >> tempide

   1)hadoop平臺作了什麼:oop

   a.切分文件:把input文件按照必定的策略切割分紅若干個小文件學習

   b.將若干個小文件分別分發到不一樣節點上ui

   c. 每一個節點上都有一個map程序,而後將任務分發到不一樣的節點上

   2)本身要寫的wordcount_map要作什麼:

   wordcount_map從input中按行進行讀取,而後按照業務邏輯將讀取到的內容拼成 key \t value的形式  ,這個輸出將會做爲reduce程序的輸入

   在這裏輸出的是 word 1  此處 word是key  1是value

   注意:此處是標準輸出、輸入 std::cout std::cin  in C++

   key與value之間用\t分割,第一個\t以前的值做爲key,以後的都做爲value 注意:這個key會被hadoop平臺用到 平臺不關注value值

 1 #include<iostream>
 2 #include<string>
 3 #include<vector>
 4 using namespace std;
 5 void split(string src,vector<string>& dest,string separator)
 6 {
 7    string str = src;
 8    string substring;
 9    string::size_type start = 0, index;
10 
11    do
12    {
13        index = str.find_first_of(separator,start);
14        if (index != string::npos)
15        {    
16          substring = str.substr(start,index-start);
17          dest.push_back(substring);
18          start = str.find_first_not_of(separator,index);
19          if (start == string::npos) return;                                                                         }    
20    }while(index != string::npos);
21    substring = str.substr(start);
22    dest.push_back(substring);
23 }
24 void map()
25 {
26     string line;
27     vector<string> vec(2);
28     while(cin>>line)
29     {
30         vec.clear();
31         split(line,vec," ");
32         vector<string>::iterator it=vec.begin();
33         for(;it!=vec.end();++it)
34         {
35             cout<<*it<<"\t"<<"1"<<"\t"<<"fuck"<<endl;
36         }
37     }
38 }
39 int main()
40 {
41     map();
42 }
wordcount_map

   3. reduce: sort  |  ./reduce > output

   等到全部的map任務都結束:

   1)hadoop平臺作了這些事情

        a.將全部的map程序的輸出結果中key相同的key value pair 放到相同的節點上,這點很重要,這是保證最終輸出結果正確的保證,後面會按照key進行hash , 而且相同                 key之間不會有其餘的key,實際上是按照key值作了一個排序

          注意:相同的key必定在一個節點上,可是一個節點上不止有一個個key

        b 而後在各個節點上開始reduce任務

   2)本身寫的wordcount_map作了什麼

        a. 讀取這些具備相同key的鍵值對,處理本身的業務邏輯,此處就是將統計相同的key值的鍵值對一共出現了幾回,而後將結果輸出,此處也是標準輸入和標準輸出

        

#include<vector>
#include<map>
#include<string>
#include<iostream>
using namespace std;
void reduce()
{
    string key;
    string value;
    string value1;
    //vector<string> vec(2);
    map<string,int> mapTemp;
    while(cin>>key>>value>>value1)
    {        
        if(mapTemp.find(key)!=mapTemp.end())
            mapTemp[key]+=1;
        else
            mapTemp[key]=1;
    }
    map<string,int>::iterator it = mapTemp.begin();
    for(;it!=mapTemp.end();++it)
    {
        cout<<it->first<<"\t"<<it->second<<endl;
    }
}
int main()
{
    reduce();
}
wordcount_reduce
相關文章
相關標籤/搜索