MR計算框架學習筆記-持續更新

MapReduce分佈式計算框架簡稱MR,比較適合作數據離線計算;其他計算框架如spark 基於內存的迭代式計算,適合作實時計算框架;Storm適合作流計算。java

MapReduce

  • 分佈式離線計算框架程序員

  • 主要適用於大批量的集羣任務,因爲是批量執行,故時效性偏低。面試

  • 原生支持 Java 語言開發 MapReduce ,其它語言須要使用到 Hadoop Streaming 來開發。算法

Spark

  • Spark 是專爲大規模數據處理而設計的快速通用的計算引擎,其是基於內存的迭代式計算。apache

  • Spark 保留了MapReduce 的優勢,並且在時效性上有了很大提升,從而對須要迭代計算和有較高時效性要求的系統提供了很好的支持。網絡

  • 開發人員能夠經過Java、Scala或者Python等語言進行數據分析做業編寫,並使用超過80種高級運算符。架構

  • Spark與HDFS全面兼容,同時還能與其它Hadoop組件—包括YARN以及HBase並行協做。app

  • Spark能夠被用於處理多種做業類型,好比實時數據分析、機器學習與圖形處理。多用於能容忍小延時的推薦與計算系統。負載均衡

Storm

  • Storm是一個分佈式的、可靠的、容錯的流式計算框架。框架

  • Storm 一開始就是爲實時處理設計,所以在實時分析/性能監測等須要高時效性的領域普遍採用。

  • Storm在理論上支持全部語言,只須要少許代碼便可完成適配。

  • Storm把集羣的狀態存在Zookeeper或者本地磁盤,因此後臺進程都是無狀態的(不須要保存本身的狀態,都在zookeeper上),能夠在不影響系統健康運行的同時失敗或重啓。

  • Storm可應用於--數據流處理、持續計算(持續地向客戶端發送數據,它們能夠實時的更新以及展示數據,好比網站指標)、分佈式遠程過程調用(輕鬆地並行化CPU密集型操做)。

參考http://blog.51cto.com/ijiajia/1958741。

核心思想:移動計算而非移動數據;通俗說就是把預先寫好的算法在不一樣的節點運行,而數據不動。

步驟:

input:hdfs 存儲的數據做爲mr的輸入,也稱爲原始數據,數據比較大,能夠是視頻 圖片 文檔等。。。

split: 切片,對輸入數據進行分割 切片,分發到不一樣的節點計算

map: 映射 也能夠叫建模,對數據切片並行的進行建模,有多少個切片就有多少個map進程。

SM:sort&merge 合併排序,對map的而結果進行合併排序操做

shuff:對相同的key值的數據移動到同一個block中

redu:對shuff的結果計算,數據清洗和處理,

計算框架shuffer:

  • mapeper和reducer中間步驟
  • 把mapper輸出結果按照某種k-v切分組合,數據處理以後輸出到reducer
  • 簡化reducer過程

partiton:分區算法,能夠由程序員自定義也可使用系統默認的哈希模運算。每一個對象取哈希值而後模reducer進程數獲得結果,按照結果規則進行分區。分區是爲了把mapper數據進行從新分配,達到負載均衡目的,解決數據傾斜問題。數據傾斜通常發生在reducer階段,mapper不會發生數據傾斜問題。默認的partiton算法有可能發生數據傾斜問題。

sort:排序,系統默認的排序是按照對象的ascii碼排序,也能夠說是按照字典排序。

merge:合併,相同的K進行合併,若有combiner框架則按照框架規則合併,沒有則按照系統默認的合併規則

最後把處理好的數據固化到磁盤,把數據拷貝到reducer節點,按照分區不一樣拷貝到不一樣的的reducer進程。而後按照相同的K進行合併,這些數值有可能來自於不一樣的mapper進程。

partiton,sort和combiner在面試中常常會被問到。

若是客戶端設置了combiner,那麼將會使用combiner對數據合併,將相同的K合併,減小數據量(後面的reducer task 從task tracker 拷貝數據。)。拷貝過來的數據先存放在內存中,在內存合併的時候會對數據作排序

當整個maptask結束後在對磁盤中的這個maptask產生的臨時文件作合併。

MR架構:主從架構  (1.0)  

主jobtracker:

  • 負責調度分配每個子任務運行在tasktracker上,若是發現有失敗的task就從新分配任務到其餘節點,一個集羣只有一個jobtracker節點,通常運行在master節點上。

從tasktracker:

  • tasktracker主動與jobtracker通訊,接收做業,負責執行每個任務,爲了減小    網絡帶寬tasktracker最好運行在hdfs上的DN節點上。

MR配置: 

主節點 jobtracker配置:

conf/mapred-site.xml

<property>
            <name>mapred.job.tracker</name>
            <VALUE>localhost:9001</VALUE>
        </property>

從tasktracker 默認在DN節點,能夠不用配置。

 

 

MR 簡單實現:

mapper函數:封裝數據,構造map<Key,Value>鍵-值對。
Key 文本行號,hadoop自動生成。
Value 每一行文件內容。
context 封裝map<Key,Value>輸出給reduce函數。


reducer函數:接受mapper函數輸出的map<Key,Value>值做爲輸入值,構造context輸出。

Job函數:

  •  * 1.定義做業

  •  * 2.設置Job主函數

  •  * 3.定義Job輸入,輸出路徑

  •  * 4.設置mapper,reducer函數,Job在運行的時候會主動去加載

  •  * 5.設置輸出Key,Value格式

 
 調用方法:

 

  1. 單節點

 程序打包成*.jar格式
 執行 export HADOOP_CLASSPATH=../../*.jar
    hadoop com.crbc.TimpJob input   output 
    例如:hadoop -Xmx1024m  com.crbc.TimpJob file:///D:\timpfile\*.gz  D:\timpfile\out\   
 2.集羣模式
       上傳*.jar到集羣主機
       將要處理的文件上傳到hdfs文件系統
   hadoop jar *.jar /input /output 

Mapper---->Reducer------->Job
(構造)    (計算)              (運行)

mapper類:

package com.crbc.www;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
 * Mapper 
 *  LongWritable :輸入參數 ,內部定義行號
 *  Text :輸入參數,文件value值
 *  Text :輸出參數,輸出給reduce函數處理的 值
 *  IntWritable:輸出參數,輸出給reduce函數處理的值
 * 
 */
public class TimpMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
	 
	/*
	 * 重寫map函數
	  *  LongWritable :內部定義行號
	  *  Text :文件value值 
	  *  context:輸出函數,
	 */
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		String line=value.toString();
		String year=line.substring(15,19);
		int airt;
		if(line.charAt(87)=='+') {
			airt=Integer.parseInt(line.substring(88,92));
		}else {
			airt=Integer.parseInt(line.substring(87,92));
		}
		String quality=line.substring(92,93);
		if(airt !=9999 && quality.matches("[01459]")) {
			//寫上下文,maper函數輸出做爲reduce函數的輸入值,封裝map<Key,Values>
			context.write(new Text(year), new IntWritable(airt));
		}
	}
 
}

reducer類:

package com.crbc.www;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Reducer函數
 *  
 * 
 */
public class TimpReduces extends Reducer<Text, IntWritable, Text,IntWritable> {
 
	/*
	 * Text :輸入函數,
	 * IntWritable:輸入函數,可迭代
	 * 
	 * context:輸出函數
	 */
protected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException {
		int maxValues=Integer.MIN_VALUE;
		for(IntWritable values:value) {
			maxValues = Math.max(maxValues, values.get());
		}
		//寫上下文,封裝map<Key,Values>輸出
		context.write(key,new IntWritable(maxValues));
}

}

Job類:

package com.crbc.www;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * 1.定義做業
 * 2.設置Job主函數
 * 3.定義Job輸入,輸出路徑
 * 4.設置mapper,reducer函數,Job在運行的時候會主動去加載
 * 5.設置輸出Key,Value格式
 */
public class TimpJob {

	public static void main(String[] args) throws Exception {
		
	 //定義一個做業
	 Job job = new Job();
	 
	 //設置做業主函數
	 job.setJarByClass(TimpJob.class);
	 
	 //設置做業名稱,便於調試
	 job.setJobName("MapperReducer");
	 
	 //設置job輸入參數,輸入函數能夠是一個文件路徑
	 FileInputFormat.addInputPath(job,new Path(args[0]) );
	 
	 //設置job輸出參數,輸出函數可使一個路徑,把計算計算結果輸出到此路徑下。
	 //注意此路徑是函數建立的,不能跟現有的重名
	 FileOutputFormat.setOutputPath(job, new Path(args[1]));
	 
	 //設置Mapper函數
     job.setMapperClass(TimpMapper.class);
     
     //設置Reduce函數
     job.setReducerClass(TimpReduces.class);
     
     //設置輸出key格式
     job.setOutputKeyClass(Text.class);
     
     //設置輸出Value格式
     job.setOutputValueClass(IntWritable.class);
     
     //等待做業完成
     job.waitForCompletion(true);
	}

}
相關文章
相關標籤/搜索