The 5V characteristics of big data (Volume, Velocity, Variety, Value, Veracity)
GFS --> HDFS
MapReduce --> MapReduce
BigTable --> HBase
Heartbeats are sent every 3 seconds
The heartbeat response carries the NameNode's commands for that DataNode, such as deleting or replicating blocks
If no heartbeat is received from a DataNode for more than 10 minutes, that node is considered unavailable
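The 10-minute figure is not hard-coded; it comes from two HDFS settings. A minimal sketch, assuming the default values and the commonly documented timeout formula (2 x recheck interval + 10 x heartbeat interval, roughly 10.5 minutes):

// Imports assumed: org.apache.hadoop.conf.Configuration
Configuration conf = new Configuration();
long heartbeatSec = conf.getLong("dfs.heartbeat.interval", 3);                        // default: 3 s
long recheckMs    = conf.getLong("dfs.namenode.heartbeat.recheck-interval", 300000);  // default: 5 min
long timeoutMs    = 2 * recheckMs + 10 * heartbeatSec * 1000;                         // 630 000 ms, i.e. 10.5 min
System.out.println("DataNode considered dead after " + timeoutMs / 1000 + " s without a heartbeat");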
After starting, a DataNode registers with the NameNode; once registration is accepted, it periodically (every hour) reports all of its block information to the NameNode
When a DataNode reads a block, it recomputes the checksum and compares it with the one created when the block was written
A DataNode verifies the checksums of its files three weeks after they are created
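Complementing this DataNode-side verification, a client can also request a file-level checksum through the HDFS API. A minimal sketch, reusing the hdfs://centos01:8020 cluster from the code further down; the path is purely illustrative:

// Imports assumed: java.net.URI, org.apache.hadoop.conf.Configuration,
// org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.FileChecksum, org.apache.hadoop.fs.Path
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://centos01:8020"), conf);
fs.setVerifyChecksum(true);                                          // verify checksums on read (the default)
FileChecksum checksum = fs.getFileChecksum(new Path("/input/testUpload.log"));
System.out.println(checksum.getAlgorithmName() + ": " + checksum);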
NameNode
DataNode
NodeManager
ResourceManager
Create the fsimage file and store the fsimage information
Create the edits file
Load the fsimage and edits files
Generate new fsimage and edits files
Wait for the DataNodes to register and send their Block Reports
Register with the NameNode and send a Block Report
When the NameNode starts it enters safe mode; during this time the file system is read-only
NameNode startup process
In safe mode the file system does not allow modifications
The purpose is to check the validity of each DataNode's data while the system starts up
Three ways to interact with safe mode:
$ bin/hdfs dfsadmin -safemode enter
$ bin/hdfs dfsadmin -safemode leave
<property>
    <name>dfs.namenode.safemode.threshold-pct</name>
    <value>0.999f</value>
</property>
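The safe mode state can also be queried programmatically. A minimal sketch, assuming Hadoop 2.x, where DistributedFileSystem#setSafeMode with SAFEMODE_GET only reports the state without changing it:

// Imports assumed: java.net.URI, org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem,
// org.apache.hadoop.hdfs.DistributedFileSystem, org.apache.hadoop.hdfs.protocol.HdfsConstants
Configuration conf = new Configuration();
DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(URI.create("hdfs://centos01:8020"), conf);
boolean inSafeMode = dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
System.out.println("NameNode in safe mode: " + inSafeMode);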
Advantages
Disadvantages:
package com.ct.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;

public class TestDemo {

    FileSystem fs = null;

    @Before  // obtain the FileSystem object before each test
    public void setUp() {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "7");   // replication factor for files created by this client
        try {
            fs = FileSystem.get(new URI("hdfs://centos01:8020"), conf, "chen");
        } catch (IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }

    // create a directory
    @Test
    public void testMkdir() throws IllegalArgumentException, IOException {
        boolean success = fs.mkdirs(new Path("/result"));
        System.out.println(success);
    }

    // delete a directory (recursively)
    public void testDelete() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/result"), true);
    }

    // upload a file
    @Test
    public void testUpload() throws IllegalArgumentException, IOException {
        FSDataOutputStream out = fs.create(new Path("/input/testUpload.log"));
        FileInputStream input = new FileInputStream("F:/test.txt");
        IOUtils.copy(input, out, 1024);
    }

    // download a file
    @Test
    public void testDownload() throws IllegalArgumentException, IOException {
        FSDataInputStream input = fs.open(new Path("/input/testUpload.log"));
        FileOutputStream out = new FileOutputStream("F:/test-copy.txt");
        IOUtils.copy(input, out, 1024);
    }

    // list files recursively, printing metadata and block locations
    @Test
    public void testList() throws FileNotFoundException, IllegalArgumentException, IOException {
        RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true);
        while (ri.hasNext()) {
            LocatedFileStatus next = ri.next();
            String group = next.getGroup();
            long len = next.getLen();
            String owner = next.getOwner();
            FsPermission permission = next.getPermission();
            long blockSize = next.getBlockSize();
            short rep = next.getReplication();
            System.out.println(permission + "\t" + owner + "\t" + group);
            System.out.println(len + "\t" + blockSize + "\t" + rep);
            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation blktn : blockLocations) {
                System.out.println("length:" + blktn.getLength());
                System.out.println("offset:" + blktn.getOffset());
                System.out.println(Arrays.toString(blktn.getHosts()));
            }
        }
    }
}
ResourceManager
NodeManager
ApplicationMaster
Container
Map and Reduce: a computation framework and programming model built on the "divide and conquer" idea for distributed parallel computing
Map: applies a specified operation to every element of a list of independent elements; highly parallelizable
// step 1: Map Class
/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
// TODO: adjust the type parameters for the concrete job
public static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // TODO: map-side business logic goes here
    }
}
Reduce: merges the elements of a list
// step 2: Reduce Class
/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
// TODO: adjust the type parameters for the concrete job
public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // TODO: reduce-side business logic goes here
    }
}
// step 3: Driver -- assembles the job; the enclosing class implements Tool
public int run(String[] args) throws Exception {
    // 1: get configuration
    Configuration configuration = getConf();

    // 2: create Job
    Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    // set the jar to run
    job.setJarByClass(this.getClass());

    // 3: set job
    // input -> map -> reduce -> output
    // 3.1: input
    Path inPath = new Path(args[0]);
    FileInputFormat.addInputPath(job, inPath);

    // 3.2: map
    job.setMapperClass(ModuleMapper.class);
    // TODO: adjust the map output types for the concrete job
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // 3.3: reduce
    job.setReducerClass(ModuleReducer.class);
    // TODO: adjust the job output types for the concrete job
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // 3.4: output
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);

    // 4: submit job
    boolean isSuccess = job.waitForCompletion(true);
    return isSuccess ? 0 : 1;
}
package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDemo extends Configured implements Tool {

    /**
     * Definition of the map task.
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *   KEYIN    byte offset of the line   LongWritable
     *   VALUEIN  one line of text          Text
     *   KEYOUT   a word                    Text
     *   VALUEOUT the count 1               IntWritable
     *
     * The map task splits a line of text into words.
     */
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            System.out.println("keyIn:" + key + "\t\t" + "valueIn:" + value);
            // 1. split the line into words
            String[] vals = value.toString().split(" ");
            // 2. emit each word with a count of 1
            for (String val : vals) {
                keyOut.set(val);
                valueOut.set(1);
                context.write(keyOut, valueOut);
                System.out.println("keyOut:" + keyOut + "\t\t" + "valueOut:" + valueOut);
            }
        }
    }

    /**
     * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *   KEYIN    a word                           Text
     *   VALUEIN  the word's counts (list items)   IntWritable
     *   KEYOUT   the word                         Text
     *   VALUEOUT the total count                  IntWritable
     */
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        IntWritable valueOut = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            System.out.print("keyIn:" + key + "\t\t[");
            // 1. sum the counts
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
                System.out.print(value + ",\t");
            }
            System.out.println("]");
            // 2. emit the total
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map and reduce classes
        job.setMapperClass(WCMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3. set the input and output paths (delete the output path if it already exists)
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean success = job.waitForCompletion(true);
        // note: returns 1 on success, which main() below checks for
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new WordCountDemo(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Suited to joining a large table with a small table: the small table is cached in memory and the join happens on the map side
The cache is loaded only once: override setup() in the Mapper subclass and load the small-table file into memory there
The map() method of the Mapper subclass reads the large table
package com.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoin extends Configured implements Tool {

    public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> {

        HashMap<String, String> cacheMap = new HashMap<String, String>();

        // First read the small table (deptno -> dname) into memory.
        // setup() runs only once, when the map task starts.
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String path = "F:\\input\\join\\dept.log";
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] vals = line.split("\t");
                cacheMap.put(vals[0], vals[1]);
            }
            br.close();
            fr.close();
        }

        // The map side joins the two tables on their shared key (deptno).
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            String deptno = vals[2];               // department number from the big table
            String dname = cacheMap.get(deptno);   // department name from the cached small table
            context.write(new Text(deptno), new Text(dname + "\t" + vals[0] + "\t" + vals[1]));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map class (no reduce phase is needed for a map-side join)
        job.setMapperClass(MJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 3. set the input and output paths
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. submit
        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new MapJoin(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Suited to joining two large tables
package com.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoin extends Configured implements Tool {

    /*
     * Input lines look like:
     *   1     技術部            (department table: deptno, dname)
     *   1002  rose   1          (employee table: empno, ename, deptno)
     */
    public static class RJMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] vals = value.toString().split("\t");
            if (vals.length == 2) {
                // department record: key = deptno, value = dname
                keyOut.set(vals[0]);
                valueOut.set(vals[1]);
            } else {
                // employee record: key = deptno, value = empno + ename
                keyOut.set(vals[2]);
                valueOut.set(vals[0] + "\t" + vals[1]);
            }
            context.write(keyOut, valueOut);
        }
    }

    /*
     * keyIn:   1
     * valueIn: {[1007 lily], [1002 rose], [1001 jack], [技術部]}
     *
     * The reduce-side join relies on the MapReduce shuffle sending all rows
     * with the same key to the same reduce task.
     */
    public static class RJReducer extends Reducer<Text, Text, Text, Text> {

        ArrayList<String> employees = new ArrayList<String>();

        @Override
        protected void reduce(Text keyIn, Iterable<Text> valueIn,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String department = null;
            employees.clear();   // the list must be cleared for every key
            for (Text tmp : valueIn) {
                String[] vals = tmp.toString().split("\t");
                // the number of fields tells us which table the row came from
                if (vals.length == 1) {
                    department = vals[0];
                } else if (vals.length == 2) {
                    employees.add(tmp.toString());
                }
            }
            for (String employee : employees) {
                context.write(keyIn, new Text(employee + "\t" + department));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. set the map and reduce classes
        job.setMapperClass(RJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(RJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 3. set the input and output paths
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. submit
        boolean success = job.waitForCompletion(true);
        return success ? 1 : 0;
    }

    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new ReduceJoin(), args);
            System.out.println(run == 1 ? "success" : "failure");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Install Maven
Change the local repository specified in the settings.xml configuration file (edit apache-maven-3.0.5/conf/settings.xml)
<localRepository>D:/repository</localRepository>
Configure the Maven environment in Eclipse
Window -> Preferences -> Maven ->
-> Installations -> Add -> select your own Maven installation; -> User Settings -> point to <maven home>/conf/settings.xml
Install Hadoop on Windows
Configure the Hadoop environment variables
Add the environment variable HADOOP_HOME=<Hadoop extraction directory> and append %HADOOP_HOME%/bin; to the PATH variable
Test:
hadoop -h
Install the Eclipse plugin
Configure the plugin parameters in Eclipse and connect to HDFS
<!-- disable HDFS file permission checking -->
<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>
Eclipse -> Window -> Show View -> Other -> type "MapReduce" -> select Map/Reduce Locations
Right-click -> New Hadoop location
Map/Reduce Master
Map/Reduce (V2) Master   host: [hostname]   port: 8032   // the ResourceManager's default port
DFS Master
DFS Master   host: [hostname]   port: 8020   // the NameNode's default RPC port
Copy the log4j.properties file to src/main/resources
yarn jar pv.jar /input/2015082818 /output
yarn jar pv.jar <fully qualified class name> /input/2015082818 /output
Different packages may contain classes with the same name, so the fully qualified class name must be specified
The core of the MapReduce framework (the essence of its design): its kernel
Shuffle is the stage from the beginning of map() output to the beginning of reduce() input
input -> map -> shuffle -> reduce -> output
map shuffle phase
reduce shuffle phase
Main shuffle operations
partitioner - map
sorter - map & reduce
combiner: local aggregation during the map phase; not every MapReduce program can apply local aggregation
compress: compression of the map-phase output; can be enabled for any MapReduce program (see the driver sketch after this list)
group - reduce
All of these operations act on the <key, value> pairs emitted by map()
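As a driver-side illustration of the two optional knobs above, a minimal sketch assuming the WordCountDemo job from earlier; the property names are the standard Hadoop 2 ones, and Snappy requires the native libraries to be installed:

// Imports assumed: org.apache.hadoop.conf.Configuration, org.apache.hadoop.mapreduce.Job,
// org.apache.hadoop.io.compress.CompressionCodec, org.apache.hadoop.io.compress.SnappyCodec
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);          // compress intermediate map output
conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);              // needs the native Snappy library
Job job = Job.getInstance(conf, "wordcount-with-shuffle-tuning");
job.setCombinerClass(WCReducer.class);                           // map-side local aggregation (valid because summing is associative)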
By default, when the circular in-memory buffer reaches 80% of its capacity, the buffered data is spilled to local disk (the local disk of the NodeManager machine where the MapTask runs)
Spill
The buffered data is not spilled to local disk immediately; several operations are applied first
Partitioning decides which Reduce task will process each piece of map output, based on the number of Reduce tasks in this MapReduce job; by default the key is partitioned with HashPartitioner
// HashPartitioner assigns a record to a reduce task by taking the key's hash code modulo the task count
public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
The data in each partition is sorted, by default by key
The partitioned, sorted data is written to a file on local disk
These steps repeat, producing many small files
When spilling has finished
The data of each partition is merged together (when a MapTask finishes processing its data it notifies the AppMaster, which notifies all ReduceTasks; each ReduceTask then goes to the finished MapTask's local disk and pulls the partition it is responsible for)
In the end each MapTask produces a single file containing all of its map output, partitioned, with the data inside each partition sorted
Grouping (group)
Values with the same key are gathered into a list, forming new (key, list(value)) pairs
Finally the (key, list(value)) pairs are passed to the reduce() function for processing
FileInputFormat.setMaxInputSplitSize(job, size);   // set the maximum split size
FileInputFormat.setMinInputSplitSize(job, size);   // set the minimum split size
// FileInputFormat
public List<InputSplit> getSplits(JobContext job) { ... }

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
// minSize <= maxSize < blockSize : smaller splits, more map tasks (higher parallelism)
// minSize > blockSize            : larger splits, fewer map tasks (lower parallelism)
job.setNumReduceTasks(2);   // HashPartitioner decides which reduce task processes each map output key
package com.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Custom value type: use Hadoop's Writable for serialization
 * rather than Java's Serializable.
 */
public class Flow implements Writable {

    private long up;
    private long down;
    private long sum;

    public long getUp() { return up; }
    public void setUp(long up) { this.up = up; }
    public long getDown() { return down; }
    public void setDown(long down) { this.down = down; }
    public long getSum() { return sum; }
    public void setSum(long sum) { this.sum = sum; }

    @Override
    public String toString() {
        return up + "\t" + down + "\t" + sum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in the same order they were written
        up = in.readLong();
        down = in.readLong();
        sum = in.readLong();
    }
}
public static class MyPartitioner extends Partitioner<Text, Flow> {

    @Override
    public int getPartition(Text key, Flow value, int numPartitions) {
        if (value.getSum() < 1024) {
            return 0;
        } else if (value.getSum() < 10 * 1024) {
            return 1;
        }
        return 2;
    }
}
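For this partitioner to take effect, the driver presumably registers it and sets a matching number of reduce tasks:

job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(3);   // one reduce task per partition value (0, 1, 2) returned above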
Sorting is only by key; if multi-field (secondary) sorting is needed, a custom key type is required
Sorting happens automatically during shuffle; no method needs to be called manually
// The key class to be sorted must implement WritableComparable
public class MyKey implements WritableComparable<MyKey> {

    // fields (sum, phone), getters, write() and readFields() omitted here

    @Override
    public int compareTo(MyKey o) {
        // primary sort: sum, descending
        long result = o.getSum() - this.getSum();
        if (result > 0) {
            return 1;
        } else if (result < 0) {
            return -1;
        }
        // secondary sort: phone, descending
        return o.getPhone().compareTo(this.getPhone());
    }
}
A combiner is a small reduce on the map side: it reduces each map task's output values locally, cutting down the data transferred
A combiner is set with job.setCombinerClass(WCReducer.class);
Before/after comparison:
Original data: hello world hello hadoop hello world hello java

Reduce input without a combiner:
keyIn:hadoop  [1, ]
keyIn:hello   [1, 1, 1, 1, ]
keyIn:java    [1, ]
keyIn:world   [1, 1, ]

Reduce input with the combiner:
keyIn:hadoop  [1, ]
keyIn:hello   [2, 2, ]
keyIn:java    [1, ]
keyIn:world   [1, 1, ]
Depending on the requirements, keys whose shared fields are equal can be treated as the same key, reducing the number of key/value groups; this is an optimization technique
Implement the RawComparator methods to merge keys whose shared fields are equal
Registered via job.setGroupingComparatorClass(Mygroup.class);
public static class Mygroup implements RawComparator<Person> {

    @Override
    public int compare(Person o1, Person o2) {
        return 0;
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // compare the serialized keys while ignoring the last 4 bytes,
        // i.e. group on the leading fields only
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable
The map() method receives the byte offset of each line in the file as its key (loosely called the line number), so the input key type must be LongWritable.
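A minimal sketch of the fix, using a hypothetical FixedMapper: with TextInputFormat the input key is the line's byte offset, so the Mapper must declare LongWritable as its input key type:

// KEYIN must be LongWritable: TextInputFormat supplies the line's byte offset as the key.
public static class FixedMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key = byte offset of the current line, value = the line itself
    }
}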