MapReduce分佈式計算框架簡稱MR,比較適合作數據離線計算;其他計算框架如spark 基於內存的迭代式計算,適合作實時計算框架;Storm適合作流計算。java
分佈式離線計算框架程序員
主要適用於大批量的集羣任務,因爲是批量執行,故時效性偏低。面試
原生支持 Java 語言開發 MapReduce ,其它語言須要使用到 Hadoop Streaming 來開發。算法
Spark 是專爲大規模數據處理而設計的快速通用的計算引擎,其是基於內存的迭代式計算。apache
Spark 保留了MapReduce 的優勢,並且在時效性上有了很大提升,從而對須要迭代計算和有較高時效性要求的系統提供了很好的支持。網絡
開發人員能夠經過Java、Scala或者Python等語言進行數據分析做業編寫,並使用超過80種高級運算符。架構
Spark與HDFS全面兼容,同時還能與其它Hadoop組件—包括YARN以及HBase並行協做。app
Spark能夠被用於處理多種做業類型,好比實時數據分析、機器學習與圖形處理。多用於能容忍小延時的推薦與計算系統。負載均衡
Storm是一個分佈式的、可靠的、容錯的流式計算框架。框架
Storm 一開始就是爲實時處理設計,所以在實時分析/性能監測等須要高時效性的領域普遍採用。
Storm在理論上支持全部語言,只須要少許代碼便可完成適配。
Storm把集羣的狀態存在Zookeeper或者本地磁盤,因此後臺進程都是無狀態的(不須要保存本身的狀態,都在zookeeper上),能夠在不影響系統健康運行的同時失敗或重啓。
Storm可應用於--數據流處理、持續計算(持續地向客戶端發送數據,它們能夠實時的更新以及展示數據,好比網站指標)、分佈式遠程過程調用(輕鬆地並行化CPU密集型操做)。
參考http://blog.51cto.com/ijiajia/1958741。
核心思想:移動計算而非移動數據;通俗說就是把預先寫好的算法在不一樣的節點運行,而數據不動。
步驟:
input:hdfs 存儲的數據做爲mr的輸入,也稱爲原始數據,數據比較大,能夠是視頻 圖片 文檔等。。。
split: 切片,對輸入數據進行分割 切片,分發到不一樣的節點計算
map: 映射 也能夠叫建模,對數據切片並行的進行建模,有多少個切片就有多少個map進程。
SM:sort&merge 合併排序,對map的而結果進行合併排序操做
shuff:對相同的key值的數據移動到同一個block中
redu:對shuff的結果計算,數據清洗和處理,
計算框架shuffer:
partiton:分區算法,能夠由程序員自定義也可使用系統默認的哈希模運算。每一個對象取哈希值而後模reducer進程數獲得結果,按照結果規則進行分區。分區是爲了把mapper數據進行從新分配,達到負載均衡目的,解決數據傾斜問題。數據傾斜通常發生在reducer階段,mapper不會發生數據傾斜問題。默認的partiton算法有可能發生數據傾斜問題。
sort:排序,系統默認的排序是按照對象的ascii碼排序,也能夠說是按照字典排序。
merge:合併,相同的K進行合併,若有combiner框架則按照框架規則合併,沒有則按照系統默認的合併規則
最後把處理好的數據固化到磁盤,把數據拷貝到reducer節點,按照分區不一樣拷貝到不一樣的的reducer進程。而後按照相同的K進行合併,這些數值有可能來自於不一樣的mapper進程。
partiton,sort和combiner在面試中常常會被問到。
若是客戶端設置了combiner,那麼將會使用combiner對數據合併,將相同的K合併,減小數據量(後面的reducer task 從task tracker 拷貝數據。)。拷貝過來的數據先存放在內存中,在內存合併的時候會對數據作排序
當整個maptask結束後在對磁盤中的這個maptask產生的臨時文件作合併。
MR配置:
主節點 jobtracker配置:
conf/mapred-site.xml
<property> <name>mapred.job.tracker</name> <VALUE>localhost:9001</VALUE> </property>
從tasktracker 默認在DN節點,能夠不用配置。
mapper函數:封裝數據,構造map<Key,Value>鍵-值對。
Key 文本行號,hadoop自動生成。
Value 每一行文件內容。
context 封裝map<Key,Value>輸出給reduce函數。
reducer函數:接受mapper函數輸出的map<Key,Value>值做爲輸入值,構造context輸出。
* 1.定義做業
* 2.設置Job主函數
* 3.定義Job輸入,輸出路徑
* 4.設置mapper,reducer函數,Job在運行的時候會主動去加載
* 5.設置輸出Key,Value格式
程序打包成*.jar格式
執行 export HADOOP_CLASSPATH=../../*.jar
hadoop com.crbc.TimpJob input output
例如:hadoop -Xmx1024m com.crbc.TimpJob file:///D:\timpfile\*.gz D:\timpfile\out\
2.集羣模式
上傳*.jar到集羣主機
將要處理的文件上傳到hdfs文件系統
hadoop jar *.jar /input /output
Mapper---->Reducer------->Job
(構造) (計算) (運行)
mapper類:
package com.crbc.www; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* * Mapper * LongWritable :輸入參數 ,內部定義行號 * Text :輸入參數,文件value值 * Text :輸出參數,輸出給reduce函數處理的 值 * IntWritable:輸出參數,輸出給reduce函數處理的值 * */ public class TimpMapper extends Mapper<LongWritable, Text,Text, IntWritable> { /* * 重寫map函數 * LongWritable :內部定義行號 * Text :文件value值 * context:輸出函數, */ protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line=value.toString(); String year=line.substring(15,19); int airt; if(line.charAt(87)=='+') { airt=Integer.parseInt(line.substring(88,92)); }else { airt=Integer.parseInt(line.substring(87,92)); } String quality=line.substring(92,93); if(airt !=9999 && quality.matches("[01459]")) { //寫上下文,maper函數輸出做爲reduce函數的輸入值,封裝map<Key,Values> context.write(new Text(year), new IntWritable(airt)); } } }
reducer類:
package com.crbc.www; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* * Reducer函數 * * */ public class TimpReduces extends Reducer<Text, IntWritable, Text,IntWritable> { /* * Text :輸入函數, * IntWritable:輸入函數,可迭代 * * context:輸出函數 */ protected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException { int maxValues=Integer.MIN_VALUE; for(IntWritable values:value) { maxValues = Math.max(maxValues, values.get()); } //寫上下文,封裝map<Key,Values>輸出 context.write(key,new IntWritable(maxValues)); } }
Job類:
package com.crbc.www; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /* * 1.定義做業 * 2.設置Job主函數 * 3.定義Job輸入,輸出路徑 * 4.設置mapper,reducer函數,Job在運行的時候會主動去加載 * 5.設置輸出Key,Value格式 */ public class TimpJob { public static void main(String[] args) throws Exception { //定義一個做業 Job job = new Job(); //設置做業主函數 job.setJarByClass(TimpJob.class); //設置做業名稱,便於調試 job.setJobName("MapperReducer"); //設置job輸入參數,輸入函數能夠是一個文件路徑 FileInputFormat.addInputPath(job,new Path(args[0]) ); //設置job輸出參數,輸出函數可使一個路徑,把計算計算結果輸出到此路徑下。 //注意此路徑是函數建立的,不能跟現有的重名 FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置Mapper函數 job.setMapperClass(TimpMapper.class); //設置Reduce函數 job.setReducerClass(TimpReduces.class); //設置輸出key格式 job.setOutputKeyClass(Text.class); //設置輸出Value格式 job.setOutputValueClass(IntWritable.class); //等待做業完成 job.waitForCompletion(true); } }