1、MR計算模型的由來html
MapReduce最先是由Google公司研究提出的一種面向大規模數據處理的並行計算模型和方法。Google公司設計MapReduce的初衷,主要是爲了解決其搜索引擎中大規模網頁數據的並行化處理。java
Google公司發明了MapReduce以後,首先用其從新改寫了其搜索引擎中的Web文檔索引處理系統。但因爲MapReduce能夠廣泛應用於不少大規模數據的計算問題,所以自發明MapReduce之後,Google公司內部進一步將其普遍應用於不少大規模數據處理問題。到目前爲止,Google公司內有上萬個各類不一樣的算法問題和程序都使用MapReduce進行處理。 linux
2003年和2004年,Google公司在國際會議上分別發表了兩篇關於Google分佈式文件系統和MapReduce的論文,公佈了 Google的GFS和MapReduce的基本原理和主要設計思想。程序員
2004年,開源項目Lucene(搜索索引程序庫)和Nutch(搜索引擎)的創始人Doug Cutting發現MapReduce正是其所須要的解決大規模Web數據處理的重要技術,於是模仿Google MapReduce,基於Java設計開發了一個稱爲Hadoop的開源MapReduce並行計算框架和系統。web
自此,Hadoop成爲Apache開源組織下最重要的項目,自其推出後很快獲得了全球學術界和工業界的廣泛關注,並獲得推廣和普及應用。 MapReduce的推出給大數據並行處理帶來了巨大的革命性影響,使其已經成爲事實上的大數據處理的工業標準。算法
2、MapReduce基本設計思想apache
對付大數據並行處理:分而治之:編程
一個大數據若能夠分爲具備一樣計算過程的數據塊,而且這些數據塊之間不存在數據依賴關係,則提升處理速度的最好辦法就是採用「分而治之」的策略進行並行化計算。性能優化
MapReduce採用了這種「分而治之」的設計思想,對相互間不具備或者有較少數據依賴關係的大數據,用必定的數據劃分方法對數據分片,而後將每一個數據分片交由一個節點去處理,最後彙總處理結果。服務器
上升到抽象模型:Map與Reduce:
MapReduce借鑑了函數式程序設計語言Lisp的設計思想。
用Map和Reduce兩個函數提供了高層的並行編程抽象模型和接口,程序員只要實現這兩個基本接口便可快速完成並行化程序的設計。
MapReduce的設計目標是能夠對一組順序組織的數據元素/記錄進行處理。
現實生活中,大數據每每是由一組重複的數據元素/記錄組成,例如,一個Web訪問日誌文件數據會由大量的重複性的訪問日誌構成,對這種順序式數據元素/記錄的處理一般也是順序式掃描處理。
MapReduce提供瞭如下的主要功能:
數據劃分和計算任務調度:系統自動將一個做業(Job)待處理的大數據劃分爲不少個數據塊,每一個數據塊對應於一個計算任務(Task),並自動調度計算節點來處理相應的數據塊。做業和任務調度功能主要負責分配和調度計算節點(Map節點或Reduce節點),同時負責監控這些節點的執行狀態,並負責Map節點執行的同步控制。
數據/代碼互定位:爲了減小數據通訊,一個基本原則是本地化數據處理,即一個計算節點儘量處理其本地磁盤上所分佈存儲的數據,這實現了代碼向數據的遷移;當沒法進行這種本地化數據處理時,再尋找其餘可用節點並將數據從網絡上傳送給該節點(數據向代碼遷移),但將盡量從數據所在的本地機架上尋 找可用節點以減小通訊延遲。
系統優化:爲了減小數據通訊開銷,中間結果數據進入Reduce節點前會進行必定的合併處理;一個Reduce節點所處理的數據可能會來自多個Map節點,爲了不Reduce計算階段發生數據相關性,Map節點輸出的中間結果需使用必定的策略進行適當的劃分處理,保證相關性數據發送到同一個Reduce節點;此外,系統還進行一些計算性能優化處理,如對最慢的計算任務採用多備份執行、選最快完成者做爲結果。
出錯檢測和恢復:以低端商用服務器構成的大規模MapReduce計算集羣中,節點硬件(主機、磁盤、內存等)出錯和軟件出錯是常態,所以 MapReduce須要能檢測並隔離出錯節點,並調度分配新的節點接管出錯節點的計算任務。同時,系統還將維護數據存儲的可靠性,用多備份冗餘存儲機制提 高數據存儲的可靠性,並能及時檢測和恢復出錯的數據
3、MapReduce的編寫
1.加入依賴jar包--編寫pom.xml
<properties> <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version> </properties> <!--分佈式計算--> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <!--分佈式存儲--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> </dependencies>
2.在resources中添加core-site.xml文件,配置內容以下:
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://master2:9000</value> </property> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>yarn.resourcemanager.scheduler.address</name> <value>master2:8030</value> </property> <property> <name>fs.hdfs.impl</name> <value>org.apache.hadoop.hdfs.DistributedFileSystem</value> </property> </configuration>
master2:表示我集羣中NameNode的主機名,填寫主機名須要在本地機中的hosts中添加IP配置,
若是不配置,請填寫主機名所對應的IP。
3..mapreduce函數(類)的編寫,有三個類分別是表明map、reduce、job
3.1:編寫WCMapper類(map)繼承Mapper並重寫map這個方法,具體內容以下:
package com.day01; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * java基本數據類型步支持序列化--存放在內存中不在磁盤中(不能被持久化) * 用來處理map任務:映射 * map任務接收的是kv,輸出的也是kv 1(行號),hello world * 第一個泛型表示:輸入key的數據類型 輸入的數據至關於文件開頭的偏移量(行號)沒有實際意義 * 第二個泛型表示:輸入value的數據類型 輸入的文件的一行內容 * 第三個泛型表示:輸出key的數據類型 輸出的key是一個字符串 * 第四個泛型表示:輸出value的數據類型 * * LongWritable:等價於java中的long * Text :等價與java中的string * IntWritable:等價於java中的int * * XXXWritable 是hadoop定義的基本數據類型,至關於對java中的數據類型作一個封裝,同時序列化(能夠網絡傳輸以及存儲到磁盤上) * */ public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> { //map方法每次執行一行數據,會被循環調用map方法(有多少行就調用多少次) @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將一行數據(text類型)變爲string類型 String line = value.toString(); String[] words = line.split(" "); //定義value IntWritable one = new IntWritable(1); //便利單詞,輸出word 1 for (int i = 0; i < words.length; i++) { Text keyOut = new Text(words[i]); //輸出word 1 context.write(keyOut,one); } } }
3.二、編寫WCReducer類(reduce)繼承Reducer並重寫reduce這個方法,具體內容以下:
package com.day01; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * 用來處理reduce任務:合併 * 在reduce端框架會將相同的key的value放在一個集合(迭代器) */ public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> { //每次處理一個key,會被循環調用,有多少個key就會調用幾回 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //獲取迭代器 Iterator<IntWritable> iterator = values.iterator(); int count = 0; while (iterator.hasNext()){ IntWritable one = iterator.next(); count+=one.get(); } //context的write只接受hadoop的數據類型,不接受java的數據類型 context.write(key,new IntWritable(count)); } }
3.三、編寫WCJob類(job),具體內容以下:
package com.day01; import com.google.common.io.Resources; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.io.InputStreamReader; import java.io.LineNumberReader; import java.util.ArrayList; //mapred是hadoop的1.X的包,mapreduce是2.X的API /** * 測試-設定任務的運行 * 輸入與輸出的路徑 */ public class WCJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration coreSiteConf = new Configuration(); coreSiteConf.addResource(Resources.getResource("core-site.xml")); //設置一個任務,後面是job的名稱 Job job = Job.getInstance(coreSiteConf, "wc"); //設置job的運行類,就是此類 job.setJarByClass(WCJob.class); //設置Map和Reduce處理類 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //設置map輸出類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //設置job/reduce輸出類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //設置任務的輸入路徑 FileInputFormat.addInputPath(job, new Path("/wc")); //設置任務的輸出路徑--保存結果(這個目錄必須是不存在的目錄) //刪除存在的文件 deleteFileName("/wcout"); FileOutputFormat.setOutputPath(job, new Path("/wcout")); //運行任務 true:表示打印詳情 boolean flag = job.waitForCompletion(true); if (flag){ System.out.println(flag); readContent("/wcout/part-r-00000"); }else { System.out.println(flag+",讀取文件失敗"); } } //刪除已經存在在hdfs上面的文件文件 private static void deleteFileName(String path) throws IOException { //將要刪除的文件 Path fileName = new Path(path); Configuration entries = new Configuration(); //解析core-site-master2.xml文件 entries.addResource(Resources.getResource("core-site.xml")); //獲取客戶端文件系統 FileSystem fileSystem = FileSystem.get(entries); if (fileSystem.exists(fileName)){ System.out.println(fileName+"已經存在,正在刪除它..."); boolean flag = fileSystem.delete(fileName, true); if (flag){ System.out.println(fileName+"刪除成功"); }else { System.out.println(fileName+"刪除失敗"); return; } } //關閉資源 fileSystem.close(); } //讀取文件內容 private static void readContent(String path) throws IOException { //將要讀取的文件路徑 Path fileName = new Path(path); ArrayList<String> returnValue = new ArrayList<String>(); Configuration configuration = new Configuration(); configuration.addResource(Resources.getResource("core-site.xml")); //獲取客戶端系統文件 FileSystem fileSystem = FileSystem.get(configuration); //open打開文件--獲取文件的輸入流用於讀取數據 FSDataInputStream inputStream = fileSystem.open(fileName); InputStreamReader inputStreamReader = new InputStreamReader(inputStream); //一行一行的讀取數據 LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader); //定義一個字符串變量用於接收每一行的數據 String str = null; //判斷什麼時候沒有數據 while ((str=lineNumberReader.readLine())!=null){ returnValue.add(str); } //打印數據到控制檯 System.out.println("文件內容以下:"); for (String read : returnValue) { System.out.println(read); } //關閉資源 lineNumberReader.close(); inputStream.close(); inputStreamReader.close(); } }
-----在本地運行
a.在你的集羣的hdfs上建立/wc
hadoop fs -mkdir /wc
b.將兩個文件寫入內容,內容間以空格隔開,並將文件put到hdfs中的/wc上
c.在C:\Windows\System32中添加hadoop.dll與winutils.exe文件
hadoop.dll與winutils.exe這兩個文件的連接:https://pan.baidu.com/s/10xa7wC3BwlH3oF7DoYFE1A
提取碼:c7ie
d.在D:\soft\hadoop\hadoop-2.7.5\bin中添加hadoop.dll與winutils.exe文件
e.將idea中core-site.xml中關於yarn的配置刪除掉、
-通過以上a-b-c-d-e操做以後,直接Run,出現如下內容就恭喜你成功!!!!!!!
----------在linux上運行
將寫的項目打包
1.點擊左下角的方框,再店家maven projects-->找到想要打包的項目
-->點擊Lifecycle-->雙擊package就會開始打包
2.將jar包上傳並更名(可改可不改)
3.開始運行jar包
例如運行mrdemo.jar中的WEJob類
hadoop jar mrdemo.jar com.day01.WCJob
也能夠在web中輸入http://192.168.228.13:8088/cluster/apps/FINISHED
192.168.228.13:修改爲本身的主機IP,出現SUCCEEDED表示執行成功
過程當中出現下面異常,在網上查看說是hadoop的一個BUG
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1252)
at java.lang.Thread.join(Thread.java:1326)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:716)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:476)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652)
異常問題:
保證本地運行成功
上傳jar包到hdfs上運行找不到類異常--沒有將第三方依賴一塊兒打包
---將第三方jar包放在lib裏面,再打包(就會自動加載lib裏面的jar包)就解決了--以下圖
還有其餘異常,歡迎分享一塊兒解決....