Hadoop是一個由Apache基金會所開發的分佈式系統基礎架構,主要解決,海量數據的存儲和海量數據的分析計算問題。java
向namenode詢問是否須要checkpointnode
checkpoint觸發條件:web
- 定時時間到了
- edits中數據滿了
在$HADOOP_HOME/data/tmp/dfs/namesecondary/current
這個目錄中查看SecondaryNameNode目錄結構。
在主namenode發生故障時(假設沒有及時備份數據),能夠從SecondaryNameNode恢復數據:
方案一:將SecondaryNameNode中數據拷貝到namenode存儲數據的目錄。
方案二:使用-importCheckpoint選項啓動namenode守護進程,從而將SecondaryNameNode中數據拷貝到namenode目錄中。(極慢)數據庫
1.在namenode主機的$HADOOP_HOME/etc/hadoop
目錄下建立dfs.hosts文件,在該文件中添加全部主機名(包括新節點)e.g.apache
hadoop102 hadoop103 hadoop104 hadoop105
2.在namenode的hdfs-site.xml配置文件中增長dfs.hosts屬性瀏覽器
<property> <name>dfs.hosts</name> <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value> </property>
3.刷新namenode:hdfs dfsadmin -refreshNodes
4.更新resourcemanager節點:yarn rmadmin -refreshNodes
5.在namenode的slaves文件中增長新主機名稱
6.單獨命令啓動新的數據節點和節點管理器hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager
緩存
1.在namenode的/opt/module/hadoop-2.7.2/etc/hadoop目錄下建立dfs.hosts.exclude文件,向其中添加要退役的主機名稱
2.在namenode的hdfs-site.xml配置文件中增長dfs.hosts.exclude屬性安全
<property> <name>dfs.hosts.exclude</name> <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value> </property>
3.刷新namenode、刷新resourcemanagerhdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
4.在web瀏覽器端檢查,退役節點的狀態爲decommission in progress(退役中),說明數據節點正在複製塊到其餘節點。
5.等待退役節點狀態爲decommissioned(全部塊已經複製完成),中止該節點及節點資源管理器。hadoop-daemon.sh stop datanode
yarn-daemon.sh stop nodemanager
性能優化
PS:若是副本數是3,服役的節點小於等於3,是不能退役成功的,須要修改副本數後才能退役。
6.從include文件中刪除退役節點,再運行刷新節點的命令
7.從namenode的slave文件中刪除退役節點服務器
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(各類校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。因此,hadoop本身開發了一套序列化機制(Writable),精簡、高效。
boolean
--> BooleanWritable
byte
--> ByteWritable
int
--> IntWritable
float
--> FloatWritable
long
--> LongWritable
double
--> DoubleWritable
string
--> Text
map
--> MapWritable
array
--> ArrayWritable
maptask調用InputFormat,InputFormat又調用RecordReader從輸入文件中解析出一個個K/V。
將解析出的K/V交給客戶端map()方法處理,併產生新的K/V。
當map()方法處理完數據後,通常會調用OutputCollector.collect()輸出結果,在該函數內部調用Partitioner對K/V進行分區,且根據K進行分區內排序,並寫入一個環形緩衝區中。
當環形緩衝區達到80%時,會將數據寫到本地磁盤上生成一個臨時文件。
將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做。
當全部數據處理完成後,MapTask對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。
編寫一個類繼承FileInputFormat
public class WholeInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 自定義recordreader WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context); return recordReader; } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
自定義一個RecordReader
public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit split; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) split.getLength()]; Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); FSDataInputStream fis = null; try { fis = fs.open(path); IOUtils.readFully(fis, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(fis); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return this.value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? 1 : 0; } @Override public void close() throws IOException { } }
自定義RecordReader重點是nextKeyValue()
方法,它定義瞭如何封裝向maptask輸入的鍵值對,本例中是將每一個文件的全部內容做爲value輸入到maptask中。
// 若是不設置InputFormat,它默認用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
通過以上設置,可有效減小切片數
MapReduce保證每一個reducer的輸入都是按鍵有序排列的,系統執行排序的過程(即將map輸出做爲輸入傳給reducer)稱爲shuffle。
public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }
默認採用的是org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
,當中的getPartition()
方法是根據key的hashCode對reduceTasks個數取模獲得的。
1.繼承Partitioner,重寫getPartition()方法
public class MyPartitioner extends Partitioner<K, V> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { int partition=xxx; if (exp0) { partition = 0; }else if (exp1) { partition = 1; }else if (exp2) { partition = 2; }else if (exp3) { partition = 3; } return partition; } }
2.在job驅動中,設置自定義partitioner
job.setPartitionerClass(MyPartitioner.class)
3.根據自定義partitioner的邏輯設置相應數量的reduce task
job.setNumReduceTasks(5);
PS:
若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會產生異常;
若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000。
bean對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序
@Override public int compareTo(Bean o) { // 倒序排列,從大到小 return this.xxx > o.getXxx() ? -1 : 1; }
對reduce階段的數據根據某一個或幾個字段進行分組。
public class OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { // 記得此處設置true,不然會報空指針異常 super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; return abean.getOrder_id().compareTo(bbean.getOrder_id()); } }
combiner是MR程序中Mapper和Reducer以外的一種組件,combiner組件的父類就是Reducer,combiner和reducer的區別在於運行的位置:Combiner是在每個maptask所在的節點運行,Reducer是接收全局全部Mapper的輸出結果。combiner的意義就是對每個maptask的輸出進行局部彙總,以減少網絡傳輸量。
1.自定義一個combiner繼承Reducer,重寫reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }
2.在job驅動類中設置
job.setCombinerClass(WordcountCombiner.class);
combiner可以應用的前提是不能影響最終的業務邏輯,並且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來。
從控制檯觀察使用combiner先後變化:
若是是多張表的操做都是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜。
在map端緩存多張表,提早處理業務邏輯,這樣增長map端業務,減小reduce端數據的壓力,儘量的減小數據傾斜。
job.addCacheFile(new URI("file:/e:/cache/pd.txt")); // 緩存普通文件到task運行節點
1.自定義一個outputformat繼承org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // 建立一個RecordWriter return new FilterRecordWriter(job); } }
2.具體的寫數據RecordWriter
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 獲取文件系統 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 建立輸出文件路徑 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 建立輸出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判斷是否包含「atguigu」輸出到不一樣文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 關閉資源 if (atguiguOut != null) { atguiguOut.close(); } if (otherOut != null) { otherOut.close(); } } }
// 開啓map端輸出壓縮 configuration.setBoolean("mapreduce.map.output.compress", true); // 設置map端輸出壓縮方式 configuration.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
// 設置reduce端輸出壓縮開啓 FileOutputFormat.setCompressOutput(job, true); // 設置壓縮的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
ReduceTask從各個MapTask遠程拷貝一片數據,若是某一片數據大小超過閾值,則寫到磁盤上,不然直接放在內存中。
在遠程拷貝數據時,reducetask後臺啓動了兩個線程對內存和磁盤上的文件進行合併,以防止內存或硬盤使用過多。
用戶編寫的reduce()方法輸入數據是按照key進行彙集的,爲了將key相同的數據聚在一塊兒,Hadoop採用了基於排序的策略。因爲各個MapTask已經實現對本身的處理結果進行了局部排序,所以,ReduceTask只需對全部數據進行一次歸併排序便可。
reduce()函數將計算結果寫到HDFS上。
用戶在mr應用程序中配置能夠生效:
mapreduce.map.memory.mb
一個Map Task可以使用的資源上限(單位:MB),默認爲1024。若是Map Task實際使用的資源量超過該值,則會被強制殺死。mapreduce.reduce.memory.mb
一個Reduce Task可以使用的資源上限(單位:MB),默認爲1024。若是Reduce Task實際使用的資源量超過該值,則會被強制殺死。mapreduce.map.java.opts
Map Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@會被Hadoop框架自動換爲相應的taskid), 默認值: ""mapreduce.reduce.java.opts
Reduce Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc", 默認值: ""mapreduce.map.cpu.vcores
每一個Map task可以使用的最多cpu core數目, 默認值: 1mapreduce.reduce.cpu.vcores
每一個Reduce task可以使用的最多cpu core數目, 默認值: 1應該在yarn啓動以前就配置在服務器的配置文件中才能生效:
yarn.scheduler.minimum-allocation-mb 1024
給應用程序container分配的最小內存yarn.scheduler.maximum-allocation-mb 8192
給應用程序container分配的最大內存shuffle性能優化的關鍵參數,應在yarn啓動以前就配置好:
mapreduce.task.io.sort.mb 100
shuffle的環形緩衝區大小,默認100mmapreduce.map.sort.spill.percent 0.8
環形緩衝區溢出的閾值,默認80%mapreduce.map.maxattempts
每一個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。mapreduce.reduce.maxattempts
每一個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。mapreduce.map.failures.maxpercent
當失敗的Map Task失敗比例超過該值爲,整個做業則失敗,默認值爲0. 若是你的應用程序容許丟棄部分輸入數據,則該該值設爲一個大於0的值,好比5,表示若是有低於5%的Map Task失敗(若是一個Map Task重試次數超過mapreduce.map.maxattempts,則認爲這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個做業扔認爲成功。mapreduce.reduce.failures.maxpercent
當失敗的Reduce Task失敗比例超過該值爲,整個做業則失敗,默認值爲0。mapreduce.task.timeout
Task超時時間,常常須要設置的一個參數,該參數表達的意思爲:若是一個task在必定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該task處於block狀態,多是卡住了,也許永遠會卡主,爲了防止由於用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。若是你的程序對每條輸入數據的處理時間過長(好比會訪問數據庫,經過網絡拉取數據等),建議將該參數調大,該參數太小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。Yarn是一個資源調度平臺,負責爲運算程序提供服務器運算資源,至關於一個分佈式的操做系統平臺,而mapreduce等運算程序則至關於運行於操做系統之上的應用程序。