Hadoop學習筆記(6) 算法
——從新認識Hadoop 數據庫
以前,咱們把hadoop從下載包部署到編寫了helloworld,看到告終果。現是得開始稍微更深刻地瞭解hadoop了。 編程
Hadoop包含了兩大功能DFS和MapReduce, DFS能夠理解爲一個分佈式文件系統,存儲而已,因此這裏暫時就不深刻研究了,等後面讀了其源碼後,再來深刻分析。 因此這裏主要來研究一下MapReduce。 數組
這樣,咱們先來看一下MapReduce的思想來源: 服務器
alert("I'd like some Spaghetti!"); 網絡
alert("I'd like some ChocolateMousse!"); app
看到這代碼,感受不順眼,改之: 負載均衡
function SwedishChef(food) 框架
{ 分佈式
alert("I'd like some " + food + " !");
}
SwedishChef("Spaghetti");
SwedishChef("Chocolate Mousse");
這類用函數取代重複代碼的重構方式好處不少,優勢不少,就很少說了!
再看這段:
alert("get the lobster");
PutInPot("lobster");
PutInPot("water");
alert("get the chicken");
BoomBoom("chicken");
BoomBoom("coconut");
看起來這段代碼好像也有點重複的味道在裏面,改之:
function Cook(i1,i2,f)
{
alert("get the " + i1);
f(i1);
f(i2);
}
Cook("lobster","water",PutInPot);
Cook("chicken","coconut",BoomBoom);
OK,這裏面把一個函數當成了一個參數傳來傳去的。並且能夠再省略點,把函數定義直接放入到函數調用者的地方:
Cook("lobster","water",
Function(x) { alert("pot " + x); } );
Cook("chicken","coconut",
Function(x) { alert("boom" + x); } );
嘿嘿,方便多了吧,這就是如今比較流程的匿名函數。
一旦你開始意識到能夠將匿名函數做爲參數,你就會發現有更多的應用了,來看:
var a=[1,2,3];
for(i = 0; I < a.length; i++)
{
a[i] = a[i] * 2;
}
for(i = 0; I < a.length; i++)
{
alert(a[i]);
}
遍歷數組元素是一種很覺的操做,那好,既然很經常使用,咱們提取出來寫成一個函數得了:
function map(fn,a)
{
for(i = 0; i < a.length; i++)
{
a[i] = fn(a[i]);
}
}
有了這個map函數後,上面的程序咱們就可來調用咯:
map(function (x) {return x*2; } , a);
map(alert, a);
除了遍歷,通常咱們還會有一些經常使用的語句,如:
function sum(a)
{
var s = 0;
for( i = 0; i < a.length ;i ++)
s += a[i];
return s;
}
function join(a)
{
var s = "";
for( i = 0; i < a.length ;i ++)
s += a[i];
return s;
}
alert(sum([1,2,3]);
alert(join(["a","b","c"]);
彷佛sum和join這兩個函數看起來很象,可否再抽象呢:
function reduce(fn, a, init)
{
var s = init;
for (i= 0;i < a.length; i++)
s = fn(s, a[i]);
return s;
}
function sum(a)
{
return reduce( function(a,b){return a+b;}, a, 0);
}
Function join(a)
{
return reduce( function(a,b){return a+b;}, a, "");
}
OK,這個函數都提取好了,那麼,一個這麼微小的函數,只能對數組中的元素進行遍歷,到底能給你帶來多大好處呢?
讓咱們回頭再來看map函數,當你須要對數組中每個元素爲依次進行處理時,那麼實際狀況極可能是,到底按照哪種次序進行遍歷是可有可無的,你能夠從頭到尾,也能夠從尾到頭遍歷,都會獲得相同的結果。事實上,若是你有兩個CPU能夠用,也許你能夠寫一些代碼,使得每一個CPU來處理一半的元素,因而一瞬間這個map函數的運行速度就快了一倍。
進一步設想,你有幾十萬臺服務器,分佈在世界各地的機房中,而後你有一個超級龐大的數組,好比互聯網的N多個網頁信息,因而你讓這些服務器來運行你的map函數,要快N多吧,每臺機器實際上只處理很小的一部份運算。
OK,運算提升了很多,有了這個,你就敢去想互聯網的搜索是怎麼來的了。可是機器好像也是有極限的,不可能有這麼多資源讓你無限量的擴展,那咋辦,不要緊,你能夠找一個超級天才,讓他寫出可以在全世界龐大的服務器陣列上分佈式運行的map函數和reduce函數。
簡單的函數提出來了,加上函數指定的涉入,那這個高效的函數,可讓咱們在各個地方去用,而後讓咱們不一樣的程序都能在陣列上運行,甚至能夠不用告訴這位天才你真正在實現啥功能,只要讓他作出這個map函數來。
這是函數式編程裏的理念,如是不知道的函數式編程天然也就想不出mapReduce,正是這種算法使得Google的可擴展性達到如此巨大的規模。其中術語Map(映射)和Reduce(化簡)分別來自Lisp語言和函數式編程。
再來看看Hadoop中的mapRecude。
Hadoop就是由天才想出來的,可以在N多龐大服務器羣上運行你的map和reduce函數的框架。有了這個平臺,咱們需要作的,僅僅要咱們實現傳入的map和reduce中的處理函數而已。
MapReudue採用"分而治之"的思想,把對大規模數據集的操做,分發給一個主節點管理下的各分節點共同完成,而後經過整合各分節點中間結果,獲得最終的結果。簡單地說,MapRedue就是"任務的分解與結果的彙總"。上述處理過程被高度的抽象爲兩個函數:mapper和reducer,mapper負責把任務分解成多個任務,reducer負責把分解後多任務處理的結果彙總起來。至於在並行編程中的其餘種種複雜問題,如分式。。。。 工做調度、負載均衡、容錯處理、網絡通信等,均由MapReduce框架負責處理。
在Mapper階段,框架將任務的輸入數據分割成固定大小的片斷(split),隨後將每一個split進一步分解成一批鍵值對<K1,V1>。Hadoop爲每個split建立一個Map任務用於執行用戶自定義的map函數,並將對應的split中的<K1,V1>對做爲輸入,獲得計算的中間結果<K2,V2>。接着將中間結果按照K2進行排序,並將key值相同的value放在一塊兒造成一個新的列表,開成<K2,list(V2)>元組。最後再根據key值的範圍將這些元組進行分組,對應不一樣的Reduce任務。
在Reducre階段,Reducer把從不一樣Mapper接收來的數據整合在一塊兒並進行排序,而後調用用戶自定義的reduce函數,對輸入的<K2,list(V2)>對進行相應的處理,獲得健值對<K3,V3>並輸出到HD
框架爲每一個split建立一個Mapper,那麼誰來肯定Reducers的數量呢? 是配置文件,在mapred-site.xml中,有配置Reducers的數量,默認是1。通常要設置多少爲宜呢?等後面更深刻了再找答案吧。
看看Hello World程序
看了前面的概念有點抽象,因此咱們仍是來看點實際的,看看咱們以前寫的helloword程序。
咱們第一個helloworld程序是WordCount,——單詞計數程序。這個程序的輸入輸出以下:
這是一個怎樣的過程呢?咱們一步步分解,WordCount程序通過了如下過程:
爲了好解釋,咱們將輸入數據中各增長了一行 bye world 和bye hadoop
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
System.out.println("key=" +key.toString());
System.out.println("Value=" + value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
這段代碼中,爲了方便調試,每調用咱們的map函數,都會輸入key和value值。因此在執行結果中,咱們能夠看到程序的確有2次輸入,與上面的值一至。StringTokenizer是一個單詞切詞的類,將字符串中的單詞一個個切出來,而後while循環,往context中輸出key-value,分爲爲切出來的單詞,value爲固定值1。
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}}
這段代碼將收到的key和values值,將values進行累加,而後key不變輸出到key-value裏面,最終獲得結果:
好了,這個Hello World就程序整個過程就這樣。寫了map和recude類,如何讓它關聯執行呢? 因此回到示例程序的main函數:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
System.out.println("url:" + conf.get("fs.default.name"));
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
這裏,咱們定義了一個Job類,而後把TokenizerMapper和IntSumReducer類指派給job,而後一切就交給job. waitForCompletion去執行,並等待其完成結果了。
OK了, 這個Hello World也總算是讀懂了,不過。。。 仍是有好多疑問,中間那麼多過程,能不有個性化呢?如:輸入能不能從數據庫?輸出能不能到Excel?等等,好了,後面再來分析吧。
本文參考:
《軟件隨想錄》Joel Spolsky著
《實踐Hadoop》劉鵬著