Hadoop學習筆記(6) ——從新認識Hadoop

Hadoop學習筆記(6) 算法

——從新認識Hadoop 數據庫

以前,咱們把hadoop從下載包部署到編寫了helloworld,看到告終果。現是得開始稍微更深刻地瞭解hadoop了。 編程

Hadoop包含了兩大功能DFS和MapReduce, DFS能夠理解爲一個分佈式文件系統,存儲而已,因此這裏暫時就不深刻研究了,等後面讀了其源碼後,再來深刻分析。 因此這裏主要來研究一下MapReduce。 數組

 

這樣,咱們先來看一下MapReduce的思想來源: 服務器

alert("I'd like some Spaghetti!"); 網絡

alert("I'd like some ChocolateMousse!"); app

看到這代碼,感受不順眼,改之: 負載均衡

function SwedishChef(food) 框架

{ 分佈式

alert("I'd like some " + food + " !");

}

SwedishChef("Spaghetti");

SwedishChef("Chocolate Mousse");

這類用函數取代重複代碼的重構方式好處不少,優勢不少,就很少說了!

 

再看這段:

alert("get the lobster");

PutInPot("lobster");

PutInPot("water");

 

alert("get the chicken");

BoomBoom("chicken");

BoomBoom("coconut");

    看起來這段代碼好像也有點重複的味道在裏面,改之:

function Cook(i1,i2,f)

{

alert("get the " + i1);

f(i1);

f(i2);

}

 

Cook("lobster","water",PutInPot);

Cook("chicken","coconut",BoomBoom);

    OK,這裏面把一個函數當成了一個參數傳來傳去的。並且能夠再省略點,把函數定義直接放入到函數調用者的地方:    

Cook("lobster","water",

Function(x) { alert("pot " + x); } );

Cook("chicken","coconut",

Function(x) { alert("boom" + x); } );

    嘿嘿,方便多了吧,這就是如今比較流程的匿名函數。

    

    一旦你開始意識到能夠將匿名函數做爲參數,你就會發現有更多的應用了,來看:

var a=[1,2,3];

for(i = 0; I < a.length; i++)

{

a[i] = a[i] * 2;

}

for(i = 0; I < a.length; i++)

{

alert(a[i]);

}

遍歷數組元素是一種很覺的操做,那好,既然很經常使用,咱們提取出來寫成一個函數得了:

     function map(fn,a)

{

for(i = 0; i < a.length; i++)

{

a[i] = fn(a[i]);

}

}

 

有了這個map函數後,上面的程序咱們就可來調用咯:

map(function (x) {return x*2; } , a);

map(alert, a);

 

除了遍歷,通常咱們還會有一些經常使用的語句,如:

function sum(a)

{

var s = 0;

for( i = 0; i < a.length ;i ++)

s += a[i];

return s;

}

function join(a)

{

var s = "";

for( i = 0; i < a.length ;i ++)

s += a[i];

return s;

}

alert(sum([1,2,3]);

alert(join(["a","b","c"]);

    彷佛sum和join這兩個函數看起來很象,可否再抽象呢:

function reduce(fn, a, init)

{

var s = init;

for (i= 0;i < a.length; i++)

s = fn(s, a[i]);

return s;

}

 

function sum(a)

{

return reduce( function(a,b){return a+b;}, a, 0);

}

Function join(a)

{

return reduce( function(a,b){return a+b;}, a, "");

}

    

OK,這個函數都提取好了,那麼,一個這麼微小的函數,只能對數組中的元素進行遍歷,到底能給你帶來多大好處呢?

    讓咱們回頭再來看map函數,當你須要對數組中每個元素爲依次進行處理時,那麼實際狀況極可能是,到底按照哪種次序進行遍歷是可有可無的,你能夠從頭到尾,也能夠從尾到頭遍歷,都會獲得相同的結果。事實上,若是你有兩個CPU能夠用,也許你能夠寫一些代碼,使得每一個CPU來處理一半的元素,因而一瞬間這個map函數的運行速度就快了一倍。

    進一步設想,你有幾十萬臺服務器,分佈在世界各地的機房中,而後你有一個超級龐大的數組,好比互聯網的N多個網頁信息,因而你讓這些服務器來運行你的map函數,要快N多吧,每臺機器實際上只處理很小的一部份運算。

    OK,運算提升了很多,有了這個,你就敢去想互聯網的搜索是怎麼來的了。可是機器好像也是有極限的,不可能有這麼多資源讓你無限量的擴展,那咋辦,不要緊,你能夠找一個超級天才,讓他寫出可以在全世界龐大的服務器陣列上分佈式運行的map函數和reduce函數。

簡單的函數提出來了,加上函數指定的涉入,那這個高效的函數,可讓咱們在各個地方去用,而後讓咱們不一樣的程序都能在陣列上運行,甚至能夠不用告訴這位天才你真正在實現啥功能,只要讓他作出這個map函數來。

這是函數式編程裏的理念,如是不知道的函數式編程天然也就想不出mapReduce,正是這種算法使得Google的可擴展性達到如此巨大的規模。其中術語Map(映射)和Reduce(化簡)分別來自Lisp語言和函數式編程。

 

再來看看Hadoop中的mapRecude。

Hadoop就是由天才想出來的,可以在N多龐大服務器羣上運行你的map和reduce函數的框架。有了這個平臺,咱們需要作的,僅僅要咱們實現傳入的map和reduce中的處理函數而已。

MapReudue採用"分而治之"的思想,把對大規模數據集的操做,分發給一個主節點管理下的各分節點共同完成,而後經過整合各分節點中間結果,獲得最終的結果。簡單地說,MapRedue就是"任務的分解與結果的彙總"。上述處理過程被高度的抽象爲兩個函數:mapper和reducer,mapper負責把任務分解成多個任務,reducer負責把分解後多任務處理的結果彙總起來。至於在並行編程中的其餘種種複雜問題,如分式。。。。 工做調度、負載均衡、容錯處理、網絡通信等,均由MapReduce框架負責處理。

 

 

在Mapper階段,框架將任務的輸入數據分割成固定大小的片斷(split),隨後將每一個split進一步分解成一批鍵值對<K1,V1>。Hadoop爲每個split建立一個Map任務用於執行用戶自定義的map函數,並將對應的split中的<K1,V1>對做爲輸入,獲得計算的中間結果<K2,V2>。接着將中間結果按照K2進行排序,並將key值相同的value放在一塊兒造成一個新的列表,開成<K2,list(V2)>元組。最後再根據key值的範圍將這些元組進行分組,對應不一樣的Reduce任務。

在Reducre階段,Reducer把從不一樣Mapper接收來的數據整合在一塊兒並進行排序,而後調用用戶自定義的reduce函數,對輸入的<K2,list(V2)>對進行相應的處理,獲得健值對<K3,V3>並輸出到HD

框架爲每一個split建立一個Mapper,那麼誰來肯定Reducers的數量呢? 是配置文件,在mapred-site.xml中,有配置Reducers的數量,默認是1。通常要設置多少爲宜呢?等後面更深刻了再找答案吧。

 

看看Hello World程序

看了前面的概念有點抽象,因此咱們仍是來看點實際的,看看咱們以前寫的helloword程序。

咱們第一個helloworld程序是WordCount,——單詞計數程序。這個程序的輸入輸出以下:

 

這是一個怎樣的過程呢?咱們一步步分解,WordCount程序通過了如下過程:

  1. 將文件拆分紅splits,因爲測試文件較小,因此一個文件即爲一個split,並將文件按行分割成<key,value>對,這一步由MapReduce框架自動完成,其中key值存的是偏移量。

    爲了好解釋,咱們將輸入數據中各增長了一行 bye world bye hadoop

     

  2. 將分割好的<key,value>交給用戶定義的map方法進行處理,生成新的<key,value>對,這段代碼,正是咱們所編寫過的:

public static class TokenizerMapper

extends Mapper<Object, Text, Text, IntWritable>{

 

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

 

public void map(Object key, Text value, Context context

) throws IOException, InterruptedException {

System.out.println("key=" +key.toString());

System.out.println("Value=" + value.toString());

 

StringTokenizer itr = new StringTokenizer(value.toString());

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

context.write(word, one);

}

}

}

這段代碼中,爲了方便調試,每調用咱們的map函數,都會輸入key和value值。因此在執行結果中,咱們能夠看到程序的確有2次輸入,與上面的值一至。StringTokenizer是一個單詞切詞的類,將字符串中的單詞一個個切出來,而後while循環,往context中輸出key-value,分爲爲切出來的單詞,value爲固定值1。

 

  1. 獲得map方法輸入的<key,value>對後,Mapper會將它們按照key值進行排序,並執行Combine過程,將key值相同的value值累加,獲得Mapper的最終輸出結果

  1. Reducer先對從Mapper接收的數據進行排序,再交由用戶自定義的reduce方法進行處理,獲得新的<key,value>對,並做爲wordCount的輸出結果。

public static class IntSumReducer

extends Reducer<Text,IntWritable,Text,IntWritable> {

private IntWritable result = new IntWritable();

 

public void reduce(Text key, Iterable<IntWritable> values,

Context context

) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

result.set(sum);

context.write(key, result);

}}

這段代碼將收到的key和values值,將values進行累加,而後key不變輸出到key-value裏面,最終獲得結果:

 

好了,這個Hello World就程序整個過程就這樣。寫了map和recude類,如何讓它關聯執行呢? 因此回到示例程序的main函數:

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

System.out.println("url:" + conf.get("fs.default.name"));

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length != 2) {

System.err.println("Usage: wordcount <in> <out>");

System.exit(2);

}

Job job = new Job(conf, "word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

這裏,咱們定義了一個Job類,而後把TokenizerMapper和IntSumReducer類指派給job,而後一切就交給job. waitForCompletion去執行,並等待其完成結果了。

 

OK了, 這個Hello World也總算是讀懂了,不過。。。 仍是有好多疑問,中間那麼多過程,能不有個性化呢?如:輸入能不能從數據庫?輸出能不能到Excel?等等,好了,後面再來分析吧。

 

本文參考:

《軟件隨想錄》Joel Spolsky著

《實踐Hadoop》劉鵬

相關文章
相關標籤/搜索