如果超過10分鐘沒有收到某個DataNode的心跳,則認爲該節點不可用
DataNode 在其文件建立後三週驗證其checksum安全
等待DataNode註冊與發送Block Report
向NameNode註冊、發送Block Report
NameNode 啓動過程
$ bin/hdfs dfsadmin -safemode enter
$ bin/hdfs dfsadmin -safemode leave
<property> <name>dfs.namenode.safemode.threshold-pct</name> <value>0.999f</value> </property>
package com.ct.test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Before;
import org.junit.Test;

/**
 * JUnit walkthrough of the basic HDFS client operations: mkdir, delete,
 * upload, download and recursive listing.
 *
 * <p>Every test talks to the cluster at {@code hdfs://centos01:8020} as user
 * {@code chen}; the connection is opened in {@link #setUp()}.
 */
public class TestDemo {

    /** HDFS handle shared by the tests; assigned in {@link #setUp()}. */
    FileSystem fs = null;

    // Earlier stand-alone experiment, kept for reference:
    // public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
    //     FileSystem fs = FileSystem.get(new URI("hdfs://centos01:8020"),
    //             new Configuration(),
    //             "chen");
    //     boolean success = fs.mkdirs(new Path("/test"));
    //     System.out.println(success);
    //     test.setUp();
    //     test.testMkdir();
    //     test.testDelete();
    // }

    /**
     * Obtains the {@link FileSystem} handle used by every test.
     *
     * @throws IllegalStateException if the connection cannot be established;
     *         the original failure is preserved as the cause. Previously the
     *         failure was only printed, which left {@code fs} null and made
     *         each test die later with an unrelated NullPointerException.
     */
    @Before
    public void setUp() {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "7");
        try {
            fs = FileSystem.get(new URI("hdfs://centos01:8020"), conf, "chen");
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting to hdfs://centos01:8020", e);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot connect to hdfs://centos01:8020", e);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Invalid HDFS URI hdfs://centos01:8020", e);
        }
    }

    /** Creates the directory {@code /result} and prints whether it succeeded. */
    @Test
    public void testMkdir() throws IllegalArgumentException, IOException {
        boolean success = fs.mkdirs(new Path("/result"));
        System.out.println(success);
    }

    /**
     * Recursively deletes the directory {@code /result}.
     *
     * NOTE(review): this method carries no {@code @Test} annotation in the
     * original, so JUnit does not run it — confirm whether that is intended.
     */
    public void testDelete() throws IllegalArgumentException, IOException {
        fs.delete(new Path("/result"), true);
    }

    /**
     * Uploads the local file {@code F:/test.txt} to
     * {@code /input/testUpload.log} on HDFS.
     *
     * <p>Both streams are closed by try-with-resources. The local file is
     * opened first so that a missing source does not leave an empty file
     * behind on HDFS.
     */
    @Test
    public void testUpload() throws IllegalArgumentException, IOException {
        try (FileInputStream input = new FileInputStream("F:/test.txt");
                FSDataOutputStream out = fs.create(new Path("/input/testUpload.log"))) {
            IOUtils.copy(input, out, 1024);
        }
    }

    /**
     * Downloads {@code /input/testUpload.log} from HDFS to the local file
     * {@code F:/test-copy.txt}; both streams are closed when the copy ends.
     */
    @Test
    public void testDownload() throws IllegalArgumentException, IOException {
        try (FSDataInputStream input = fs.open(new Path("/input/testUpload.log"));
                FileOutputStream out = new FileOutputStream("F:/test-copy.txt")) {
            IOUtils.copy(input, out, 1024);
        }
    }

    /**
     * Recursively lists the files under {@code /input} and prints, for each
     * file, its permission/owner/group, its length/block size/replication,
     * and the length, offset and hosts of every block.
     */
    @Test
    public void testList() throws FileNotFoundException, IllegalArgumentException, IOException {
        RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true);
        while (ri.hasNext()) {
            LocatedFileStatus next = ri.next();

            String group = next.getGroup();
            long len = next.getLen();
            String owner = next.getOwner();
            FsPermission permission = next.getPermission();
            long blockSize = next.getBlockSize();
            short rep = next.getReplication();

            System.out.println(permission + "\t" + owner + "\t" + group);
            System.out.println(len + "\t" + blockSize + "\t" + rep);

            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation blktn : blockLocations) {
                System.out.println("length:" + blktn.getLength());
                System.out.println("offset:" + blktn.getOffset());
                System.out.println(Arrays.toString(blktn.getHosts()));
            }
        }
    }
}
Map和Reduce 計算框架,編程模型 「分而治之」的思想, 分佈式並行計算
// step 1: Map Class /** * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * */ //TODO update paragram public static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub }
// step 2: Reduce Class /** * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * */ //TODO public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub } }
// step 3: Driver ,component job, implements Tool public int run(String[] args) throws Exception { // 1: get configration Configuration configuration = getConf(); // 2: create Job Job job = Job.getInstance(configuration, this.getClass() .getSimpleName()); // run jar job.setJarByClass(this.getClass()); // 3: set job // input -> map -> reduce -> output // 3.1 input Path inPath = new Path(args[0]); FileInputFormat.addInputPath(job, inPath); // 3.2: map job.setMapperClass(ModuleMapper.class); //TODO update paragram job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 3.3: reduce job.setReducerClass(ModuleReducer.class); //TODO job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 3.4: output Path outPath = new Path(args[1]); FileOutputFormat.setOutputPath(job, outPath); // 4: submit job boolean isSuccess = job.waitForCompletion(true); return isSuccess ? 0 : 1; }
package com.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word-count job: counts how often each space-separated word occurs in the
 * input and writes one {@code word <TAB> count} pair per distinct word.
 *
 * <p>Usage: {@code WordCountDemo <input path> <output path>}. An existing
 * output path is deleted before the job is submitted.
 */
public class WordCountDemo extends Configured implements Tool {

    /**
     * Map task definition: {@code Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>}.
     * <ul>
     *   <li>KEYIN: {@code LongWritable} (the offset)</li>
     *   <li>VALUEIN: {@code Text} (one line of text)</li>
     *   <li>KEYOUT: {@code Text} (a word)</li>
     *   <li>VALUEOUT: {@code IntWritable} (always 1)</li>
     * </ul>
     * The map task splits one line of text into words.
     */
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reused for every record instead of allocating per word.
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            System.out.println("keyIn:" + key + "\t\t" + "valueIn:" + value);

            // 1. Split the line into words.
            String[] vals = value.toString().split(" ");

            // 2. Emit (word, 1) for each word.
            for (String val : vals) {
                // Blank lines and consecutive spaces yield empty tokens;
                // skip them so "" is not counted as a word.
                if (val.isEmpty()) {
                    continue;
                }
                keyOut.set(val);
                valueOut.set(1);
                context.write(keyOut, valueOut);
                System.out.println("keyOut:" + keyOut + "\t\t" + "valueOut:" + valueOut);
            }
        }
    }

    /**
     * Reduce task definition: {@code Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>}.
     * <ul>
     *   <li>KEYIN: {@code Text} (a word)</li>
     *   <li>VALUEIN: {@code IntWritable} (an element of the word's count list)</li>
     *   <li>KEYOUT: {@code Text} (the word)</li>
     *   <li>VALUEOUT: {@code IntWritable} (the total count)</li>
     * </ul>
     */
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        IntWritable valueOut = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            System.out.print("keyIn:" + key + "\t\t[");

            // 1. Sum the counts.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
                System.out.print(value + ",\t");
            }
            System.out.println("]");

            // 2. Emit (word, total).
            valueOut.set(sum);
            context.write(key, valueOut);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed
     * @throws Exception whatever job setup or execution throws
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Set up the job. Use the configuration ToolRunner populated (it
        //    carries the generic command-line options); fall back to a fresh
        //    one when run() is invoked without ToolRunner.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. Set the map and reduce classes with their output types.
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3. Set the input and output paths; remove a stale output directory
        //    so the job does not refuse to start.
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. Submit and wait. Zero means success, as in the driver template.
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    /** Runs the job through {@link ToolRunner} and prints whether it succeeded. */
    public static void main(String[] args) {
        try {
            int exitCode = ToolRunner.run(new WordCountDemo(), args);
            System.out.println(exitCode == 0 ? "成功" : "失敗");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Map-side join: the small table is loaded into memory in {@code setup} and
 * every record of the large table is joined against it inside {@code map}.
 *
 * <p>Usage: {@code MapJoin <input path> <output path>}. An existing output
 * path is deleted before the job is submitted.
 */
public class MapJoin extends Configured implements Tool {

    public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Configuration key that overrides the location of the small table. */
        static final String CACHE_PATH_KEY = "mapjoin.dept.path";

        /** Location of the small table when {@link #CACHE_PATH_KEY} is not set. */
        static final String DEFAULT_CACHE_PATH = "F:\\input\\join\\dept.log";

        /** Small table: first tab-separated column -> second column. */
        HashMap<String, String> cacheMap = new HashMap<String, String>();

        /**
         * Loads the small table into memory first. This overrides the
         * once-per-task {@code setup} hook, so the file is read once per
         * task rather than once per record.
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            String path = context.getConfiguration().get(CACHE_PATH_KEY, DEFAULT_CACHE_PATH);

            // try-with-resources closes the reader even when a read fails.
            try (BufferedReader br = new BufferedReader(new FileReader(path))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] vals = line.split("\t");
                    // Skip blank or malformed rows instead of failing the task.
                    if (vals.length < 2) {
                        continue;
                    }
                    cacheMap.put(vals[0], vals[1]);
                }
            }
        }

        /**
         * Joins one record of the large table against the cached small table
         * on the map side.
         *
         * <p>The third column is the join key. Presumably the record layout is
         * {@code empno <TAB> name <TAB> deptno} — verify against the input.
         * Emits {@code key -> cachedValue <TAB> col0 <TAB> col1}. Records
         * with fewer than three columns, or whose key has no entry in the
         * small table, are skipped.
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            String[] vals = value.toString().split("\t");
            if (vals.length < 3) {
                return;
            }

            // The join key comes straight from the record; the cache is
            // consulted exactly once to resolve the joined value.
            String deptno = vals[2];
            String dname = cacheMap.get(deptno);
            if (dname == null) {
                return;
            }

            // Columns are tab-separated so they stay distinguishable.
            context.write(new Text(deptno), new Text(dname + "\t" + vals[0] + "\t" + vals[1]));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed
     * @throws Exception whatever job setup or execution throws
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Set up the job. Use the configuration ToolRunner populated (it
        //    carries the generic command-line options); fall back to a fresh
        //    one when run() is invoked without ToolRunner.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. Set the map class; no reducer class is configured for this job.
        job.setMapperClass(MJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 3. Set the input and output paths; remove a stale output directory.
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. Submit and wait. Zero means success, as in the driver template.
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    /** Runs the job through {@link ToolRunner} and prints whether it succeeded. */
    public static void main(String[] args) {
        try {
            int exitCode = ToolRunner.run(new MapJoin(), args);
            System.out.println(exitCode == 0 ? "成功" : "失敗");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Reduce-side join of a department table with an employee table.
 *
 * <p>Usage: {@code ReduceJoin <input path> <output path>}; the input holds
 * the rows of both tables. An existing output path is deleted before the job
 * is submitted.
 */
public class ReduceJoin extends Configured implements Tool {

    /*
     * Sample input rows (tab-separated):
     *   department row:  1     技術部
     *   employee row:    1002  rose  1
     */
    /**
     * Tags every row with the join key. A two-column row is keyed by its
     * first column; a row with three or more columns is keyed by its third
     * column and carries the first two as its value.
     */
    public static class RJMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text keyOut = new Text();
        Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            String[] vals = value.toString().split("\t");

            if (vals.length == 2) {
                // Department row: key = column 0, value = column 1.
                keyOut.set(vals[0]);
                valueOut.set(vals[1]);
            } else if (vals.length >= 3) {
                // Employee row: key = column 2, value = columns 0 and 1.
                keyOut.set(vals[2]);
                valueOut.set(vals[0] + "\t" + vals[1]);
            } else {
                // Blank or malformed row: nothing to join on, skip it rather
                // than fail the task on a missing column.
                return;
            }

            context.write(keyOut, valueOut);
        }
    }

    /*
     * Sample reducer input:
     *   keyIn:   1
     *   valueIn: List{[1007 lily], [1002 rose], [1001 jack], [技術部]}
     */
    /**
     * Merges the rows that share a join key. The reduce-side join relies on
     * the MapReduce shuffle delivering all rows with the same key to the same
     * {@code reduce} call.
     */
    public static class RJReducer extends Reducer<Text, Text, Text, Text> {

        /** Employee rows buffered for the key being processed. */
        ArrayList<String> employees = new ArrayList<String>();

        @Override
        protected void reduce(Text keyIn, Iterable<Text> valueIn,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            String department = null;
            // The list is a field reused across keys, so it must be cleared
            // here or rows of the previous key would leak into this one.
            employees.clear();

            for (Text tmp : valueIn) {
                String[] vals = tmp.toString().split("\t");
                // The column count tells which table the value came from.
                if (vals.length == 1) {
                    department = vals[0];
                } else if (vals.length == 2) {
                    employees.add(tmp.toString());
                }
            }

            // NOTE(review): when a key has no department row, department is
            // still null here and the literal "null" is written — confirm
            // whether such employees should be dropped instead.
            for (String employee : employees) {
                context.write(keyIn, new Text(employee + "\t" + department));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed
     * @throws Exception whatever job setup or execution throws
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Set up the job. Use the configuration ToolRunner populated (it
        //    carries the generic command-line options); fall back to a fresh
        //    one when run() is invoked without ToolRunner.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName(this.getClass().getSimpleName());

        // 2. Set the map and reduce classes with their output types.
        job.setMapperClass(RJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(RJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 3. Set the input and output paths; remove a stale output directory.
        FileInputFormat.setInputPaths(job, args[0]);
        Path out = new Path(args[1]);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        // 4. Submit and wait. Zero means success, as in the driver template.
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    /** Runs the job through {@link ToolRunner} and prints whether it succeeded. */
    public static void main(String[] args) {
        try {
            int exitCode = ToolRunner.run(new ReduceJoin(), args);
            System.out.println(exitCode == 0 ? "成功" : "失敗");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
->installations->add->勾選自己安裝的maven ->user settings->選擇maven家目錄/conf/settings.xml
添加環境變量 HADOOP_HOME=hadoop解壓目錄 在PATH環境變量中追加 %HADOOP_HOME%/bin;
hadoop -h
<!--關閉hdfs的文件權限控制--> <property> <name>dfs.permissions</name> <value>false</value> </property>
eclipse->windows->show views->other->輸入MapReduce->點擊map reduce locations
右擊->new hadoop locations
Map/Reduce Master
Mapreduce(V2) host:[hostname] port:8032 //resourcemanager 的默認端口號
DFS Master
DFS Master host:[hostname] port:8020
yarn jar pv.jar /input/2015082818 /output
yarn jar pv.jar 類的全限定名 /input/2015082818 /output
map() 輸出開始 到 reduce()輸入開始 此階段是shuffle
input -> map -> shuffle -> reduce -> output
map shuffle phase
reduce shuffle phase
partitioner - map
sorter - map & reduce
combiner: map phase 的局部聚合操作。不是所有的MapReduce程序都可以進行局部聚合
compress: map phase 的輸出數據壓縮。所有MapReduce程序都可以進行設置
group - reduce
所有操作都是針對map()輸出的<key, value>數據進行的
依據此MapReduce Job中Reduce Task的個數進行分區,決定map輸出的數據被哪個reduce任務處理。默認情況下,依據key採用HashPartitioner。
// 經過取餘將數據分配到哪一個reduce處理 HashPartitioner int getParitition(key, value, numreducetask) { return ( key.hashCode&Integer.maxValue)%numreducetask; }
將相同key的value值存入list集合,形成新的(key, list(value)),再將key/value對數據傳遞給reduce()函數進行處理。
最後將(key, list(value))傳給 reduce()
FileInputFormat.setMaxInputSplitSize(job, size); // 設置切片最大值
FileInputFormat.setMinInputSplitSize(job, size); // 設置切片最小值
FileInputFormat public List<InputSplit> getSplits(JobContext job){。。。} protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } // minSize<=maxSize<blockSize 提升併發 // minSize>blockSize 下降併發
job.setNumReduceTasks(2); // HashPartitioner 決定map輸出的數據被哪個reduce處理
package com.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Value object holding three {@code long} counters: up, down and sum.
 *
 * <p>Serialization goes through Hadoop's {@link Writable} rather than
 * {@code java.io.Serializable}. The fields are written and read back in the
 * same fixed order: up, down, sum.
 */
public class Flow implements Writable {

    private long up;
    private long down;
    private long sum;

    // ---- serialization ----------------------------------------------------

    /** Writes the three counters as longs, in the order up, down, sum. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.up);
        out.writeLong(this.down);
        out.writeLong(this.sum);
    }

    /** Reads the three counters back in the order they were written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    // ---- text form ----------------------------------------------------------

    /** Returns the counters as {@code up<TAB>down<TAB>sum}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(up).append("\t");
        text.append(down).append("\t");
        text.append(sum);
        return text.toString();
    }

    // ---- accessors ----------------------------------------------------------

    public long getUp() {
        return this.up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return this.down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    public long getSum() {
        return this.sum;
    }

    public void setSum(long sum) {
        this.sum = sum;
    }
}
/**
 * Routes records into up to three partitions by the value's sum:
 * below 1024 goes to partition 0, below 10*1024 to partition 1, everything
 * else to partition 2.
 *
 * <p>The result is clamped to {@code numPartitions - 1}, so a job configured
 * with fewer than three reduce tasks still receives a valid partition index;
 * the upper buckets then share the last partition.
 */
public static class MyPartitioner extends Partitioner<Text, Flow> {

    /** Sums below this bound go to partition 0. */
    private static final long SMALL_LIMIT = 1024L;

    /** Sums below this bound (and not below SMALL_LIMIT) go to partition 1. */
    private static final long MEDIUM_LIMIT = 10L * 1024L;

    /**
     * @param key           record key (not used for routing)
     * @param value         record value whose sum selects the bucket
     * @param numPartitions number of partitions of the job
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Text key, Flow value, int numPartitions) {
        long sum = value.getSum();

        int bucket;
        if (sum < SMALL_LIMIT) {
            bucket = 0;
        } else if (sum < MEDIUM_LIMIT) {
            bucket = 1;
        } else {
            bucket = 2;
        }

        // Never hand back an index the job has no partition for.
        int lastPartition = Math.max(numPartitions, 1) - 1;
        return Math.min(bucket, lastPartition);
    }
}
public class MyKey implements WritableComparable<MyKey> //要排序的類要實現WritableComparable接口 @Override public int compareTo(MyKey o) { long result = o.getSum() - this.getSum(); if(result>0) { return 1; }else if(result<0) { return -1; } return o.getPhone().compareTo(this.getPhone()); }
原始數據 hello world hello hadoop hello world hello java keyIn:hadoop [1, ] keyIn:hello [1, 1, 1, 1, ] keyIn:java [1, ] keyIn:world [1, 1, ] keyIn:hadoop [1, ] keyIn:hello [2, 2, ] keyIn:java [1, ] keyIn:world [1, 1, ]
重寫 RawComparator 方法合併key中相同字段
經過 job.setGroupingComparatorClass(Mygroup.class); 調用
public static class Mygroup implements RawComparator<Person> { @Override public int compare(Person o1, Person o2) { // TODO Auto-generated method stub return 0; } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4); } }
