0. 補充(查詢源代碼的操做)node
(1)ctrl+shift+t 查找某個類linux
(2)crtl+t查看類的繼承結構程序員
(3)ctrl+o 查看類中的方法算法
1. MR程序數據處理全流程編程
第一步:FileInputFormat找到指定路徑或文件夾(如果文件夾且有多個文件,會開啓多個map任務,默認是一個文件用一個map去處理),經過調用LineRecordReader類中的createKey(),createValue()方法,獲得多個偏移量和每行數據(offset,line)windows
第二步:這個偏移量以及line會做爲MapTask的參數,調用map方法,對全部line數據進行處理,從而獲得k,v(如a,1 a,1 b,1等),以後是調用HashPartitioner組件對map獲得的key進行分區處理(key%n),獲得分區號(partition),而後MapOutputBuffer組件調用collect方法將k,v,partition收集到數組中去,即將meta以及kv寫入環形緩衝區,默認大小100mb,而且在配置文件裏爲這個緩衝區設定了一個閥值,默認是0.80,同時map還會爲輸出操做啓動一個守護線程,若是緩衝區的內存達到了閥值的80%時候,這個守護線程就會把緩衝區中的內容寫到磁盤上,這個過程叫spill(溢出),能夠屢次溢出(產生多個溢出文件),若寫入內存的數據小於默認的100mb,則會溢出一次(產生一個溢出文件)。另外的20%內存能夠繼續寫入數據,寫入磁盤和寫入內存操做是互不干擾的,若是緩存區被撐滿了,那麼map就會阻塞寫入內存的操做,讓寫入磁盤操做完成後再繼續執行寫入內存操做,如此循環使用 數組
每一對kv都會有一個meta來描述,一個meta佔固定4個字節緩存
第三步:溢出的數據寫入磁盤中(經過SequenceFileOutPutFormat進行),能夠進行屢次溢出(當寫入內存的數據小於默認的100mb時,溢出一次),每一次溢出時都會產生一個溢出文件來記錄這些數據。注意:內存中的數據寫入磁盤時會進行歸併而且排序 即獲得上圖所示的數據。磁盤中的數據能提供下載服務。shuffle會將各個分區的數據分發到指定的ReduceTask,接下來就是Reduce階段了網絡
第四步:經過Facher下載到map階段的數據,並進行歸併排序獲得一個完整的數據,而後經過GroupingPartition組件中的compare方法判斷key是否相同,進而將相同key的值放入同一個迭代器。最後就是從迭代器中獲取這些數據,進行必定的操做獲得本身想要的數據格式,而後存入指定地方,上圖是存入HDFS中,至此整個MR程序數據流程就算走完了多線程
2. yarn
2.1 概述:
yarn是一個資源管理系統。主要負責集羣資源的管理和調度,若是要將程序運行在yarn上須要兩個組件 , 客戶端和ApplicationMaster , 這個組件在編程的過程當中很是複雜 , 例如mapreduce運算框架有現成的實現類供程序員使用(JobClient , MRAppMaster)
程序運行的監控平臺 監控各個程序的運行情況 (程序任務的再處理分配)
MRAppMaster 是applicationMaster的一種實現 , 能夠將MapReduce程序運行在yarn上 .
MRAppMaster主要負責MapReduce程序的生命週期,做業管理 , 資源申請和再分配,Container的啓動和釋放
2.2 ResourceManager和NodeManager
(1) ResourceManager(RM)
RM是一個全局資源管理器,負責整個系統的資源管理和分配。它主要由兩個組件構成:調度器(Scheduler)和應用程序管理器(Applications Manager, ASM)。
調度器根據容量、隊列等限制條件(如每一個隊列分配必定的資源,最多執行必定數量的做業等),將系統中的資源分配給各個正在運行的應用程序。
須要注意的是:該調度器是一個「純調度器」,它再也不從事任何與具體應用程序相關的工做,好比不負責監控或者跟蹤應用的執行狀態等,也不負責從新啓動因應用執行失敗或者硬件故障而產生的失敗任務,這些均交由應用程序相關的ApplicationMaster完成。調度器僅根據各個應用程序的資源需求進行資源分配,而資源分配單位用一個抽象概念「資源容器」(Resource Container,簡稱Container)表示,Container是一個動態資源分配單位,它將內存、CPU、磁盤、網絡等資源封裝在一塊兒,從而限定每一個任務使用的資源量。此外,該調度器是一個可插拔的組件,用戶可根據本身的須要設計新的調度器,YARN提供了多種直接可用的調度器,好比Fair Scheduler和Capacity Scheduler等。
應用程序管理器負責管理整個系統中全部應用程序,包括應用程序提交、與調度器協商資源以啓動ApplicationMaster、監控ApplicationMaster運行狀態並在失敗時從新啓動它等
(2)NodeManager
a. 彙報本身的節點的資源狀況
b. 領取屬於本身的任務,運行任務
c 彙報本身處理的任務的執行狀況
2.3 多角度理解yarn
2.3.1 並行編程
在單機程序設計中,爲了快速處理一個大的數據集,一般採用多線程並行編程,其大體流程以下:先由操做系統啓動一個主線程,由他負責數據切分、任務分配、子線程啓動和銷燬等工做,而各個子線程只負責計算本身的數據,當全部子線程處理完數據後,主線程再退出。類比理解,YARN上的應用程序運行過程與之很是相近,只不過它是集羣上的分佈式並行編程。可將YARN看作一個雲操做系統,它負責爲應用程序啓動ApplicationMaster(至關於主線程),而後再由ApplicationMaster負責數據切分、任務分配、啓動監控等工做,而由ApplicationMaster啓動的各個Task(至關於子線程)僅負責本身的計算任務。當全部任務計算完成後,ApplicationMaster認爲應用程序運行完成,而後退出
2.3.2 資源管理系統
2.4 yarn的安裝
(1)由於安裝了hadoop,因此直接配置yarn-site.xml文件,配置文件以下:
<configuration> <!-- 配置resourcemanager的機器的位置,這樣全部的nodemanager就能找到其位置 --> <property> <name>yarn.resourcemanager.hostname</name> <value>feng01</value> </property> <!-- 爲mr程序提供shuffle服務,提升數據傳輸效率,rpc傳輸效率低 --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- 一臺NodeManager的總可用內存資源 --> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>2048</value> </property> <!-- 一臺NodeManager的總可用(邏輯)cpu核數 --> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>2</value> </property> <!-- 是否檢查容器的虛擬內存使用超標狀況 --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> <!-- 容器的虛擬內存使用上限:與物理內存的比率 --> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> </configuration>
(2)而後將yarn-site.xml遠程複製到feng02,feng03,feng04
(3)開啓:start-yarn.sh,其會讀取slaves文件,啓動nodemanager
(4) 訪問feng01:8088,便可獲得以下界面
2.5 程序提交到yarn
2.5.1 window提交
刪除HDFS根目錄上的全部文件命令hdfs dfs -rm -r hdfs://feng01:9000/*(要寫上絕對路徑)
以之前的單詞統計爲案例
JobDriver
public class JobDriver { public static void main(String[] args) throws Exception { // 當前操做的用戶名 System.setProperty("HADOOP_USER_NAME", "root") ; // 獲取mr程序運行時的初始化配置 Configuration conf = new Configuration(); // 默認 的狀況程序以本地模式local運行 // 修改運行模式 conf.set("mapreduce.framework.name", "yarn"); // 設置resource manage機器的位置 conf.set("yarn.resourcemanager.hostname", "feng01"); //設置操做的文件系統 HDFS conf.set("fs.defaultFS", "hdfs://feng01:9000/"); // windows運行程序在yarn上的誇平臺參數 conf.set("mapreduce.app-submission.cross-platform", "true"); Job job = Job.getInstance(conf); // 設置程序的jar包的路徑 job.setJar("d://wc.jar"); // job.setJarByClass(JobDriver.class); // 設置map和reduce類 調用類中自定義的map reduce方法的業務邏輯 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 設置map端輸出的key-value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置reduce的key-value的類型 結果的最終輸出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //設置reducetask的個數 默認1個 //job.setNumReduceTasks(3); // 處理的文件的路徑 FileInputFormat.setInputPaths(job, new Path("/data/wc/input")); // 結果輸出路徑 FileOutputFormat.setOutputPath(job, new Path("/data/wc/output2")); // 提交任務 參數等待執行 job.waitForCompletion(true) ; } }
WordCountMapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /** * 當nextKeyValue() map * map方法是本身的自定義的map階段的業務邏 * map方法什麼時候執行??? * 一行數據執行一次 * 參數一 當前一行數據的偏移量 * 參數二 當前這行數據 * 參數三 context 上下文件 結果的輸出 輸出給reduce */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //獲取一行數據 String line = value.toString(); String[] words = line.split(" "); for (String word : words) { // context.write(new Text(word), new IntWritable(1));//a 1 a 1 a 1 } } }
WordCountReducer
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ /** * context.nextKey() 相同的key會被聚合到一個reduce方法中 * 執行時機 一個單詞執行一次 * key 單詞 * values 將當前的單詞出現的全部的1 存儲在迭代器中 * */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable intWritable : values) { count++ ; } // key是單詞 count是單詞對應的個數 context.write(key, new IntWritable(count)); } }
2.5.2 linux提交
public class JobDriver { public static void main(String[] args) throws Exception {// 獲取mr程序運行時的初始化配置 Configuration conf = new Configuration(); // 默認 的狀況程序以本地模式local運行 // 修改運行模式 conf.set("mapreduce.framework.name", "yarn"); // 設置resource manage機器的位置 conf.set("yarn.resourcemanager.hostname", "feng01"); //設置操做的文件系統 HDFS conf.set("fs.defaultFS", "hdfs://feng01:9000/"); // 獲取Job對象 Job job = Job.getInstance(conf); // 設置程序的jar包的路徑 job.setJar("d://wc.jar"); job.setJarByClass(JobDriver.class); // 設置map和reduce類 調用類中自定義的map reduce方法的業務邏輯 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 設置map端輸出的key-value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置reduce的key-value的類型 結果的最終輸出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //設置reducetask的個數 默認1個 //job.setNumReduceTasks(3); // 處理的文件的路徑 FileInputFormat.setInputPaths(job, new Path("/data/wc/input")); // 結果輸出路徑 FileOutputFormat.setOutputPath(job, new Path("/data/wc/output2")); // 提交任務 參數等待執行 job.waitForCompletion(true) ; } }
3. merger案例(小文件合併)
因爲mr程序中map任務的個數是按照文件的個數來決定的,默認是一個map任務處理一個文件,如果不少小文件的話,就須要不少mapTask去處理,這樣就會很浪費資源
HDFS不適合存儲小文件,MR程序不適合處理小文件
若實在是有太多的小文件,解決方法:
在HDFS中有個文件夾中存在大量的小文件待處理
1) 默認的狀況下一個文件產生一個maptask任務去處理 產生大量的maptask 慢
2) 小文件存儲在HDFS中 每一個文件都有元數據的記錄 ,在namenode中的元數據是記錄在內存中一份
在存儲的數據必定大小的狀況下增長了namenode的工做壓力
記錄大量的元數據信息 ,整個集羣的存儲能力下降
4 數據傾斜
在數據按照key進行分區的時候 會產生一個區分的數據特別多 一個特別少 出現了數據傾斜問題。只有數據多的任務結束 任務纔算結束 總體的任務時間很長。
解決方法:
(1)避免分區
(2)key中拼接隨機數
5. join案例
有兩個文件user.txt, order.txt,以下
需求:獲取oid+user的拼接信息
代碼以下
JoinBean
public class JoinBean implements Writable { private String oid; private String uid; private String name; private int age; private String fd; /** * 標識類中存儲的是哪一個文件的數據 */ private String table; public String getOid() { return oid; } public void setOid(String oid) { this.oid = oid; } public String getUid() { return uid; } public void setUid(String uid) { this.uid = uid; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getFd() { return fd; } public void setFd(String fd) { this.fd = fd; } public String getTable() { return table; } public void setTable(String table) { this.table = table; } @Override public String toString() { return uid + "," + name + "," + age + "," + fd; } /** * 在序列化的時候有讀寫的字段 讀寫的字段不能爲null */ @Override public void readFields(DataInput din) throws IOException { this.oid = din.readUTF(); this.uid = din.readUTF(); this.name = din.readUTF(); this.age = din.readInt(); this.fd = din.readUTF(); this.table = din.readUTF(); } /* @Override public String toString() { return "JoinBean [oid=" + oid + ", uid=" + uid + ", name=" + name + ", age=" + age + ", fd=" + fd + ", table=" + table + "]"; }*/ @Override public void write(DataOutput dout) throws IOException { dout.writeUTF(this.oid);// dout.writeUTF(this.uid); dout.writeUTF(this.name); dout.writeInt(this.age); dout.writeUTF(this.fd); dout.writeUTF(this.table); } }
此處的注意點:在序列化的時候有讀寫的字段,讀寫的字段不能爲null
Join
public class Join { static class JoinMapper extends Mapper<LongWritable, Text, Text, JoinBean> { // 讀取數據 根據文件名 將數據封裝在不一樣的table標識的類 String name = null ; @Override protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context) throws IOException, InterruptedException { FileSplit fs = (FileSplit)context.getInputSplit(); name = fs.getPath().getName(); } Text k = new Text() ; JoinBean joinBean = new JoinBean() ; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context) throws IOException, InterruptedException { try { String line = value.toString(); if(name.startsWith("orders")) {// 訂單數據 //處理數據 String[] split = line.split(","); joinBean.setOid(split[0]); joinBean.setUid(split[1]); joinBean.setName(""); joinBean.setAge(-1); joinBean.setFd(""); joinBean.setTable("orders"); }else {// 用戶數據 String[] split = line.split(","); joinBean.setOid(""); joinBean.setUid(split[0]); joinBean.setName(split[1]); joinBean.setAge(Integer.parseInt(split[2])); joinBean.setFd(split[3]); joinBean.setTable("user"); } k.set(joinBean.getUid()) ; context.write(k, joinBean); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } static class JoinReducer extends Reducer<Text, JoinBean, Text, NullWritable> { // 聚合同一我的的全部的數據 //iters 裏面既有用戶的訂單數據(多個) 也有用戶數據(1) @Override protected void reduce(Text uid, Iterable<JoinBean> iters, Reducer<Text, JoinBean, Text, NullWritable>.Context context) throws IOException, InterruptedException { JoinBean user = new JoinBean(); List<JoinBean> ordersList = new ArrayList<>() ; // 獲取當前用戶全部的數據 for (JoinBean joinBean : iters) { // 根據這個字段來判斷當前迭代的數據是訂單數據仍是用戶信息數據 String table = joinBean.getTable(); if("orders".equals(table)) { // 訂單數據 JoinBean order = new JoinBean(); order.setOid(joinBean.getOid()); order.setUid(joinBean.getUid()); //list ordersList.add(order) ; }else {// 用戶數據 user.setUid(joinBean.getUid()); user.setName(joinBean.getName()); user.setAge(joinBean.getAge()); user.setFd(joinBean.getFd()); } } for (JoinBean joinBean : ordersList) { // o0001,uid,name,age,fd String key = joinBean.getOid()+","+user.toString() ; context.write(new Text(key), NullWritable.get()); } } } public static void main(String[] args) throws Exception { // 獲取mr程序運行時的初始化配置 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 設置map和reduce類 調用類中自定義的map reduce方法的業務邏輯 job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); // 設置map端輸出的key-value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(JoinBean.class); // 設置reduce的key-value的類型 結果的最終輸出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //設置reducetask的個數 默認1個 //job.setNumReduceTasks(3); // 處理的文件的路徑 FileInputFormat.setInputPaths(job, new Path("D:\\data\\join\\input")); // 結果輸出路徑 FileOutputFormat.setOutputPath(job, new Path("D:\\data\\join\\res")); // 提交任務 參數等待執行 job.waitForCompletion(true) ; } }