map階段:將每行文本數據變成<單詞,1>這樣的k,v數據html
reduce階段:將相同單詞的一組kv數據進行聚合,累加全部的vjava
1.1注意事項node
mapreduce程序中: 1.map階段的進,出數據 2.reduce階段的進,出數據 類型都應該是實現了Hadoop序列化框架類型 好比:String對應Text;Integer對應IntWritable;Long對應LongWritable
1.2wordcount程序總體運行流程示意圖linux
yarn是一個分佈式程序的運行調度平臺 yarn中有兩大核心角色: 一、Resource Manager 接受用戶提交的分佈式計算程序,併爲其劃分資源 管理、監控各個Node Manager上的資源狀況,以便於均衡負載 二、Node Manager 管理它所在機器的運算資源(cpu + 內存) 負責接受Resource Manager分配的任務,建立容器、回收資源
2.1.YARN的安裝spring
node manager在物理上應該跟data node部署在一塊兒 resource manager在物理上應該獨立部署在一臺專門的機器上
2.2修改配置文件apache
參考官網:https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-common/yarn-default.xmlwindows
cd /root/apps/hadoop-2.7.2/etc/hadoop vi yarn-site.xml
2.3在<configuratiomn></configuration>裏面添加瀏覽器
<property> <name>yarn.resourcemanager.hostname</name> <value>hdp-01</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>2</value> </property>
2.4拷貝配置文件到其它節點上springboot
scp yarn-site.xml hdp-02:$PWD scp yarn-site.xml hdp-03:$PWD scp yarn-site.xml hdp-04:$PWD
3.啓動和中止hdfs集羣和yarn集羣命令app
1.hdfs: stop-dfs.sh:中止配置的namenode datanode start-dfs.sh:啓動namenode datanode 2.yarn: start-yarn.sh:啓動resourcemanager和nodemanager(注:該命令應該在resourcemanager所在的機器上執行) stop-yarn.sh:中止resourcemanager和nodemanager
4.其它命令
jps查看ResourceManager進程號 netstat -nltp | grep 進程號 8088是網頁的 free -m:查看還剩多少內存
5.編碼實現
1.WordcountMapper類開發 2.WordcountReducer類開發 3.JobSubmitter客戶端類開發
5.1.WordcountMapper類開發
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 1.KEYIN:是map task讀取到的數據的key的類型,是一行的起始偏移量Long * 2.VALUEIN:是map task讀取到的數據的value的類型,是一行的內容String * 3.KEYOUT:是用戶的自定義map方法要返回的結果kv數據的key類型,在 * word count邏輯中,返回單詞String * 4.VALUEOUT:是用戶的自定義map方法要返回的結果kv數據的value類型, * 在word count邏輯返回Integer * * 可是在mapreduce中,map 產生的數據須要傳輸給reduce,須要進行序列化和反序列化, * 而Jdk 中的原生序列化機制產生的數據比較冗餘就會致使數據在mapreduce運行過程比 * 較慢,Hadoop專門設計了本身序列化機制,那麼,mapreduce 中傳輸的數據的數據類型 * 就必須實現Hadoop本身的序列化接口 * Hadoop爲jdk 中經常使用的基本類型Long,String,Integer,Float等數據類型封裝了本身 * 的實現Hadoop序列化接口類型:LongWritable,Text(String),IntWritable.. */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.切單詞 String line = value.toString(); String[] words = line.split(" "); for(String word:words){ context.write(new Text(word),new IntWritable(1)); } } }
5.2.WordcountReducer類開發
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * 1.前面的Text,IntWritable:表示接收到map傳過來的參數 * 2.後面的Text, IntWritable:表示Reduce返回的數據類型 */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { //idea快捷鍵(ctrl+o)查看重寫的方法 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; Iterator<IntWritable> iterator = values.iterator(); while (iterator.hasNext()){ IntWritable value = iterator.next(); count += value.get(); } context.write(key,new IntWritable(count)); } }
5.3.JobSubmitter客戶端類開發
/** * 用於提交MapReduce的客戶端程序 * 功能: * 1,封裝本次job運行時所須要的必要參數 * 2.跟yarn進行交互,將mapreduce 程序成功的啓動,運行 */ public class JobSubmitter { public static void main(String[] args)throws Exception { //在代碼中設置JVM系統參數,用於給job對象來獲取訪問HDFS的用戶身份 System.setProperty("HADOOP_USER_NAME","root"); Configuration conf = new Configuration(); //1.設置job運行時默認要訪問的文件系統 conf.set("fs.defaultFS","hdfs://hdp-01:9000"); //2.設置job提交到哪裏去運行(放本地local,這裏放在yarn上運行) conf.set("mapreduce.framework.name","yarn"); //3.指定位置 conf.set("yarn.resourcemanager.hostname","hdp-01"); //4.若是須要在Windows系統運行這個job提交客戶端程序,則須要加這個跨平臺提交參數 conf.set("mapreduce.app-submission.cross-platform","true"); Job job = Job.getInstance(conf); //1.封裝參數:jar包所在的位置 job.setJar("d:/wc.jar"); //動態獲取jar包在哪裏 //job.setJarByClass(JobSubmitter.class); //2.封裝參數:本次job所要調用的mapper實現類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //3.封裝參數:本次job的Mapper實現類產生的數據key,value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //4.封裝參數:本次Reduce返回的key,value數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path output=new Path("/wordcount/output5"); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root"); if(fs.exists(output)){ fs.delete(output,true); } //5.封裝參數:本次job要處理的輸入數據集所在路徑,最終結果的輸出路徑 FileInputFormat.setInputPaths(job,new Path("/wordcount/input")); FileOutputFormat.setOutputPath(job,output); //6.封裝參數:想要啓動的reduce task的數量 job.setNumReduceTasks(2); //7.向yarn提交本次job //job.submit(); //等待任務完成,把ResourceManage反饋的信息打印出來 boolean res = job.waitForCompletion(true); System.exit(res ? 0:-1); } }
5.4.pom依賴
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.8.1</version> </dependency> </dependencies>
5.5.運行mapreduce程序
1.將工程總體打成一個jar包並上傳到linux機器上, 2.準備好要處理的數據文件放到hdfs的指定目錄中 3.用命令啓動jar包中的Jobsubmitter,讓它去提交jar包給yarn來運行其中的mapreduce程序 : hadoop jar wc.jar cn.xuyu.JobSubmitter ..... 4.去hdfs的輸出目錄中查看結果
5.6.測試說明
本次測試在Windows環境,因此須要打成jar包,更名爲wc.jar放在本地D:/盤目錄下
5.7.運行結果
5.7.1.訪問:http://hdp-01:8088/cluster/apps
5.7.2.訪問:http://hdp-01:50070/explorer.html#/wordcount
5.7.3.命令行輸入命令查看統計結果
[root@hdp-01 ~]# hadoop fs -ls /wordcount/output Found 1 items -rw-r--r-- 2 root supergroup 59 2019-05-25 22:13 /wordcount/output/res .dat [root@hdp-01 ~]# hadoop fs -ls /wordcount/output5 Found 3 items -rw-r--r-- 2 root supergroup 0 2019-05-27 03:58 /wordcount/output5/_S UCCESS -rw-r--r-- 2 root supergroup 13 2019-05-27 03:58 /wordcount/output5/pa rt-r-00000 -rw-r--r-- 2 root supergroup 46 2019-05-27 03:58 /wordcount/output5/pa rt-r-00001 [root@hdp-01 ~]# hadoop fs -cat /wordcount/output5/part-r-00001 3 FFH 3 GGH 3 Helllo 3 Hello 15 Jasd 3 Tom 3 [root@hdp-01 ~]# hadoop fs -cat /wordcount/output5/part-r-00000 GGG 3 xuyu 3
5.7.4.在瀏覽器中查看內容
5.7.5.下載下來能夠看到以下內容
6.1須要去修改JobSubmitter 代碼:以下
/** * 用於提交MapReduce的客戶端程序 * 功能: * 1,封裝本次job運行時所須要的必要參數 * 2.跟yarn進行交互,將mapreduce 程序成功的啓動,運行 *說明: * 若是要在hadoop集羣的某臺機器上啓動這個job提交客戶端的話 * conf裏面就不須要指定 fs.defaultFS mapreduce.framework.name * 由於在集羣機器上用 hadoop jar springboot-hdp-a-1.0-SNAPSHOT.jar com.xuyu.mapreduce.JobSubmitter 命令來啓動客戶端main方法時, * hadoop jar這個命令會將所在機器上的hadoop安裝目錄中的jar包和配置文件加入到運行時的classpath中 * * 那麼,咱們的客戶端main方法中的new Configuration()語句就會加載classpath中的配置文件,天然就有了 * fs.defaultFS 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 這些參數配置 */ public class JobSubmitter { public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //動態獲取jar包在哪裏 job.setJarByClass(JobSubmitter.class); //2.封裝參數:本次job所要調用的mapper實現類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //3.封裝參數:本次job的Mapper實現類產生的數據key,value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //4.封裝參數:本次Reduce返回的key,value數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path output=new Path("/wordcount/output5"); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root"); if(fs.exists(output)){ fs.delete(output,true); } //5.封裝參數:本次job要處理的輸入數據集所在路徑,最終結果的輸出路徑 FileInputFormat.setInputPaths(job,new Path("/wordcount/input")); FileOutputFormat.setOutputPath(job,output); //6.封裝參數:想要啓動的reduce task的數量 job.setNumReduceTasks(3); //7.向yarn提交本次job //job.submit(); //等待任務完成,把ResourceManage反饋的信息打印出來 boolean res = job.waitForCompletion(true); System.exit(res ? 0:-1); } }
6.2須要從新打包發佈到Linux虛擬機上
這裏上傳到hdp-04這臺虛擬機上
進入目錄:cd /root/apps/hadoop-2.7.2/etc/hadoop
修改配置文件名字:mv mapred-site.xml.template mapred-site.xml編輯配置:vi mapred-site.xml
加入這些配置:
<property>
<name>mapreduce.framework.name</name>
<value>yarn</name>
</property>運行程序:
hadoop jar springboot-hdp-1.0-SNAPSHOT.jar com.xuyu.mapreduce.JobSubmitter
6.3效果展現
7.若是直接在windows上運行,進行測試代碼修改以下
import com.xuyu.mapreduce.WordcountMapper; import com.xuyu.mapreduce.WordcountReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JobSubmitterWindowsLocal { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //conf.set("fs.defaultFS", "file:///"); //conf.set("mapreduce.framework.name", "local"); Job job = Job.getInstance(conf); job.setJarByClass(JobSubmitterWindowsLocal.class); job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("f:/mrdata/wordcount/input")); FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/wordcount/output1")); job.setNumReduceTasks(3); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }