上ftp://ftp.ncdc.noaa.gov下載,下載下來的目錄結構:
每下個文件夾存放了每年全部氣象臺的氣象數據:
每個文件就是一個氣象站一年的數據
將上面目錄上傳到Linux中:
編寫如下Shell腳本,將每年的全部不現氣象站所產生的文件合併成一個文件,即每一年只有一個文件,並上傳到Hadoop系統中:
#!/bin/bash
#將Hadoop權威指南氣像數據按每年合併成一個文件,並上傳到Hadoop系統中
rm -rf /root/ncdc/all/*
/root/hadoop-1.2.1/bin/hadoop fs -rm -r /ncdc/all/*
#這裏的/*/*中第一個*表示年份文件夾,其下面存放的就是每一年不一樣氣象站的氣象文件
for file in /root/ncdc/raw/*/*
do
echo "追加$file.."
path=`dirname $file`
target=${path##*/}
gunzip -c $file >> /root/ncdc/all/$target.all
done
for file in /root/ncdc/all/*
do
echo "上傳$file.."
/root/hadoop-1.2.1/bin/hadoop fs -put $file /ncdc/all
done
腳本運行完後,HDFS上的文件以下:
每一個Mapper都須要繼承org.apache.hadoop.mapreduce.Mapper類,需重寫其map方法:
protectedvoid map(KEYIN key, VALUEIN value, Context context)
每一個Reducer都須要繼承org.apache.hadoop.mapreduce.Reducer類,需重寫其
protectedvoid reduce(KEYIN key, Iterable<VALUEIN> values, Context context )
publicclassMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT//Map父類中的方法定義以下
/**
* Called once at the beginning of the task.在任務開始執行前會執行一次
*/
protectedvoid setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.會被run()方法循環調用,每對鍵值都會被調用一次
*/
@SuppressWarnings("unchecked")
protectedvoid map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);//map()方法提供了默認實現,即直接輸出,不作處理
}
/**
* Called once at the end of the task.任務結束後會調用一次
*/
protectedvoid cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.map()方法實質上就是被run()循環調用的,咱們能夠重寫這個方法,加一些處理邏輯
*/
publicvoid run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {//每對鍵值對都會調用一次map()方法
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
publicclassReducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
/**
* Called once at the start of the task.在任務開始執行前會執行一次
*/
protectedvoid setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.reduce()方法會被run()循環調用
*/
@SuppressWarnings("unchecked")
protectedvoid reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);//提供了默認實現,不作處理直接輸出
}
}
/**
* Called once at the end of the task.任務結束後會調用一次
*/
protectedvoid cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
publicvoid run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {//每鍵值對都會調用一次reduce()
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iterinstanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
Reducer的reduce方法每執行完一次,就會產生一個結果文件
reduce方法的輸入類型必須匹配map方法的輸出類型
map的輸出文件名爲 part-m-nnnnn ,而reduce的輸出文件名爲 part-r-nnnnn (nnnnn爲分區號,即該文件存放的是哪一個分區的數據,從0開始),其中part文件名能夠修改
publicclass Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {,Mapper類有4個範型參數:
KEYIN:Map Key輸入類型,若是輸入是文本文件,固定爲LongWritable,表示每一行文本所在文件的起始位置,從0開始(即第一行起始爲位置爲0)
publicvoid map(Object key, Text value, Context context)
throws IOException, InterruptedException {
System.out.println("key=" + key + "; value=" + value);
[root@hadoop-master /root]# hadoop fs -get /wordcount/input/wordcount /root/wordcount
換行顯示 $('\n'),Tab字符顯示^I,^M 是'\r', 回車符
[root@hadoop-master /root]# cat -A /root/wordcount
hello world^M$
hello hadoop
VALUEIN:Map value輸入類型,若是輸入是文本文件,則通常爲Text,表示文本文件中讀取到的一行內容(注:Map是以行爲單位進行處理的,即每跑一次Map,即處理一行文本,即輸入也是以行爲單位進行輸入的)
KEYOUT, VALUEOUT:爲Reduce輸出Key與輸出Value的類型
//文本文件是按照一行一行傳輸到Mapper中的
publicclass MaxTemperatureMapper
extends Mapper<LongWritable/*輸入鍵類型:行的起始位置,從0開始*/, Text/*輸入值類型:爲文本的一行內容*/, Text/*輸出鍵類型:年份*/, IntWritable/*輸出值類型:氣溫*/> {
privatestaticfinalintMISSING = 9999;
@Override
publicvoid map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);//取年份
int airTemperature;
if (line.charAt(87) == '+') { //若是溫度值前有加號時,去掉,由於parseInt不支持加號
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);//空氣質量
//若是是有效天氣,則輸出
if (airTemperature != MISSING && quality.matches("[01459]")) {
//每執行一次map方法,可能會輸出多個鍵值對,但這裏只輸出一次,這些輸出合併後傳遞給reduce做用輸入
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
publicclass MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
//reduce的輸入即爲Map的輸出,這裏的輸入值爲一個集合,Map輸出後會將相同Key的值合併成一個數組後
//再傳遞給reduce,因此值類型爲Iterable
publicvoid reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
//write出去的結果會寫入到輸出結果文件
context.write(key, new IntWritable(maxValue));
}
}
publicclass MaxTemperature {
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "weather");
// 根據設置的calss找到它所在的JAR任務包,而不須要明確指定JAR文件名
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
//設置map與reduce的輸出類型,通常它們的輸出類型都相同,若是不一樣,則map能夠使用setMapOutputKeyClass、setMapOutputValueClass來設置
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// addInputPath除了支持文件、目錄,還能夠使用文件通匹符?
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1901.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1902.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1903.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1904.all"));
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1905.all"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
MapReduce邏輯數據流:
./hadoop jar /root/ncdc/weather.jar ch02.MaxTemperature
若是weather.jar包裏的MANIFEST.MF 文件裏指定了Main Class:
則運行時能夠不用指定主類:
./hadoop jar /root/ncdc/weather.jar
hadoop2裏能夠這樣執行:
./yarn jar /root/ncdc/weather.jar
若是在運行前指定了export HADOOP_CLASSPATH=/root/ncdc/weather.jar,若是設置了HADOOP_CLASSPATH應用程序類路徑環境變量,則能夠直接運行:
./hadoop MaxTemperature
./yarn MaxTemperature
以上都沒有寫輸入輸出文件夾,由於應用程序啓動類裏寫了
map是移動算法而不是數據。在集羣上,map任務(算法代碼)會移動到數據節點Datanode(計算數據在哪就移動到哪臺數據節點上),但reduce過程通常不能避免數據的移動(即不具有本地化數據的優點),單個reduce任務的輸入一般來自於全部mapper的輸出,所以map的輸出會傳輸到運行reduce任務的節點上,數據在reduce端合併,而後執行用戶自定義的reduce方法
reduce任務的完整數據流:
虛線表示節點,虛線箭頭表示節點內部數據傳輸,實線箭頭表示不一樣節點間的數據傳輸
有時,map任務(程序)所須要的三臺機(假設配置的副本數據爲3)正在處理其餘的任務時,則Jobtracker就會在這三份副本所在機器的同一機架上找一臺空親的機器,這樣數據只會在同一機架上的不一樣機器上進行傳輸,這樣比起在不一樣機架之間的傳輸效率要高
數據與map程序可能在同一機器上,可能在同一機架上的不一樣機器上,還有多是在不一樣機架上的不一樣機器上,即數據與map程序分佈狀況有如下三種:
a(本地數據):同一機器,b(本地機架):同一機架上不一樣機器,c(跨機架):不一樣機架上不一樣機器。顯然a這種狀況下,執行效率是最高的
從上圖來看,應該儘可能讓數據與map任務程序在一機器上,這就是爲何分片最大的大小與HDFS塊大小相同,由於若是分片跨越多個數據塊時,而這些塊又不在同一機器上時,就須要將其餘的塊傳輸到map任務所在節點上,這本地數據相比,這種效率低
爲了不計算時不移動數據,TaskTracker是跑在DataName上的
reduce的數量並非由輸入數據大小決定的,而是能夠單獨指定的
若是一個任務有不少個reduce任務,則每一個map任務就須要對輸出數據進行分區partition處理,即輸入數據交給哪一個reduce進行處理。每一個reduce須要創建一個分區,每一個分區就對應一個reduce,默認的分區算法是根據輸出的鍵哈希法:Key的哈希值 MOD Reduce數量),等到分區號,分區號 小於等於 Reduce數量的整數,從0開始。好比有3個reduce任務,則會分紅三個分區。
分區算法也是能夠自定義的
在map與reduce之間,還有一個shuffle過程:包括分區、排序、合併
多reduce任務數據流:
一個Map輸出數據可能輸出到不一樣的reduce,一個reduce的輸入也可能來自不一樣的map輸出
一個做業能夠沒有reduce任務,即無shuffle過程
Hadoop將做業分紅若干個小任務進行執行,其中包括兩類任務:map任務與reduce任務。
有兩類節點控制着任務的執行:一個JobTracker,與若干TaskTracker,JobTracker至關於NameNode的,是用來管理、調度TaskTracker,TaskTracker至關於DataName,須要將任務執行狀態報告給JobTracker。
Hadoop將MapReduce的輸入數據劃分紅等長的小數據塊,稱爲輸入分片——input split。
每一個分片構建一個map任務,一個map任務就是咱們繼承Mapper並重寫的map方法
數據分片,能夠多個map任務進行併發處理,這樣就會縮短整個計算時間,而且分片能夠很好的解決負載均衡問題,分片越細(小),則負載均衡越高,但分片過小須要建造不少的小的任務,這樣可能會影響整個執行時間,因此,一個合理的分片大小爲HDFS塊的大小,默認爲64M
map任務將其輸出結果直接寫到本地硬盤上,而不是HDFS中,這是由於map任務輸出的是中間結果,該輸出傳遞給reduce任務處理後,就能夠刪除了,因此沒有必要存儲在HDFS上
能夠爲map輸出指定一個combiner(就像map經過分區輸出到reduce同樣),combiner函數的輸出做爲reduce的輸入。
combiner屬於優化,沒法肯定map輸出要調用combiner多少次,有多是0、1、屢次,但無論調用多少次,reduce的輸出結果都是同樣的
假設1950年的氣象數據很大,map前被分紅了兩片,這樣1950的數據就會由兩個map任務去執行,假設第一個map輸出爲:
(1950, 0)
(1950, 20)
(1950, 10)
第二個map任務輸出爲:
(1950, 25)
(1950, 15)
若是在沒有使用combiner時,reducer的輸入會是這樣的:(1950, [0, 20, 10, 25, 15]),最後輸入結果爲:(1950, 25);爲了減小map的數據輸出,這裏能夠使用combiner函數對每一個map的輸出結果進行查找最高氣溫(第一個map任務最高爲20,第二個map任務最高爲25),這樣一來,最後傳遞給reducer的輸入數據爲:(1950, [20, 25]),最後的計算結果也是(1950, 25),這一過程即爲:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
上面是找最高氣溫,並非全部業務需求都具備此特性,如求平均氣溫時,就不適用combiner,如:
mean(0, 20, 10, 25, 15) = 14
但:
mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
combiner與reducer的計算邏輯是同樣的,因此不須要重定義combiner類(若是輸入類型與reducer不一樣,則須要重定義一個,但輸入類型必定相同),而是在Job啓動內中經過job.setCombinerClass(MaxTemperatureReducer.class);便可,即combiner與reducer是同一實現類
publicclass MaxTemperatureWithCombiner {
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "weather");
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/all/1950.all"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在Map端,用戶自定義實現的Combine優化機制類Combiner在執行Map端任務的節點自己運行,至關於對map函數的輸出作了一次reduce。使用Combine機制的意義就在於使Map端輸出更緊湊,使得寫到本地磁盤和傳給Reduce端的數據更少
Combiner一般被看做是一個Map端的本地reduce函數的實現類Reducer
選用Combine機制下的Combiner雖然減小了IO,可是等於多作了一次reduce,因此應該查看做業日誌來判斷combine函數的輸出記錄數是否明顯少於輸入記錄的數量,以肯定這種減小和花費額外的時間來運行Combiner相比是否值得
Combine優化機制執行時機
⑴ Map端spill的時候
在Map端內存緩衝區進行溢寫的時候,數據會被劃分紅相應分區,後臺線程在每一個partition內按鍵進行內排序。這時若是指定了Combiner,而且溢寫次數最少爲 3(min.num.spills.for.combine屬性的取值)時,Combiner就會在排序後輸出文件寫到磁盤以前運行。 ⑵ Map端merge的時候
在Map端寫磁盤完畢前,這些中間的輸出文件會合併成一個已分區且已排序的輸出文件,按partition循環處理全部文件,合併會分屢次,這個過程也會伴隨着Combiner的運行。
⑶ Reduce端merge的時候
從Map端複製過來數據後,Reduce端在進行merge合併數據時也會調用Combiner來壓縮數據。
Combine優化機制運行條件
⑴ 知足交換和結合律[10]
結合律:
(1)+(2+3)+(4+5+6)==(1+2)+(3+4)+(5)+(6)== ...
交換律:
1+2+3+4+5+6==2+4+6+1+2+3== ...
應用程序在知足如上的交換律和結合律的狀況下,combine函數的執行纔是正確的,由於求平均值問題是不知足結合律和交換律的,因此這類問題不能運用Combine優化機制來求解。
例如:mean(10,20,30,40,50)=30
但mean(mean(10,20),mean(30,40,50))=22.5
這時在求平均氣溫等相似問題的應用程序中使用Combine優化機制就會出錯。
下面全部的內容是針對Hadoop 2.x版本進行說明的,Hadoop 1.x和這裏有點不同。
在第一次部署好Hadoop集羣的時候,咱們須要在NameNode(NN)節點上格式化磁盤:
[wyp@wyp hadoop-2.2.0]$ $HADOOP_HOME/bin/hdfs namenode -format
格式化完成以後,將會在$dfs.namenode.name.dir/current目錄下以下的文件結構
current/
|-- VERSION
|-- edits_*
|-- fsimage_0000000000008547077
|-- fsimage_0000000000008547077.md5
|-- seen_txid
其中的dfs.namenode.name.dir是在hdfs-site.xml文件中配置的,默認值以下:
<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/name</value>
</property>
hadoop.tmp.dir是在core-site.xml中配置的,默認值以下
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop-${user.name}</value>
<description>A base for other temporary directories.</description>
</property>
dfs.namenode.name.dir屬性能夠配置多個目錄,如/data1/dfs/name,/data2/dfs/name,/data3/dfs/name,....。各個目錄存儲的文件結構和內容都徹底同樣,至關於備份,這樣作的好處是當其中一個目錄損壞了,也不會影響到Hadoop的元數據,特別是當其中一個目錄是NFS(網絡文件系統Network File System,NFS)之上,即便你這臺機器損壞了,元數據也獲得保存。
下面對$dfs.namenode.name.dir/current/目錄下的文件進行解釋。
一、 VERSION文件是Java屬性文件,內容大體以下:
#Fri Nov 15 19:47:46 CST 2013
namespaceID=934548976
clusterID=CID-cdff7d73-93cd-4783-9399-0a22e6dce196
cTime=0
storageType=NAME_NODE
blockpoolID=BP-893790215-192.168.24.72-1383809616115
layoutVersion=-47
其中
(1)、namespaceID是文件系統的惟一標識符,在文件系統首次格式化以後生成的;
(2)、storageType說明這個文件存儲的是什麼進程的數據結構信息(若是是DataNode,storageType=DATA_NODE);
(3)、cTime表示NameNode存儲時間的建立時間,因爲個人NameNode沒有更新過,因此這裏的記錄值爲0,之後對NameNode升級以後,cTime將會記錄更新時間戳;
(4)、layoutVersion表示HDFS永久性數據結構的版本信息, 只要數據結構變動,版本號也要遞減,此時的HDFS也須要升級,不然磁盤仍舊是使用舊版本的數據結構,這會致使新版本的NameNode沒法使用;
(5)、clusterID是系統生成或手動指定的集羣ID,在-clusterid選項中能夠使用它;以下說明
a、使用以下命令格式化一個Namenode:
$ $HADOOP_HOME/bin/hdfs namenode -format [-clusterId <cluster_id>]
選擇一個惟一的cluster_id,而且這個cluster_id不能與環境中其餘集羣有衝突。若是沒有提供cluster_id,則會自動生成一個惟一的ClusterID。
b、使用以下命令格式化其餘Namenode:
$ $HADOOP_HOME/bin/hdfs namenode -format -clusterId <cluster_id>
c、升級集羣至最新版本。在升級過程當中須要提供一個ClusterID,例如:
$ $HADOOP_PREFIX_HOME/bin/hdfs start namenode --config $HADOOP_CONF_DIR -upgrade -clusterId <cluster_ID>
若是沒有提供ClusterID,則會自動生成一個ClusterID。
(6)、blockpoolID:是針對每個Namespace所對應的blockpool的ID,上面的這個BP-893790215-192.168.24.72-1383809616115就是在個人ns1(NameNode節點)的namespace下的存儲塊池的ID,這個ID包括了 其對應的NameNode節點的ip地址。
二、 $dfs.namenode.name.dir/current/seen_txid很是重要,是存放transactionId的文件,format以後是0,它表明的是namenode裏面的edits_*文件的尾數,namenode重啓的時候,會按照seen_txid的數字,循序從頭跑edits_0000001~到seen_txid的數字。因此當你的hdfs發生異常重啓的時候,必定要比對seen_txid內的數字是否是你edits最後的尾數,否則會發生建置namenode時metaData的資料有缺乏,致使誤刪Datanode上多餘Block的資訊。
三、 $dfs.namenode.name.dir/current目錄下在format的同時也會生成fsimage和edits文件,及其對應的md5校驗文件。fsimage和edits是Hadoop元數據相關的重要文件,請參考Hadoop文件系統元數據fsimage和編輯日誌edits。
通常來講,map函數輸入的健/值類型(K1和V1)不一樣於輸出類型(K2和V2),雖然reduce函數的輸入類型必須與map函數的輸出類型相同,但reduce函數的輸出類型(K3和V3)能夠不一樣於輸入類型
若是使用combine函數,它與reduce函數的形式相同(它也是Reducer的一個實現),不一樣之處是它的輸出類型是中間的鍵/值對類型(K2和V2),這些中間值能夠輸入到reduce函數:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
partition(K2, V2) → integer //將中間鍵值對分區,返回分區索引號。分區內的鍵會排序,相同的鍵的全部值會合並
reduce: (K2, list(V2)) → list(K3, V3)
上面是map、combine、reduce的輸入輸出格式,如map輸入的是單獨的一對key/value(值也是值);而combine與reduce的輸入也是鍵值對,只不過它們的值不是單值,而是一個列表即多值;它們的輸出都是同樣,鍵值對列表;另外,reduce函數的輸入類型必須與map函數的輸出類型相同,因此都是K2與V2類型
job.setOutputKeyClass和job.setOutputValueClas在默認狀況下是同時設置map階段和reduce階段的輸出(包括Key與Value輸出),也就是說只有map和reduce輸出是同樣的時候纔會這樣設置;當map和reduce輸出類型不同的時候就須要經過job.setMapOutputKeyClass和job.setMapOutputValueClas來單獨對map階段的輸出進行設置,當使用job.setMapOutputKeyClass和job.setMapOutputValueClas後,setOutputKeyClass()與setOutputValueClas()此時則只對reduce輸出設置有效了。
一、新API傾向於使用抽像類,而不是接口,這樣更容易擴展。在舊API中使用Mapper和Reducer接口,而在新API中使用抽像類
二、新API放在org.apache.hadoop.mapreduce包或其子包中,而舊API則是放在org.apache.hadoop.mapred中
三、新API充分使用上下文對象,使用戶很好的與MapReduce交互。如,新的Context基本統一了舊API中的JobConf OutputCollector Reporter的功能,使用一個Context就能夠搞定,易使用
四、新API容許mapper和reducer經過重寫run()方法控制執行流程。如,便可以批處理鍵值對記錄,也能夠在處理完全部的記錄以前中止。這在舊API中能夠經過寫MapRunnable類在mapper中實現上述功能,但在reducer中沒法實現
五、新的API中做業是Job類實現,而非舊API中的JobClient類,新的API中刪除了JobClient類
六、新API實現了配置的統一。舊API中的做業配置是經過JobConf完成的,它是Configuration的子類。在新API中,做業的配置由Configuration,或經過Job類中的一些輔助方法來完成配置
輸出的文件命名方法稍有不一樣。在舊的API中map和reduce的輸出被統一命名爲 part-nnmm,但在新API中map的輸出文件名爲 part-m-nnnnn,而reduce的輸出文件名爲 part-r-nnnnn(nnnnn爲分區號,即該文件存放的是哪一個分區的數據,從0開始)其中part文件名能夠修改
七、
八、新API中的可重寫的用戶方法拋出ava.lang.InterruptedException異常,這意味着能夠使用代碼來實現中斷響應,從而能夠中斷那些長時間運行的做業
九、新API中,reduce()傳遞的值是java.lang.Iterable類型的,而非舊API中使用java.lang.Iterator類型,這就能夠很容易的使用for-each循環結構來迭代這些值:for (VALUEIN value : values) { ... }
存放的本地目錄是能夠經過hdfs-site.xml配置的:
hadoop1:
<property>
<name>dfs.name.dir</name>
<value>${hadoop.tmp.dir}/dfs/name</value>
<description>Determines where on the local filesystem the DFS name node
should store the name table(fsimage). If this is a comma-delimited list
of directories then the name table is replicated in all of the
directories, for redundancy. </description>
</property>
在《Hadoop NameNode元數據相關文件目錄解析》文章中提到NameNode的$dfs.namenode.name.dir/current/文件夾的幾個文件:
current/
|-- VERSION
|-- edits_*
|-- fsimage_0000000000008547077
|-- fsimage_0000000000008547077.md5
`-- seen_txid
其中存在大量的以edits開頭的文件和少許的以fsimage開頭的文件。那麼這兩種文件究竟是什麼,有什麼用?下面對這兩中類型的文件進行詳解。在進入下面的主題以前先來搞清楚edits和fsimage文件的概念:
(1)、fsimage文件實際上是Hadoop文件系統元數據的一個永久性的檢查點,其中包含Hadoop文件系統中的全部目錄和文件idnode的序列化信息;
(2)、edits文件存放的是Hadoop文件系統的全部更新操做的路徑,文件系統客戶端執行的全部寫操做首先會被記錄到edits文件中。
fsimage和edits文件都是通過序列化的,在NameNode啓動的時候,它會將fsimage文件中的內容加載到內存中,以後再執行edits文件中的各項操做,使得內存中的元數據和實際的同步,存在內存中的元數據支持客戶端的讀操做。
NameNode起來以後,HDFS中的更新操做會從新寫到edits文件中,由於fsimage文件通常都很大(GB級別的很常見),若是全部的更新操做都往fsimage文件中添加,這樣會致使系統運行的十分緩慢,可是若是往edits文件裏面寫就不會這樣,每次執行寫操做以後,且在向客戶端發送成功代碼以前,edits文件都須要同步更新。若是一個文件比較大,使得寫操做須要向多臺機器進行操做,只有當全部的寫操做都執行完成以後,寫操做纔會返回成功,這樣的好處是任何的操做都不會由於機器的故障而致使元數據的不一樣步。
fsimage包含Hadoop文件系統中的全部目錄和文件idnode的序列化信息;對於文件來講,包含的信息有修改時間、訪問時間、塊大小和組成一個文件塊信息等;而對於目錄來講,包含的信息主要有修改時間、訪問控制權限等信息。fsimage並不包含DataNode的信息,而是包含DataNode上塊的映射信息,並存放到內存中,當一個新的DataNode加入到集羣中,DataNode都會向NameNode提供塊的信息,而NameNode會按期的「索取」塊的信息,以使得NameNode擁有最新的塊映射。由於fsimage包含Hadoop文件系統中的全部目錄和文件idnode的序列化信息,因此若是fsimage丟失或者損壞了,那麼即便DataNode上有塊的數據,可是咱們沒有文件到塊的映射關係,咱們也沒法用DataNode上的數據!因此按期及時的備份fsimage和edits文件很是重要!
在前面咱們也提到,文件系統客戶端執行的因此寫操做首先會被記錄到edits文件中,那麼長此以往,edits會很是的大,而NameNode在重啓的時候須要執行edits文件中的各項操做,那麼這樣會致使NameNode啓動的時候很是長!在下篇文章中我會談到在Hadoop 1.x版本和Hadoop 2.x版本是怎麼處理edits文件和fsimage文件的。
在NameNode運行期間,HDFS的全部更新操做都是直接寫到edits中,長此以往edits文件將會變得很大;雖然這對NameNode運行時候是沒有什麼影響的,可是咱們知道當NameNode重啓的時候,NameNode先將fsimage裏面的全部內容映像到內存中,而後再一條一條地執行edits中的記錄,當edits文件很是大的時候,會致使NameNode啓動操做很是地慢,而在這段時間內HDFS系統處於安全模式,這顯然不是用戶要求的。能不能在NameNode運行的時候使得edits文件變小一些呢?實際上是能夠的,本文主要是針對Hadoop 1.x版本,說明其是怎麼將edits和fsimage文件合併的,Hadoop 2.x版本edits和fsimage文件合併是不一樣的。
用過Hadoop的用戶應該都知道在Hadoop裏面有個SecondaryNamenode進程,從名字看來你們很容易將它看成NameNode的熱備進程。其實真實的狀況不是這樣的。SecondaryNamenode是HDFS架構中的一個組成部分,它是用來保存namenode中對HDFS metadata的信息的備份,並減小namenode重啓的時間而設定的!通常都是將SecondaryNamenode單獨運行在一臺機器上,那麼SecondaryNamenode是如何減小namenode重啓的時間的呢?來看看SecondaryNamenode的工做狀況:
(1)、SecondaryNamenode會按期的和NameNode通訊,請求其中止使用edits文件,暫時將新的寫操做寫到一個新的文件edit.new上來,這個操做是瞬間完成,上層寫日誌的函數徹底感受不到差異;
(2)、SecondaryNamenode經過HTTP GET方式從NameNode上獲取到fsimage和edits文件,並下載到本地的相應目錄下;
(3)、SecondaryNamenode將下載下來的fsimage載入到內存,而後一條一條地執行edits文件中的各項更新操做,使得內存中的fsimage保存最新;這個過程就是edits和fsimage文件合併;
(4)、SecondaryNamenode執行完(3)操做以後,會經過post方式將新的fsimage文件發送到NameNode節點上
(5)、NameNode將從SecondaryNamenode接收到的新的fsimage替換舊的fsimage文件,同時將edit.new替換edits文件,經過這個過程edits就變小了!整個過程的執行能夠經過下面的圖說明:
在(1)步驟中,咱們談到SecondaryNamenode會按期的和NameNode通訊,這個是須要配置的,能夠經過core-site.xml進行配置,下面是默認的配置:
<property>
<name>fs.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
其實若是當fs.checkpoint.period配置的時間尚未到期,咱們也能夠經過判斷當前的edits大小來觸發一次合併的操做,能夠經過下面配置:
<property>
<name>fs.checkpoint.size</name>
<value>67108864</value>
<description>The size of the current edit log (in bytes) that triggers
a periodic checkpoint even if the fs.checkpoint.period hasn't expired.
</description>
</property>
當edits文件大小超過以上配置,即便fs.checkpoint.period還沒到,也會進行一次合併。順便說說SecondaryNamenode下載下來的fsimage和edits暫時存放的路徑能夠經過下面的屬性進行配置:
<property>
<name>fs.checkpoint.dir</name>
<value>${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
replicated in all of the directories for redundancy.
</description>
</property>
<property>
<name>fs.checkpoint.edits.dir</name>
<value>${fs.checkpoint.dir}</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary edits to merge.
If this is a comma-delimited list of directoires then teh edits is
replicated in all of the directoires for redundancy.
Default value is same as fs.checkpoint.dir
</description>
</property>
從上面的描述咱們能夠看出,SecondaryNamenode根本就不是Namenode的一個熱備,其只是將fsimage和edits合併。其擁有的fsimage不是最新的,由於在他從NameNode下載fsimage和edits文件時候,新的更新操做已經寫到edit.new文件中去了。而這些更新在SecondaryNamenode是沒有同步到的!固然,若是NameNode中的fsimage真的出問題了,仍是能夠用SecondaryNamenode中的fsimage替換一下NameNode上的fsimage,雖然已經不是最新的fsimage,可是咱們能夠將損失減少到最少!
在Hadoop 2.x經過配置JournalNode來實現Hadoop的高可用性,能夠參見《Hadoop2.2.0中HDFS的高可用性實現原理》,這樣主被NameNode上的fsimage和edits都是最新的,任什麼時候候只要有一臺NameNode掛了,也能夠使得集羣中的fsimage是最新狀態!關於Hadoop 2.x是如何合併fsimage和edits的,能夠參考《Hadoop 2.x中fsimage和edits合併實現》
在《Hadoop 1.x中fsimage和edits合併實現》文章中,咱們談到了Hadoop 1.x上的fsimage和edits合併實現,裏面也提到了Hadoop 2.x版本的fsimage和edits合併實現和Hadoop 1.x徹底不同,今天就來談談Hadoop 2.x中fsimage和edits合併的實現。
咱們知道,在Hadoop 2.x中解決了NameNode的單點故障問題;同時SecondaryName已經不用了,而以前的Hadoop 1.x中是經過SecondaryName來合併fsimage和edits以此來減少edits文件的大小,從而減小NameNode重啓的時間。而在Hadoop 2.x中已經不用SecondaryName,那它是怎麼來實現fsimage和edits合併的呢?首先咱們得知道,在Hadoop 2.x中提供了HA機制(解決NameNode單點故障),能夠經過配置奇數個JournalNode來實現HA,如何配置今天就不談了!HA機制經過在同一個集羣中運行兩個NN(active NN & standby NN)來解決NameNode的單點故障,在任什麼時候間,只有一臺機器處於Active狀態;另外一臺機器是處於Standby狀態。Active NN負責集羣中全部客戶端的操做;而Standby NN主要用於備用,它主要維持足夠的狀態,若是必要,能夠提供快速的故障恢復。
爲了讓Standby NN的狀態和Active NN保持同步,即元數據保持一致,它們都將會和JournalNodes守護進程通訊。當Active NN執行任何有關命名空間的修改(如增刪文件),它須要持久化到一半(因爲JournalNode最少爲三臺奇數臺,因此最少要存儲到其中兩臺上)以上的JournalNodes上(經過edits log持久化存儲),而Standby NN負責觀察edits log的變化,它可以讀取從JNs中讀取edits信息,並更新其內部的命名空間。一旦Active NN出現故障,Standby NN將會保證從JNs中讀出了所有的Edits,而後切換成Active狀態。Standby NN讀取所有的edits可確保發生故障轉移以前,是和Active NN擁有徹底同步的命名空間狀態(更多的關於Hadoop 2.x的HA相關知識,能夠參考本博客的《Hadoop2.2.0中HDFS的高可用性實現原理》)。
那麼這種機制是如何實現fsimage和edits的合併?在standby NameNode節點上會一直運行一個叫作CheckpointerThread的線程,這個線程調用StandbyCheckpointer類的doWork()函數,而doWork函數會每隔Math.min(checkpointCheckPeriod, checkpointPeriod)秒來作一次合併操做,相關代碼以下:
try {
Thread.sleep(1000 * checkpointConf.getCheckPeriod());
} catch (InterruptedException ie) {}
publiclong getCheckPeriod() {
return Math.min(checkpointCheckPeriod, checkpointPeriod);
}
checkpointCheckPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT);
checkpointPeriod = conf.getLong(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
上面的checkpointCheckPeriod和checkpointPeriod變量是經過獲取hdfs-site.xml如下兩個屬性的值獲得:
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description>The SecondaryNameNode and CheckpointNode will poll the NameNode
every 'dfs.namenode.checkpoint.check.period' seconds to query the number
of uncheckpointed transactions.
</description>
</property>
當達到下面兩個條件的狀況下,將會執行一次checkpoint:
boolean needCheckpoint = false;
if (uncheckpointed >= checkpointConf.getTxnCount()) {
LOG.info("Triggering checkpoint because there have been " +
uncheckpointed + " txns since the last checkpoint, which " +
"exceeds the configured threshold " +
checkpointConf.getTxnCount());
needCheckpoint = true;
} else if (secsSinceLast >= checkpointConf.getPeriod()) {
LOG.info("Triggering checkpoint because it has been " +
secsSinceLast + " seconds since the last checkpoint, which " +
"exceeds the configured interval " + checkpointConf.getPeriod());
needCheckpoint = true;
}
當上述needCheckpoint被設置成true的時候,StandbyCheckpointer類的doWork()函數將會調用doCheckpoint()函數正式處理checkpoint。當fsimage和edits的合併完成以後,它將會把合併後的fsimage上傳到Active NameNode節點上,Active NameNode節點下載完合併後的fsimage,再將舊的fsimage刪掉(Active NameNode上的)同時清除舊的edits文件。步驟能夠歸類以下:
(1)、配置好HA後,客戶端全部的更新操做將會寫到JournalNodes節點的共享目錄中,能夠經過下面配置
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://XXXX/mycluster</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/export1/hadoop2x/dfs/journal</value>
</property>
(2)、Active Namenode和Standby NameNode從JournalNodes的edits共享目錄中同步edits到本身edits目錄中;
(3)、Standby NameNode中的StandbyCheckpointer類會按期的檢查合併的條件是否成立,若是成立會合並fsimage和edits文件;
(4)、Standby NameNode中的StandbyCheckpointer類合併完以後,將合併以後的fsimage上傳到Active NameNode相應目錄中;
(5)、Active NameNode接到最新的fsimage文件以後,將舊的fsimage和edits文件清理掉;
(6)、經過上面的幾步,fsimage和edits文件就完成了合併,因爲HA機制,會使得Standby NameNode和Active NameNode都擁有最新的fsimage和edits文件(以前Hadoop 1.x的SecondaryNameNode中的fsimage和edits不是最新的)
在Hadoop2.0.0以前,NameNode(NN)在HDFS集羣中存在單點故障(single point of failure),每個集羣中存在一個NameNode,若是NN所在的機器出現了故障,那麼將致使整個集羣沒法利用,直到NN重啓或者在另外一臺主機上啓動NN守護線程。
主要在兩方面影響了HDFS的可用性:
(1)、在不可預測的狀況下,若是NN所在的機器崩潰了,整個集羣將沒法利用,直到NN被從新啓動;
(2)、在可預知的狀況下,好比NN所在的機器硬件或者軟件須要升級,將致使集羣宕機。
HDFS的高可用性將經過在同一個集羣中運行兩個NN(active NN & standby NN)來解決上面兩個問題,這種方案容許在機器破潰或者機器維護快速地啓用一個新的NN來恢復故障。
在典型的HA集羣中,一般有兩臺不一樣的機器充當NN。在任什麼時候間,只有一臺機器處於Active狀態;另外一臺機器是處於Standby狀態。Active NN負責集羣中全部客戶端的操做;而Standby NN主要用於備用,它主要維持足夠的狀態,若是必要,能夠提供快速的故障恢復。
爲了讓Standby NN的狀態和Active NN保持同步,即元數據保持一致,它們都將會和JournalNodes守護進程通訊。當Active NN執行任何有關命名空間的修改,它須要持久化到一半(奇數個,通常爲3,因此須要持久到2臺)以上的JournalNodes上(經過edits log持久化存儲),而Standby NN負責觀察edits log的變化,它可以讀取從JNs中讀取edits信息,並更新其內部的命名空間。一旦Active NN出現故障,Standby NN將會保證從JNs中讀出了所有的Edits,而後切換成Active狀態。Standby NN讀取所有的edits可確保發生故障轉移以前,是和Active NN擁有徹底同步的命名空間狀態。
爲了提供快速的故障恢復,Standby NN也須要保存集羣中各個文件塊的存儲位置。爲了實現這個,集羣中全部的DataNode將配置好Active NN和Standby NN的位置,並向它們發送塊文件所在的位置及心跳,以下圖所示:
在任什麼時候候,集羣中只有一個NN處於Active 狀態是極其重要的。不然,在兩個Active NN的狀態下NameSpace狀態將會出現分歧,這將會致使數據的丟失及其它不正確的結果。爲了保證這種狀況不會發生,在任什麼時候間,JNs只容許一個NN充當writer。在故障恢復期間,將要變成Active 狀態的NN將取得writer的角色,並阻止另一個NN繼續處於Active狀態。
爲了部署HA集羣,你須要準備如下事項:
(1)、NameNode machines:運行Active NN和Standby NN的機器須要相同的硬件配置;
(2)、JournalNode machines:也就是運行JN的機器。JN守護進程相對來講比較輕量,因此這些守護進程能夠與其餘守護線程(好比NN,YARN ResourceManager)運行在同一臺機器上。在一個集羣中,最少要運行3個JN守護進程,這將使得系統有必定的容錯能力。固然,你也能夠運行3個以上的JN,可是爲了增長系統的容錯能力,你應該運行奇數個JN(3、5、7等),當運行N個JN,系統將最多容忍(N-1)/2個JN崩潰。
在HA集羣中,Standby NN也執行namespace狀態的checkpoints,因此沒必要要運行Secondary NN、CheckpointNode和BackupNode;事實上,運行這些守護進程是錯誤的。
hadoop fs –help
% hadoop fs -copyFromLocal /input/docs/quangle.txt quangle.txt 將本地文件複製到HDFS中,目的地爲相對地址,相對的是HDFS上的/user/root目錄,root爲用戶,不一樣的用戶執行,則不一樣
hadoop fs -copyToLocal quangle.txt quangle.copy.txt 從HDFS中下載文件到本地
% hadoop fs -mkdir books
% hadoop fs -ls .
Found 2 items
drwxr-xr-x - tom supergroup 0 2009-04-02 22:41 /user/tom/books
-rw-r--r-- 1 tom supergroup 118 2009-04-02 22:29 /user/tom/quangle.txt
第一列爲文件模式,第二列文件的副本數,若是目錄則沒有;第3、四列表示文件的所屬用戶和用戶組;第五列爲文件大小,目錄沒有。
一個文件的執行權限X沒有什麼意義,由於你不可能在HDFS系統時執行一個文件,但它對目錄是有用的,由於在訪問一個目錄的子項時是不須要此種權限的
處理超大的文件:一個文件能夠是幾百M,甚至是TB級文件
流式數據訪問:一次寫入,屢次讀取
廉價的機器
不適用於低延遲數據訪問,若是須要能夠使用Hbase
不適用於大量的小文件,由於每一個文件存儲在DFS中時,都會在NameNode上保存文件的元數據信息,這會會急速加太NameNode節點的內存佔用,目前每一個文件的元數據信息大概佔用150字節,若是有一百萬個文件,則要佔用300MB的內存
不支持併發寫,而且也不支持文件任意位置的寫入,只能一個用戶寫在文件最末
默認塊大小爲64M,若是某個文件不足64,則不會佔64,而是文件自己文件大小,這與操做系統文件最小存儲單元塊不一樣
分塊存儲的好處:
一個文件的大小能夠大於網絡中的任意一個硬盤的容量,若是不分塊,則不能存儲在硬盤中,當分塊後,就能夠將這個大文件分塊存儲到集羣中的不一樣硬盤中
分塊後適合多副本數據備份,保證了數據的安全
能夠經過如下命令查看塊信息:
hadoop fsck / -files -blocks
一個namenode(管理者),多個datanode(工做者,存放數據)
namenode管理文件系統的命名空間,它維護着文件系統樹及整個樹裏的文件和目錄,這些系統以兩個文件持久化硬盤上永久保存着:命名空間鏡像文件fsimage和編輯日誌文件edits
namenode記錄了每一個文件中各個塊所在的datanode信息,但它並不將這些位置信息持久化硬盤上,由於這些信息會在系統啓動時由datanode上報給namenode節點後重建
若是在沒有任何備份,namenode若是壞了,則整個文件系統將沒法使用,因此就有了secondnamenode輔助節點:
secondnamenode除了備份namenode上的元數據持久信息外,最主要的做用是按期的將namenode上的fsimage、edits兩個文件拷貝過來進行合併後,將傳回給namenode。secondnamenode的備份並不是namenode上全部信息的徹底備份,它所保存的信息就是滯後於namenode的,因此namenode壞掉後,儘管能夠手動從secondnamenode恢復,但也不免丟失部分數據(最近一次合併到當前時間內所作的數據操做)
因爲1.X只能有一個namenode,隨着文件愈來愈多,namenode的內存就會受到限制,到某個時候確定是存放不了更多的文件了(雖然datanode能夠加入新的datanode能夠解決存儲容量問題),不能夠無限在一臺機器上加內存。在2.X版本中,引入了聯邦HDFS容許系統經過添加namenode進行擴展,這樣每一個namenode管理着文件系統命名空間的一部分元數據信息
聯邦HDFS只解決了內存數據擴展的問題,但並無解決namenode單節點問題,即當某個namenode壞掉所,因爲namenode沒有備用,因此一旦毀壞後仍是會致使文件系統沒法使用。
HDFS高可用性包括:水平擴展namenode以實現內存擴展、高安全(壞掉還有其餘備用的節點)及熱切換(壞掉後無需手動切換到備用節點)到備用機
在2.x中增長了高可用性支持,經過活動、備用兩臺namenode來實現,當活動namenode失效後,備用namenode就會接管安的任務並開始服務於來自客戶端的請求,不會有任何明顯中斷服務,這須要架構以下:
n Namenode之間(活動的與備用的兩個節點)之間須要經過共享存儲編輯日誌文件edits,即這edits文件放在一個兩臺機器都能訪問獲得的地方存儲,當活動的namenode毀壞後,備用namenode自動切換爲活動時,備用機將edits文件恢復備用機內存
n Datanode須要現時向兩個namenode發送數據塊處理報告
n 客戶端不能直接訪問某個namenode了,由於一旦某個出問題後,就須要經過另外一備用節點來訪問,這須要用戶對namenode訪問是透明的,不能直接訪問namenode,而是經過管理這些namenode集羣入口地址透明訪問
在活動namenode失效後,備用namenode可以快速(幾十秒的時間)實現任務接管,由於最新的狀態存儲在內存中:包括最新的編輯日誌和最新的數據塊映射信息
備用切換是經過failover_controller故障轉移控制器來完成的,故障轉移控制器是基於ZooKeeper實現的;每一個namenode節點上都運行着一個輕量級的故障轉移控制器,它的工做就是監視宿主namenode是否失效(經過一個簡單的心跳機制實現)並在namenode失效時進行故障切換;用戶也能夠在namenode沒有失效的狀況下手動發起切換,例如在進行平常維護時;另外,有時沒法確切知道失效的namenode是否已經中止運行,例如在網絡異常狀況下,一樣也可能激發故障轉換,但先前的活動着的namenode依然運行着而且依舊是活動的namenode,這會出現其餘問題,但高可用實現作了「規避」措施,如殺死行前的namenode進程,收回訪問共享存儲目錄的權限等
僞分佈式: fs.default.name=hdfs://localhost:8020;dfs.replication=1,若是設置爲3,將會持續收到塊副本不中的警告,設置這個屬性後就不會再有問題了
Hadoop自己是由Java編寫的
org.apache.hadoop.fs.FileSystem是文件系統的抽象類,常見有如下實現類:
文件系統 |
URI scheme |
Java實現類 |
描述 |
Local |
file |
org.apache.hadoop.fs.LocalFileSystem |
使用了客戶端校驗和的本地文件系統(未使用校驗和的本地文件系統請使用RawLocalFileSystem) |
HDFS |
hdfs |
org.apache.hadoop.hdfs.DistributedFileSystem |
Hadoop分佈式文件系統 |
HFTP |
hftp |
org.apache.hadoop.hdfs.HftpFileSystem |
經過Http對Hdfs進行只讀訪問的文件系統,用於實現不一樣版本HDFS集羣間的數據複製 |
HSFTP |
hsftp |
org.apache.hadoop.hdfs.HsftpFileSystem |
同上,只是https協議 |
|
|
org.apache.hadoop. |
|
獲取FileSystem實例有如下幾個靜態方法:
publicstatic FileSystem get(Configuration conf) throws IOException//獲取core-sit.xml中fs.default.name配置屬性所配置的URI來返回相應的文件系統,因爲core-sit.xml已配置,因此通常調用這個方法便可
publicstatic FileSystem get(URI uri, Configuration conf) throws IOException//根據uri參數裏提供scheme信息返回相應的文件系統,即hdfs://hadoop-master:9000,則返回的是hdfs文件系統
publicstatic FileSystem get(URI uri, Configuration conf, String user) throws IOException
有了FileSystem後,就能夠調用open()方法獲取文件輸入流:
public FSDataInputStream open(Path f) throws IOException //默認緩衝4K
publicabstract FSDataInputStream open(Path f, int bufferSize) throws IOException
示例:將hdfs文件系統中的文件內容在標準輸出顯示
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
publicclass FileSystemCat {
publicstaticvoid main(String[] args) throws Exception {
// 若是爲默認端口8020,則能夠省略端口
String uri = "hdfs://hadoop-master:9000/wordcount/input/wordcount.txt";
Configuration conf = new Configuration();
// FileSystem fs = FileSystem.get(URI.create(uri), conf);
// 由於get方法的URI參數只須要URI scheme,因此只需指定服務地址便可,無需同具體到某個文件
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
//或者這樣使用
// conf.set("fs.default.name", "hdfs://hadoop-master:9000");
// FileSystem fs = FileSystem.get(conf);
InputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false); //無需使用循環對流進行拷貝,藉助於工具類IOUtils便可
} finally {
IOUtils.closeStream(in);//不直接調用輸入輸出流的close方法,而是使用IOUtils工具類
}
}
}
實際上,FileSystem的open方法返回的是FSDataInputStream類型的對象,而非Java標準輸入流,這個類繼承了標準輸入流DataInputStream:
publicclassFSDataInputStreamextends DataInputStream
implements Seekable, PositionedReadable, Closeable, HasFileDescriptor {
而且FSDataInputStream類實現了Seekable接口,支持隨機訪問,所以能夠從流的任意位置讀取數據。
Seekable接口支持在文件中找到指定的位置,並提供了一個查詢當前位置至關於文件起始位置偏移量的方法getPos():
publicinterfaceSeekable {
// 定位到文件指定的位置,與標準輸入流的InputStream.skip不一樣的是,seek能夠定位到文件中的任意絕對位置,而
// skip只能相對於當前位置才能定位到新的位置。這裏會傳遞的是相對於文件開頭的絕對位置,不能超過文件長度。注:seek開銷很高,謹慎調用
void seek(long pos) throws IOException;
// 返回當前相對於文件開頭的偏移量
long getPos() throws IOException;
boolean seekToNewSource(long targetPos) throws IOException;
}
示例:改寫上面實例,讓它輸出兩次
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
publicclass FileSystemCat {
publicstaticvoid main(String[] args) throws Exception {
String uri = "hdfs://hadoop-master:9000/wordcount/input/wordcount.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
System.out.println("\n");
in.seek(0);//跳到文件的開頭
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
}
FSDataInputStream類還實現了PositionedReadable接口,這能夠從一個指定的偏移量處讀取文件的一部分:
publicinterfacePositionedReadable {
// 從文件指定的position處讀取最多length字節的數據並存入緩衝區buffer的指定偏移量offset處,返回的值是
// 實際讀取到的字節數:調用者須要檢查這個值,有可能小於參數length
publicint read(long position, byte[] buffer, int offset, int length) throws IOException;
// 與上面方法至關,只是若是讀取到文件最末時,被讀取的字節數可能不滿length,此時則會拋異常
publicvoid readFully(long position, byte[] buffer, int offset, int length) throws IOException;
// 與上面方法至關,只是每次讀取的字節數爲buffer.length
publicvoid readFully(long position, byte[] buffer) throws IOException;
}
注:上面這些方法都不會修改當前所在文件偏移量
FileSystem類有一系列參數不一樣的create建立文件方法,最簡單的方法:
publicFSDataOutputStreamcreate(Path f) throws IOException {
還有一系列不一樣參數的重載方法,他們最終都是調用下面這個抽象方法實現的:
publicabstract FSDataOutputStream create(Path f,
FsPermission permission, //權限
boolean overwrite, //若是文件存在,傳false時會拋異常,不然覆蓋已存在的文件
int bufferSize, //緩衝區的大小
short replication, //副本數量
long blockSize, //塊大小
Progressable progress) throws IOException; //處理進度的回調接口
通常調用簡單方法時,若是文件存在,則是會覆蓋,若是不想覆蓋,能夠指定overwrite參數爲false,或者使用FileSystem類的exists(Path f)方法進行判斷:
publicbooleanexists(Path f) throws IOException {//能夠用來測試文件或文件夾是否存在
進度回調接口,當數據每次寫完緩衝數據後,就會回調該接口顯示進度信息:
package org.apache.hadoop.util;
publicinterface Progressable {
publicvoid progress();//返回處理進度給Hadoop應用框架
}
另外一種新建文件的方法是使用append方法在一個已有文件末尾追加數據(該方法也有一些重載版本):
public FSDataOutputStream append(Path f) throws IOException {
示例:帶進度的文件上傳
publicclass FileCopyWithProgress {
publicstaticvoid main(String[] args) throws Exception {
InputStream in = new BufferedInputStream(new FileInputStream("d://1901.all"));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
OutputStream out = fs.create(new Path("hdfs://hadoop-master:9000/ncdc/all/1901.all"), new Progressable() {
publicvoid progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
}
}
像FSDataInputStream 同樣,FSDataOutputStream類也有一個getPos方法,用來查詢當前位置,但與FSDataInputStream不一樣的是,不容許在文件中定位,這是由於HDFS只容許對一個已打開的文件順序寫入,或在現有文件的末尾追加數據,安不支持在除文件末尾以外的其餘位置進行寫入,因此就沒有seek定位方法了
publicvoid copyFromLocalFile(Path src, Path dst)
publicvoid copyFromLocalFile(boolean delSrc, Path src, Path dst)
publicvoid copyFromLocalFile(boolean delSrc, boolean overwrite,Path[] srcs, Path dst)
publicvoid copyFromLocalFile(boolean delSrc, boolean overwrite,Path src, Path dst)
delSrc - whether to delete the src是否刪除源文件
overwrite - whether to overwrite an existing file是否覆蓋已存在的文件
srcs - array of paths which are source 能夠上傳多個文件數組方式
dst – path 目標路徑,若是存在,且都是目錄的話,會將文件存入它下面,而且上傳文件名不變;若是不存在,則會建立並認爲它是文件,即上傳的文件名最終會成爲dst指定的文件名
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop-master:9000");
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path("c:/t_stud.txt"), new Path("hdfs://hadoop-master:9000/db1/output1"));
fileSystem.rename(src, dst);
形爲重命名,實際上該方法還能夠移動文件,與上傳目的地dst參數同樣:若是dst爲存在的目錄,則會放在它下面;若是不存在,則會建立並認爲它是文件,即上傳的文件名最終會成爲dst指定的文件名
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop-master:9000");
FileSystem fs = FileSystem.get(conf);
fs.rename(new Path("hdfs://hadoop-master:9000/db1/output2"), new Path("hdfs://hadoop-master:9000/db3/output2"));
FileSystem的delete()方法能夠用來刪除文件或目錄
publicabstractboolean delete(Path f, boolean recursive) throws IOException;
若是f是一個文件或空目錄,那麼recursive的值就會被忽略。只有在recursive值爲true時,非空目錄及其內容纔會被刪除(若是刪除非空目錄時recursive爲false,則會拋IOException異常?)
FileSystem提供了建立目錄的方法:
publicbooleanmkdirs(Path f) throws IOException {
若是父目錄不存在,則也會自動建立,並返回是否成功
一般狀況下,咱們不須要調用這個方法建立目錄,由於調用create方法建立文件時,若是父目錄不存在,則會自動建立
FileStatus類封裝了文件系統中的文件和目錄的元數據,包括文件長度、大小、副本數、修改時間、全部者、權限等
FileSystem的getFileStatus方法能夠獲取FileStatus對象:
publicabstract FileStatus getFileStatus(Path f) throws IOException;
示例:獲取文件(夾)狀態信息
publicclass ShowFileStatus {
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
//out.close();
IOUtils.closeStream(out);
// 文件的狀態信息
Path file = new Path("/dir/file");
FileStatus stat = fs.getFileStatus(file);
System.out.println(stat.getPath().toUri().getPath());
System.out.println(stat.isDir());//是否文件夾
System.out.println(stat.getLen());//文件大小
System.out.println(stat.getModificationTime());//文件修改時間
System.out.println(stat.getReplication());//副本數
System.out.println(stat.getBlockSize());//文件系統所使用的塊大小
System.out.println(stat.getOwner());//文件全部者
System.out.println(stat.getGroup());//文件全部者所在組
System.out.println(stat.getPermission().toString());//文件權限
System.out.println();
// 目錄的狀態信息
Path dir = new Path("/dir");
stat = fs.getFileStatus(dir);
System.out.println(stat.getPath().toUri().getPath());
System.out.println(stat.isDir());
System.out.println(stat.getLen());//文件夾爲0
System.out.println(stat.getModificationTime());
System.out.println(stat.getReplication());//文件夾爲0
System.out.println(stat.getBlockSize());//文件夾爲0
System.out.println(stat.getOwner());
System.out.println(stat.getGroup());
System.out.println(stat.getPermission().toString());
}
}
除了上面FileSystem的getFileStatus一次只能獲取一個文件或目錄的狀態信息外,FileSystem還能夠一次獲取多個文件的FileStatus或目錄下的全部文件的FileStatus,這能夠調用FileSystem的listStatus方法,該方法有如下重載版本:
publicabstract FileStatus[] listStatus(Path f) throws IOException;
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
public FileStatus[] listStatus(Path[] files) throws IOException {
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
當傳入的參數是一個文件時,它會簡單轉成以數組方式返回長度爲1的FileStatus對象。當傳入的是一個目錄時,則返回0或多個FileStatus對象,包括此目錄中包括的全部文件和目錄
listStatus方法能夠列出目錄下全部文件的文件狀態,因此就能夠藉助於這個特色列出某個目錄下的全部文件(包括子目錄):
publicclass ListStatus {
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
Path[] paths = new Path[2];
// 目錄
paths[0] = new Path("hdfs://hadoop-master:9000/ncdc");
// 文件
paths[1] = new Path("hdfs://hadoop-master:9000/wordcount/input/wordcount.txt");
// 只傳一個目錄進去。注:listStatus方法只會將直接子目錄或子文件列出來,
// 而不會遞歸將全部層級子目錄文件列出
FileStatus[] status = fs.listStatus(paths[0]);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
// 輸出輸入目錄下的全部文件及目錄的路徑
System.out.println(p);
}
System.out.println();
// 只傳一個文件進去
status = fs.listStatus(paths[1]);
listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
// 輸出輸入文件的路徑
System.out.println(p);
}
System.out.println();
//傳入的爲一個數組:包括文件與目錄
status = fs.listStatus(paths);
// 將FileStatus數組轉換爲Path數組
listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
// 輸出全部輸入的文件的路徑,以及輸入目錄下全部文件或子目錄的路徑
System.out.println(p);
}
}
}
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoop-master:9000");
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem) fs;
DatanodeInfo[] dns = hdfs.getDataNodeStats();
for (int i = 0, h = dns.length; i < h; i++) {
System.out.println("datanode_" + i + "_name: " + dns[i].getHostName());
}
經過DatanodeInfo能夠得到datanode更多的消息
FileSystem提供了兩個通配的方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException {
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
pathPattern參數是通配,filter是進一步驟過濾
注:根據通配表達式,匹配到的多是目錄,也多是文件,這要看通配表達式是隻到目錄,仍是到文件。具體示例請參考下面的PathFilter
有時通配模式並不總能多精確匹配到咱們想要的文件,此時此要使用PathFilter參數進行過濾。FileSystem的listStatus() 和 globStatus()方法就提供了此過濾參數
publicinterfacePathFilter {
boolean accept(Path path);
}
示例:排除匹配指定正則表達式的路徑
publicclass RegexExcludePathFilter implements PathFilter {
privatefinal String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
publicboolean accept(Path path) {
return !path.toString().matches(regex);
}
}
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
FileStatus[] status = fs.globStatus(new Path("/2007/*/*"));//匹配到文件夾
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
FileStatus[] status = fs.globStatus(new Path("/2007/*/*/*30.txt"));//匹配到文件
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
publicstaticvoid main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
FileStatus[] status = fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter(
"^.*/2007/12/31$"));//過濾掉31號的目錄
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
一、 客戶端調用DistributedFileSystem(在編程時咱們通常直接調用的是其抽像父類FileSystem的open方法,對於HDFS文件系統來講,實質上調用的仍是DistributedFileSystem的open方法)的open()方法來打開要讀取的文件,並返回FSDataInputStream類對象(該類實質上是對DFSInputStream的封裝,由它來處理與datanode和namenode的通訊,管理I/O)
二、 DistributedFileSystem經過使用RPC來調用namenode,查看文件在哪些datanode上,並返回這些datanode的地址(注:因爲同一文件塊副本的存放在不少不一樣的datanode節點上,返回的都是網絡拓撲中距離客戶端最近的datanode節點地址,距離算法請參考後面)
三、 客戶端調用FSDataInputStream對象的read()方法
四、 FSDataInputStream去相應datanode上讀取第一個數據塊(這一過程並不須要namenode的參與,該過程是客戶端直接訪問datanote)
五、 FSDataInputStream去相應datanode上讀取第二個數據塊…如此讀完全部數據塊(注:數據塊讀取應該是同時併發讀取,即在讀取第一塊時,也同時在讀取第二塊,只是在拼接文件時須要按塊順序組織成文件)
六、 客戶端調用FSDataInputStream的close()方法關閉文件輸入流
假設有數據中心d1機架r1中的n1節點表示爲 /d1/r1/n1。
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)同一節點
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)同一機架上不一樣節點
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)同一數據中心不一樣機架上不一樣節點
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)不一樣數據中心
哪些節點是哪些機架上是經過配置實現的,具體請參考後面的章節
一、 客戶端調用DistributedFileSystem的create()建立文件,並向客戶端返回FSDataOutputStream類對象(該類實質上是對DFSOutputStream的封裝,由它來處理與datanode和namenode的通訊,管理I/O)
二、 DistributedFileSystem向namenode發出建立文件的RPC調用請求,namenode會告訴客戶端該文件會寫到哪些datanode上
三、 客戶端調用FSDataOutputStream的write方法寫入數據
四、 FSDataOutputStream向datanode寫數據
五、 當數據塊寫完(要達到dfs.replication.min副本數)後,會返回確認寫完的信息給FSDataOutputStream。在返回寫完信息的後,後臺系統還要拷貝數據副本要求達到dfs.replication設置的副本數,這一過程是在後臺自動異步複製完成的,並不須要等全部副本都拷貝完成後才返回確認信息到FSDataOutputStream
六、 客戶端調用FSDataOutputStream的close方法關閉流
七、 DistributedFileSystem發送文件寫入完成的信息給namenode
數據存儲在哪些datanode上,這是有默認佈局策略的:
在客戶端運行的datanode節點上放第一個副本(若是客戶端是在集羣外的機器上運行的話,會隨機選擇一個空閒的機器),第二個副本則放在與第一個副本不在同一機架的節點上,第三個副本則放在與第二個節點同一機架上的不一樣節點上,超過3個副本的,後繼會隨機選擇一臺空閒機器放後繼其餘副本。這樣作的目的兼顧了安全與效率問題
當新建一個文件後,在文件系統命名空間當即可見,但數據不必定能當即可見,即便數據流已刷新:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
當寫入數據超過一個塊後,第一個數據塊對新的reader就是可見的,以後的塊也是同樣,當後面的塊寫入後,前面的塊才能可見。總之,當前正在寫入的塊對其餘reader是不可見的
FSDataOutputStream提供了一個方法sync()來使全部緩存與數據節點強行同步,當sync()方法調用成功後,對全部新的reader而言均可見:
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
注:若是調用了FSDataOutputStream的close()方法,該方法也會調用sync()
文件壓縮有兩大好處:減小存儲文件所須要的磁盤空間,並加速數據在網絡和磁盤上的傳輸
全部的壓縮算法要權衡時間與空間,壓縮時間越短,壓縮率超低,壓縮時間越長,壓縮率超高。上表時每一個工具都有9個不一樣的壓縮級別:-1爲優化壓縮速度,-9爲優化壓縮空間。以下面命令經過最快的壓縮方法建立一個名爲file.gz的壓縮文件:
gzip -1 file
不一樣壓縮工具備不一樣的壓縮特性。gzip是一個通用的壓縮工具,在空間與時間比較均衡。bzip2壓縮能力強於gzip,但速度會慢一些。另外,LZO、LZ4和Snappy都優化了壓縮速度,比gzip快一個數量級,但壓縮率會差一些(LZ4和Snappy的解壓速度比LZO高不少)
上表中的「是否可切分」表示數據流是否能夠搜索定位(seek)。
上面這些算法類都實現了CompressionCodec接口。
CompressionCodec接口包含兩個方法,能夠用於壓縮和解壓。若是要對數據流進行壓縮,能夠調用createOutputStream(OutputStream out)方法獲得CompressionOutputStream輸出流;若是要對數據流進行解壓,能夠調用createInputStream(InputStream in)方法獲得CompressionInputStream輸入流
CompressionOutputStream與CompressionInputStream相似java.util.zip.DeflaterOutputStream和java.util.zip.DeflaterInputStream,只不過前二者可以重置其底層的壓縮與解壓算法
示例:壓縮從標準輸入讀取的數據,而後將其寫到標準輸出
publicstaticvoid main(String[] args) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream("測試".getBytes("GBK"));
Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);// 壓縮流,構造時會輸出三字節的頭信息:31-117 8
//1F=16+15=31;負數是以補碼形勢存儲的,8B的二進制爲10001011,先減一獲得10001010,再除符號位各們取反獲得原碼11110101,即獲得 -117
System.out.println();
IOUtils.copyBytes(bais, out, 4096, false);// 將壓縮流輸出到標準輸出
out.finish();
System.out.println();
bais = new ByteArrayInputStream("測試".getBytes("GBK"));
ByteArrayOutputStream baos = new ByteArrayOutputStream(4);
out = codec.createOutputStream(baos);
IOUtils.copyBytes(bais, out, 4096, false);// 將壓縮流輸出到緩衝
out.finish();
bais = new ByteArrayInputStream(baos.toByteArray());
CompressionInputStream in = codec.createInputStream(bais);// 解壓縮流
IOUtils.copyBytes(in, System.out, 4096, false);// 將壓縮流輸出到標準輸出
// ---------將壓縮文件上傳到Hadoop中
// 注:hadoop默認使用的是UTF-8編碼,若是使用GBK上傳,使用 hadoop fs -text /gzip_test 命令
// 在Hadoop系統中查看時顯示不出來,但Down下來後能夠
bais = new ByteArrayInputStream("測試".getBytes("UTF-8"));
FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop-master:9000"), conf);
out = codec.createOutputStream(fs.create(new Path("/gzip_test.gz")));
IOUtils.copyBytes(bais, out, 4096);
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeStream(fsout);
}
在讀取一個壓縮文件時,能夠經過文件擴展名推斷須要使用哪一個codec,如以.gz結尾,則使用GzipCodec來讀取。能夠經過調用CompressionCodecFactory的getCodec()方法根據擴展名來獲得一個CompressionCodec
示例:根據文件擴展名自動選取codec解壓文件
publicclass FileDecompressor {
publicstaticvoid main(String[] args) throws Exception {
String uri = "hdfs://hadoop-master:9000/gzip_test.gz";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// 根據文件的擴展名自動找到對應的codec
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
// 將解壓出的文件放在hdoop上的同一目錄下
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
CompressionCodecFactory從io.compression.codecs(core-site.xml配置文件裏)配置屬性裏定義的列表中找到codec:
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
<description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>
運行上面示例時,會報如下警告:
WARN [main] org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library not loaded
hdfs://hadoop-master:9000/gzip_test
WARN [main] org.apache.hadoop.io.compress.zlib.ZlibFactory - Failed to load/initialize native-zlib library
這是由於程序是在Windows上運行的,在本地沒有搜索到native類庫,而使用Java實現來進行壓縮與解壓。若是將程序打包上傳到Linux上運行時,第二個警告會消失:
[root@hadoop-master /root/tmp]# hadoop jar /root/tmp/FileDecompressor.jar
16/04/26 11:13:31 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/04/26 11:13:31 WARN snappy.LoadSnappy: Snappy native library not loaded
16/04/26 11:13:31 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
但第一個警告仍是有,緣由是Linux系統上沒有安裝snappy,下面安裝:
1、安裝snappy
yum install snappy snappy-devel
2、使得Snappy類庫對Hadoop可用
ln -sf /usr/lib64/libsnappy.so /root/hadoop-1.2.1/lib/native/Linux-amd64-64
再次運行:
[root@hadoop-master /root/hadoop-1.2.1/lib/native/Linux-amd64-64]# hadoop jar /root/tmp/FileDecompressor.jar
16/04/26 11:42:19 WARN snappy.LoadSnappy: Snappy native library is available
16/04/26 11:42:19 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/04/26 11:42:19 INFO snappy.LoadSnappy: Snappy native library loaded
16/04/26 11:42:19 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
與內置的Java實現相比,原生的gzip類庫能夠減小約束一半的解壓時間與約10%的壓縮時間,下表列出了哪些算法有Java實現,哪些有本地實現:
默認狀況下,Hadoop會根據自身運行的平臺搜索原生代碼庫,若是找到則自加載,因此無需爲了使用原生代碼庫而修改任何設置,可是,若是不想使用原生類型,則能夠修改hadoop.native.lib配置屬性(core-site.xml)爲false:
<property>
<name>hadoop.native.lib</name>
<value>false</value>
<description>Should native hadoop libraries, if present, be used.</description>
</property>
如何使用的是代碼庫,而且須要在應用中執行大量壓縮與解壓操做,能夠考慮使用CodecPool,它支持反覆使用壓縮祕解壓,減小建立對應的開銷
publicstaticvoid main(String[] args) throws Exception {
//注:這裏使用GBK,若是使用UTF-8,則輸出到標準時會亂碼,緣由操做系統標準輸出爲GBK解碼
ByteArrayInputStream bais = new ByteArrayInputStream("測試".getBytes("GBK"));
ByteArrayOutputStream bois = new ByteArrayOutputStream();
Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = null;
CompressionInputStream in = null;
Compressor cmpressor = null;// 壓縮實例
Decompressor decompressor = null;// 解壓實例
try {
// 從池中獲取或新建一個Compressor壓縮實例
cmpressor = CodecPool.getCompressor(codec);
// 從池中獲取或新建一個Compressor解壓縮實例
decompressor = CodecPool.getDecompressor(codec);
out = codec.createOutputStream(bois, cmpressor);
System.out.println();
IOUtils.copyBytes(bais, out, 4096, false);// 將壓縮流輸出到緩衝
out.finish();
bais = new ByteArrayInputStream(bois.toByteArray());
in = codec.createInputStream(bais, decompressor);// 解壓壓縮流
IOUtils.copyBytes(in, System.out, 4096, false);// 解壓後標準輸出
} finally {
IOUtils.closeStream(out);
CodecPool.returnCompressor(cmpressor);// 用完以後返回池中
CodecPool.returnDecompressor(decompressor);
}
}
若是壓縮數據超過塊大小後,會被分紅多塊,若是每一個片段數據單獨做傳遞給不一樣的Map任務,因爲gzip數據是不能單獨片段進行解壓的,因此會出問題。但實際上Mapreduce任務仍是能夠處理gzip文件的,只是若是發現(根據擴展名)是gz,就不會進行文件任務切分(其餘算法也同樣,只要不支持單獨片段解壓的,都會交給同一Map進行處理),而將這個文件塊都交個同一個Map任務進行處理,這樣會影響性能問題。
只有bzip2壓縮格式的文件支持數據任務的切分,哪些壓縮能切分請參考這裏
要想壓縮mapreduce做業的輸出(即這裏講的是對reduce輸出壓縮),應該在mapred-site.xml配置文件的配置項mapred.output.compress設置爲true,mapred.output.compression.code設置爲要使用的壓縮算法:
<property>
<name>mapred.output.compress</name>
<value>false</value>
<description>Should the job outputs be compressed?
</description>
</property>
<property>
<name>mapred.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.DefaultCodec</value>
<description>If the job outputs are compressed, how should they be compressed?
</description>
</property>
也能夠直接在做業啓動程序裏經過FileOutputFormat進行設置:
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "MaxTemperatureWithCompression");
job.setJarByClass(MaxTemperatureWithCompression.class);
//map的輸入能夠是壓縮格式的,也可直接是未壓縮的文本文件,輸入map前會自動根據文件後綴來判斷是否須要解壓,不須要特殊處理或配置
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop-master:9000/ncdc/1901_1902.txt.gz"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://hadoop-master:9000/ncdc/MaxTemperatureWithCompression"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//與mapred-site.xml配置文件裏的mapred.output.compress配置屬性等效:job輸出是否壓縮,即對reduce輸出是否採用壓縮
FileOutputFormat.setCompressOutput(job, true);
//與mapred-site.xml配置文件裏的mapred.output.compression.codec配置屬性等效
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
若是Job輸出生成的是順序文件(sequence file),則能夠設置mapred.output.compression.type(mapred-site.xml)來控制限制使用壓縮格式,默認值爲RECORD,表示針對每一條記錄進行壓縮。若是將其必爲BLOCK,將針對一組記錄進行壓縮,這也是推薦的壓縮策略,由於它的壓縮效率更高
<property>
<name>mapred.output.compression.type</name>
<value>RECORD</value>
<description>If the job outputs are to compressed as SequenceFiles, how should
they be compressed? Should be one of NONE, RECORD or BLOCK.
</description>
</property>
該屬性還能夠直接在JOB啓動任務程序裏經過SequenceFileOutputFormat的setOutputCompressionType()來設定
mapred-site.xml配置文件裏能夠對Job做業輸出壓縮進行配置的三個配置項:
若是對map階段的中間輸出進行壓縮,能夠得到很多好處。因爲map任務的輸出須要寫到磁盤並經過網絡傳輸到reducer節點,因此若是使用LZO、LZ4或者Snappy這樣的快速壓縮方式,是能夠得到性能提高的,由於要傳輸的數據減小了。
啓用map任務輸出壓縮和設置壓縮格式的三個配置屬性以下(mapred-site.xml):
也可在程序裏設定(新的API設置方式):
Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class,
CompressionCodec.class);
Job job = new Job(conf);
舊API設置方式,經過conf對象的方法設置:
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
publicinterface Writable {
void write(DataOutput out) throws IOException;//序列化:即將實例寫入到out輸出流中
void readFields(DataInput in) throws IOException;//反序列化:即從in輸出流中讀取實例
}
Hadoop中可序列化的類都實現了Writable這個接口,好比數據類型類BooleanWritable、ByteWritable、DoubleWritable、FloatWritable、IntWritable、LongWritable、Text
publicstaticvoid main(String[] args) throws IOException {
IntWritable iw = new IntWritable(163);
// 序列化
byte[] bytes = serialize(iw);
// Java裏整型佔兩個字節
System.out.println(StringUtils.byteToHexString(bytes).equals("000000a3"));//true
// 反序列化
IntWritable niw = new IntWritable();
deserialize(niw, bytes);
System.out.println(niw.get() == 163);//true
}
// 序列化
publicstaticbyte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);//最終仍是藉助於Java API中的ByteArrayOutputStream 與 DataOutputStream 來完成序列化:即將基本類型的值(這裏爲整數)轉換爲二進制的過程
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
// 反序列化
publicstaticvoid deserialize(Writable writable, byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputStream dataIn = new DataInputStream(in); //最終仍是藉助於Java API中的ByteArrayInputStream與 DataInputStream來完成反序列化:即將二進制轉換爲基本類型的值(這裏爲整數)的過程
writable.readFields(dataIn);
dataIn.close();
}
IntWritable類的序列化與反序列化實現:
publicclass IntWritable implements WritableComparable<IntWritable> {
privateintvalue;
@Override
publicvoidreadFields(DataInput in) throws IOException {
value = in.readInt();
}
@Override
publicvoidwrite(DataOutput out) throws IOException {
out.writeInt(value); //實質上最後就是將整型值以二進制存儲起來了
}
...
}
IntWritable實現了WritableComparable接口,而WritableComparable接口繼承了Writable接口與java.lang.Comparable接口
publicclassIntWritableimplements WritableComparable {
publicinterfaceWritableComparable<T> extendsWritable, Comparable<T> {
publicinterface Comparable<T> {
publicintcompareTo(T o);
}
IntWritable實現了Comparable的compareTo方法,具體實現:
/** Compares two IntWritables. */
publicint compareTo(Object o) {
int thisValue = this.value;
int thatValue = ((IntWritable)o).value;
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
除了實現了Comparable比較能力接口,Hadoop提供了一個優化接口是繼承自java.util.Comparator比較接口的RawComparator接口:
publicinterfaceRawComparator<T> extendsComparator<T> {
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
RawComparator:原生比較,即基於字節的比較
publicinterface Comparator<T> {
intcompare(T o1, T o2);
boolean equals(Object obj);
}
爲何說是優化接口呢?由於該接口中的比較方法能夠直接對字節進行比較,而不須要先反序列化後再比(由於是靜態內部類實現:
/** A WritableComparable for ints. */
publicclass IntWritable implements WritableComparable {
...
/** A Comparator optimized for IntWritable. */
publicstaticclass Comparator extends WritableComparator {
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
...
}
}
...
}
),這樣就避免了新建對象(即不須要經過反序列化重構Writable對象後,才能調用該對象的compareTo()比較方法)的額外開銷,而Comparable接口比較時是基於對象自己的(屬於非靜態實現):
/** A WritableComparable for ints. */
publicclass IntWritable implements WritableComparable {
...
/** Compares two IntWritables. */
publicint compareTo(Object o) {
...
}
...
}
),因此比較前須要對輸入流進行反序列重構成Writable對象後再比較,因此性能不高。如IntWritable的內部類IntWritable.Comparator就實現了RawComparator原生比較接口,性能比IntWritable.compareTo()比較方法高:
publicstaticclassComparatorextends WritableComparator {
public Comparator() {
super(IntWritable.class);
}
//這裏實現的其實是重寫WritableComparator裏的方法。注:雖然WritableComparator已經提供了該方法的默認實現,但不要直接使用,由於父類WritableComparator提供的默認實現也是先反序列化後,再經過回調IntWritable裏的compareTo()來完成比較的,因此咱們在爲自定義Key時,必定要本身重寫WritableComparator裏提供的默認實現
@Override
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int thisValue = readInt(b1, s1);// readInt爲父類WritableComparator中的方法,將字節數組轉換爲整型(具體請參考後面),這樣不須要將字節數組反序列化成IntWritable後再進行大小比對,而是直接對IntWritable裏封裝的int value進行比對
int thatValue = readInt(b2, s2);
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
而WritableComparator又是實現了RawComparator接口中的compare()方法,同時還實現了Comparator類中的:
publicclass WritableComparator implements RawComparator{
publicint compare(byte[] b1, ints1, intl1, byte[] b2, ints2, intl2) {//該方法實現的是RawComparator接口裏的方法
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
buffer.reset(null, 0, 0); // clean up reference
} catch (IOException e) {
thrownew RuntimeException(e);
}
return compare(key1, key2); // compare them
}
@SuppressWarnings("unchecked")//該方法被上下兩個方法調用,是WritableComparator裏本身定義的方法,不是重寫或實現
publicint compare(WritableComparable a, WritableComparable b) {
returna.compareTo(b);
}
@Override//該方法實現的是Comparator的compare(T o1, T o2)方法
publicint compare(Object a, Object b) {
return compare((WritableComparable)a, (WritableComparable)b);
}
}
WritableComparable是一個接口;而WritableComparator 是一個類。WritableComparator提供一個默認的基於對象(非字節)的比較方法compare(如上面所貼),這與實現Comparable接口的比較方法是同樣的:都是基於對象的,因此性能也不高
獲取IntWritable的內部類Comparator的實例:
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
這樣能夠取到RawComparator實例,緣由是在IntWritable實現裏註冊過
static { // register this comparator
WritableComparator.define(IntWritable.class, newComparator());
}
這個comparator實例能夠用於比較兩個IntWritable對象:
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));// comparator.compare(w1, w2)會回調IntWritable.compareTo方法
或是IntWritable對象序列化的字節數組:
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length),greaterThan(0));//這裏才真正調用IntWritable.Comparator.compare()方法進行原生比較
上面分析的是IntWritable類型,其餘類型基本上也是這樣
Key在Map的shuffle過程當中是須要進行排序的,這就要求Key是實現WritableComparable的類,或者若是不實現WritableComparable接口時,須要經過Job指定比較類,他們的優先選擇順序以下:
一、 若是配置了mapred.output.key.comparator.class比較類,或明確地經過job的setSortComparatorClass(Class<? extends RawComparator> cls)方法(舊API爲setOutputKeyComparatorClass() on JobConf)指定過,則使用指定類(通常從WritableComparator繼承)的實例進行排序(這種狀況要不須要WritableComparable,而只需實現Writable便可)
二、 不然,Key必須是實現了WritableComparable的類(由於在實現內部靜態比較器繼承時須要繼承WritableComparator,其構造函數須要傳進一個實現了WritableComparable的Key,並在WritableComparator類裏提供的默認比較會回調Key類所實現的compareTo()方法,因此須要實現WritableComparable類),而且若是該Key類內部經過靜態塊(WritableComparator.define(Class c, WritableComparator comparator))註冊過基於字節比較的類WritableComparator(實現RawComparator的抽象類,RawComparator又繼承了Comparator接口),則使用字節比較方式進行排序(通常使用這種)
三、 不然,若是沒有使用靜態註冊過內部實現WritableComparator,則使用WritableComparable的compareTo()進行對象比較(這須要先反序列化成對象以後)(注:此狀況下Key也必須是實現WritableComparable類)
Writable不少的實現類實質上是對Java基本類型(但除char沒有對應的Writable實現類外,char能夠存放在IntWritable中)的再一次封裝,get()、set()方法就是對這些封裝的基本值的讀取與設定:
從上表能夠看出,VIntWritable(1~5)與 VLongWritable(1~9)爲變長。若是數字在-112~127之間時,變長格式就只用一個字節進行編碼;不然,使用第一個字節來存放正負號,其餘字節就存放值(究竟須要多少字節來存放,則是看數字的大小,如int類型的值須要1~4個字節不等)。如 值爲163須要兩個字節,而不是4個字節:第一個字節存符號爲(不一樣長度的數這個字節存儲的不太同樣),第二個字節存放的是值;而257則須要三個字節來存放了;
可變長度類型有點像UTF-8同樣,編碼是變長的,若是傳輸內容中的數字都是比較小的數時(若是內容都是英文的字符,UTF-8就會大大縮短編碼長度),用可變長度則能夠減小數據量,這些數的範圍:-65536 =< VIntWritable =< 65535此範圍最多隻佔3字節,包括符號位;-281474976710656L =< VLongWritable =< 28147497671065L此範圍最多隻佔7字節,包括符號位,若是超過了這些數,建議使用定長的,由於此時定長的所佔字節還少,由於在接近最大Int或Long時,變長的VintWritable達到5個字節(如2147483647就佔5字節),VlongWritable達到9個字節(如9223372036854775807L就佔9字節),而定長的最多隻有4字節與8字節
另外,同一個數用VintWritable或VlongWritable最後所佔有字節數是同樣的,好比2147483647這個數,都是8c7fffffff,佔5字節,既然同一數字的編碼長度都同樣,因此優先推薦使用 VlongWritable,由於他存儲的數比VintWritable更大,有更好的擴展
雖然VintWritable與VlongWritable所佔最大字節可能分別達到5或9位,但它們容許的最大數的範圍也 基本類型 int、long是同樣的,即VintWritable容許的數字範圍:-2147483648 =< VintWritable =< 2147483647;VlongWritable容許的數字範圍:-9223372036854775808L =< VlongWritable =< 9223372036854775807L,由於它們的構造函數參數的類型就是基本類型int、long:
public VIntWritable(int value) { set(value); }
public VLongWritable(long value) { set(value); }
提供了序列化、反序列化和在字節級別上比較文本的方法。它的長度類型是整型,採用0壓縮序列化格式。另外,它還支持在不將字符數組轉換爲字符串的狀況下進行字符串遍歷
至關於Java中的String類型,採用UTF-8編解碼,它是對 byte[] 字節數組的封裝,而不直接是String:
length存儲了字符串所佔的字節數,爲int類型,因此最大可達2GB。
getLength():返回的是字節數組bytes的所存儲內容的有效長度,而非字符串個數,取長度時不要直接經過getBytes().length來獲取,由於在經過set()方法重置Text後,有時數組整個長度會大於所存內容的長度
getBytes():返回字符串原生字節數組,但數據的有效長度到getLength()
與String不一樣的是,Text是可變的,能夠經過set()方法重用它
Text索引位置都是以字節爲單位進行索引的,並不像String那樣是以字符爲單位進行索引的
Text與IntWritable同樣,也是可序列化與可比較的
因爲Text在內存中使用的是UTF-8編碼的字節碼,而Java中的String則是Unicode編碼,因此是有區別的
Text t = new Text("江正軍");
//字符所佔字節數,而非字符個數
System.out.println(t.getLength());// 9 UTF-8編碼下每一箇中文佔三字節
//取單個字符,charAt()返回的是Unicode編碼
System.out.println((char) t.charAt(0));
System.out.println((char) t.charAt(3));// 第二個字符,注意:傳入的是byte數組中的索引,不是字符位置索引
System.out.println((char) t.charAt(6));
//轉換成String
System.out.println(t.toString());// 江正軍
ByteBuffer buffer = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
int cp;
// 遍歷每一個字符
while (buffer.hasRemaining() && (cp = Text.bytesToCodePoint(buffer)) != -1) {
System.out.println((char) cp);
}
// 在末尾附加字符
t.append("江".getBytes("UTF-8"), 0, "江".getBytes("UTF-8").length);
System.out.println(t.toString());// 江正軍江
// 查找字符:返回第一次出現的字符位置(也是在字節數組中的偏移量,而非字符位置),相似String的indexOf,注:這個位置指字符在UTF-8字節數組的索引位置,而不是指定字符所在位置
System.out.println(t.find("江"));// 0
System.out.println(t.find("江", 1));// 9 從第2個字符開始向後查找
Text t2 = new Text("江正軍江");
//比較Text:若是相等,返回0
System.out.println(t.compareTo(t2));// 0
System.out.println(t.compareTo(t2.getBytes(), 0, t2.getLength()));//0
下表列出Text字符(實爲UTF-8字符)與String(實爲Unicode字符)所佔字節:若是是拉丁字符如大寫字母A,則存放在Text中只佔一個字節,而String佔用兩字節;大於127的都佔有兩字節;漢字時Text佔有三字節,String佔兩字節;後面的U+10400不知道是什麼擴展字符?反正表示一個字符,但都佔用了4個字節:
@Test
publicvoid string() throws UnsupportedEncodingException {
String s = "\u0041\u00DF\u6771\uD801\uDC00";
assertThat(s.length(), is(5));
assertThat(s.getBytes("UTF-8").length, is(10));
assertThat(s.indexOf("\u0041"), is(0));
assertThat(s.indexOf("\u00DF"), is(1));
assertThat(s.indexOf("\u6771"), is(2));
assertThat(s.indexOf("\uD801\uDC00"), is(3));
assertThat(s.charAt(0), is('\u0041'));
assertThat(s.charAt(1), is('\u00DF'));
assertThat(s.charAt(2), is('\u6771'));
assertThat(s.charAt(3), is('\uD801'));
assertThat(s.charAt(4), is('\uDC00'));
assertThat(s.codePointAt(0), is(0x0041));
assertThat(s.codePointAt(1), is(0x00DF));
assertThat(s.codePointAt(2), is(0x6771));
assertThat(s.codePointAt(3), is(0x10400));
}
@Test
publicvoid text() {
Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
assertThat(t.getLength(), is(10));
assertThat(t.find("\u0041"), is(0));
assertThat(t.find("\u00DF"), is(1));
assertThat(t.find("\u6771"), is(3));
assertThat(t.find("\uD801\uDC00"), is(6));
assertThat(t.charAt(0), is(0x0041));
assertThat(t.charAt(1), is(0x00DF));
assertThat(t.charAt(3), is(0x6771));
assertThat(t.charAt(6), is(0x10400));
}
與Text同樣,BytesWritable是對二進制數據的封裝
序列化時,前4個字節存儲了字節數組的長度:
publicstaticvoid main(String[] args) throws IOException {
BytesWritable b = new BytesWritable(newbyte[] { 3, 5 });
byte[] bytes = serialize(b);
System.out.println((StringUtils.byteToHexString(bytes)));//000000020305
}
// 序列化
publicstaticbyte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
BytesWritable也是可變的,能夠經過set()方法進行修改。與Text同樣,BytesWritable的getBytes()返回的是字節數組長——容量——也能夠沒法體現所存儲的實際大小,能夠經過getLength()來肯定實際大小,能夠經過 setCapacity(int new_cap) 方法重置緩衝大小
它是一個Writable特殊類,它序列化長度爲0,即不從數據流中讀取數據,也不寫入數據,充當佔位符。如在MapReduce中,若是你不須要使用鍵或值,你就能夠將鍵或值聲明爲NullWritable
它是一個單例,能夠經過NullWritable.get()方法獲取實例
ObjectWritable是對Java基本類型、String、enum、Writable、null或這些類型組成的一個通用封裝:
當一個字段中包含多個類型時(好比在map輸出多種類型時),ObjectWritable很是有用,例如:若是SequenceFile中的值包含多個類型,就能夠將值類型聲明爲ObjectWritable。
能夠經過getDeclaredClass()獲取ObjectWritable封裝的類型
ObjectWritable在序列會時會將封裝的類型名一併輸出,這會浪費空間,咱們能夠使用GenericWritable來解決這個問題:若是封裝的類型數量比較少而且可以提交知道須要封裝哪些類型,那麼就能夠繼承GenericWritable抽象類,並實現這個類將要對哪些類型進行封裝的抽象方法:
abstractprotected Class<? extends Writable>[] getTypes();
這們在序列化時,就沒必要直接輸出封裝類型名稱,而是這些類型的名稱的索引(在GenericWritable內部會它他們分配編號),這樣就減小空間來提升性能
class MyWritable extendsGenericWritable{
MyWritable(Writable writable) {
set(writable);
}
publicstatic Class<? extends Writable>[] CLASSES = new Class[] { Text.class };
@Override
protected Class<? extends Writable>[] getTypes() {
returnCLASSES;
}
publicstaticvoid main(String[] args) throws IOException {
Text text = new Text("\u0041\u0071");
MyWritable myWritable = new MyWritable(text);
System.out.println(StringUtils.byteToHexString(serialize(text)));// 024171
System.out.println(StringUtils.byteToHexString(serialize(myWritable)));// 00024171
ObjectWritable ow = new ObjectWritable(text); //00196f72672e6170616368652e6861646f6f702e696f2e5465787400196f72672e6170616368652e6861646f6f702e696f2e54657874024171 紅色前面都是類型名序列化出來的結果,佔用了很大的空間
System.out.println(StringUtils.byteToHexString(serialize(ow)));
}
publicstaticbyte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}
}
GenericWritable的序列化只是把類型在type數組裏的索引放在了前面,這樣就比ObjectWritable節省了不少空間,因此推薦你們使用GenericWritable
6種集合類:ArrayWritable, ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable,SortedMapWritable, EnumSetWritable.
ArrayWritable與TwoDArrayWritable是對Writable的數組和二維數據(數組的數組)的實現:
ArrayWritable與TwoDArrayWritable中全部元素必須是同一類型的實例(在構造函數中指定):
ArrayWritable writable = new ArrayWritable(Text.class);
TwoDArrayWritable writable = new TwoDArrayWritable(Text.class);
ArrayWritable與TwoDArrayWritable都有get、set、toArray方法,注:toArray方法是獲取的數組(或二維數組)的副本(淺複製,雖然數組殼是複製了一份,只裏面存放的元素未深度複製)
publicvoid set(Writable[] values) { this.values = values; }
publicWritable[] get() { returnvalues; }
publicvoid set(Writable[][] values) { this.values = values; }
publicWritable[][] get() { returnvalues; }
ArrayPrimitiveWritable是對Java基本數組類型的一個封裝,調用set()方法時能夠識別相應組件類型,所以無需經過繼承來設置類型
MapWritable 與 SortedMapWritable分別實現了java.util.Map<Writable,Writable> 與 java.util.SortedMap<WritableComparable, Writable>接口。它們在序列化時,類型名稱也是使用索引來替代一塊兒輸出,若是存入的是自定義Writable內,則不要超過127個,因它這兩個類裏面是使用一個byte來存放自定義Writable類型名稱索引的,而那些標準的Writable則使用-127~0之間的數字來編號索引
對集合的枚舉類型能夠採用EnumSetWritable。對於單類型的Writable列表,使用ArrayWritable就足夠了。但若是須要把不一樣的Writable類型存放在單個列表中,能夠使用GenericWritable將元素封裝在一個ArrayWritable中
Hadoop中提供的現有的一套標準Writable是能夠知足咱們決大多數需求的。但在某些業務下需咱們定義具備本身數據結構的Writable。
定製的Writable能夠徹底控制二進制表示和排序順序。因爲Writable是MapReduce數據路徑的核心,因此調整二進制表示能對性能產生顯著效果。雖然Hadoop自帶的Writable實現已通過很好的性能調優,但若是但願將結構調整得更好,更好的作法就是新建一個Writable類型
示例:存儲一對Text對象的自定義Writable,若是是Int整型,能夠參考後面示例IntPair,若是複合鍵若是由整型與字符型組成,則可能同時參考這兩個類來定義:
publicclassTextPairimplements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
publicvoid set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
returnfirst;
}
public Text getSecond() {
returnsecond;
}
@Override
publicvoid write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
publicvoid readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
/*
* HashPartitioner(MapReuce中的默認分區類)一般用hashcode()方法來選擇reduce分區,所
* 以應該確保有一個比較好的哈希函數來保證每一個reduce數據分區大小類似
*/
@Override
publicint hashCode() {
returnfirst.hashCode() * 163 + second.hashCode();
}
@Override
publicboolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
returnfirst.equals(tp.first) && second.equals(tp.second);
}
returnfalse;
}
/*
* TextOutputFormat將鍵或值輸出時,會調用此方法,因此也需重寫
*/
@Override
public String toString() {
returnfirst + "\t" + second;
}
/*
* 除VIntWritable、VLongWritable這兩個Writable外,大多數的Writable類自己都實現了
* Comparable比較能力的接口compareTo()方法,而且又還在Writable類靜態的實了Comparator
* 比較接口的compare()方法,這兩個方法在Writable中的實現的性能是不同的:Comparable.
* compareTo()方法在比較前,須要將字節碼反序列化成相應的Writable實例後,才能調用;而
* Comparator.compare()比較前是不須要反序列化,它能夠直接對字節碼(數組)進行比較,所
* 以這個方法的性能比較高,屬於原生比較
*
* VIntWritable、VLongWritable這兩個類裏沒有靜態的實現Comparator接口,多是由於
* 變長的緣由,
*
*/
@Override//WritableComparator裏自定義比較方法 compare(WritableComparable a, WritableComparable b) 會回調此方法
publicintcompareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {//先按第一個字段比,若是相等,再比較第二個字段
return cmp;
}
returnsecond.compareTo(tp.second);
}
//(整型類型IntWritable基於字節數組原生比較請參考這裏)
publicstaticclassComparatorextends WritableComparator {
privatestaticfinal Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
//或者這樣來獲取Text.Comparator實例?
// RawComparator<IntWritable> comparator = WritableComparator.get(Text.class);
public Comparator() {
super(TextPair.class);
}
/*
* 這個方法(下面註釋掉的方法)從Text.Comparator.compare()方法拷過來的 l1、l2表示字節數有效的長度
*
* 因爲Text在序列化時(這一序列化過程可參照Text的序列化方法write()源碼來了解):首先是將Text的有效字節數 length
* 以VIntWritable方式序列化(即length在序列化時所在字節爲 1~5), 而後再將整個字節數組序列化
* (字節數組序列化時也是先將字節有效長度輸出,不過此時爲Int,而非VInt,請參考後面貼出的源碼)
* 下面是Text的序列化方法源碼:
* public void write(DataOutput out) throws IOException {
* WritableUtils.writeVInt(out, length);
* out.write(bytes,0, length);
* }
*
* 下面是BytesWritable的序列化方法源碼:
* public void write(DataOutput out) throws IOException {
* out.writeInt(size);
* out.write(bytes, 0, size);
* }
*
* WritableUtils.decodeVIntSize(b1[s1]):讀出Text序列化出的串前面多少個字節是用來表示Text的長度的,
* 這樣在取Text字節內容時要跳過長度信息串。傳入時只需傳入字節數組的第一個字節便可
*
* compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2):此方法纔是真正按一個個字節進行大小比較
* b1從s1 + n1開始l1 - n1個字節纔是Text真正字節內容
*
*/
// public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {//此方法是從Text.Comparator中拷出來的
// int n1 = WritableUtils.decodeVIntSize(b1[s1]);//序列化串中前面多少個字節是長度信息
// int n2 = WritableUtils.decodeVIntSize(b2[s2]);
// return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
// }
@Override
publicintcompare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
//WritableUtils.decodeVIntSize(b1[s1])表示Text有效長度序列化輸出佔幾個字節
//readVInt(b1, s1):將Text有效字節長度是多少讀取出來。
//最後firstL1 表示的就是第一個Text屬性成員序列化輸出的有效字節所佔長度
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
//比較第一個Text:即first屬性。自己Text裏就有Comparator的實現,這裏只須要將first
//與second所對應的字節截取出來,再調用Text.Comparator.compare()即根據字節進行比較
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}//若是第一個Text first 不等,則比較第二個Test:即second屬性
//s1 + firstL1爲第二個Text second的起始位置,l1 - firstL1爲第二個Text second的字節數
returnTEXT_COMPARATOR
.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
thrownew IllegalArgumentException(e);
}
}
}
static {
WritableComparator.define(TextPair.class, new Comparator());
}
}
SequenceFile:順序文件、或叫序列文件。它是一種具備必定存儲結構的文件,數據以在內存中的二進制寫入。Hadoop在讀取與寫入這類文件時效率會高
順序文件——相對於MapFile只能順序讀取,因此稱順序文件
序列文件——寫入文件時,直接將數據在內存中存儲的二進寫入到文件,因此寫入後使用記事本沒法直接閱讀,但使用程序反序列化後或經過Hadoop命令能夠正常閱讀顯示:hadoop fs -text /sequence/seq1
SequenceFile類提供了Writer,Reader 和 SequenceFile.Sorter 三個類用於完成寫,讀,和排序
publicclass SequenceFileWriteDemo {
privatestaticfinal String[] DATA = { "One, 一", "Three, 三", "Five, 五", "Seven, 七", "Nine, 九" };
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/sequence/seq1";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
/*
* 該方法有不少的重載版本,但都須要指定FileSystem+Configuration(或FSDataOutputStream+Configuration)
* 、鍵的class、值的class;另外,其餘可選參數包括壓縮類型CompressionType以及相應的CompressionCodec
* 、 用於回調通知寫入進度的Progressable、以及在Sequence文件頭存儲的Metadata實例
*
* 存儲在SequenceFile中的鍵和值並不必定須要Writable類型,只要能被Serialization序列化和反序列化
* ,任何類型均可以
*/
// 經過靜態方法獲取順序文件SequenceFile寫實例SequenceFile.Writer
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
for (int i = 0; i < 10; i++) {
key.set(10 - i);
value.set(DATA[i % DATA.length]);
// getLength()返回文件當前位置,後繼將今後位置接着寫入(注:當SequenceFile剛建立時,就已
// 寫入元數據信息,因此剛建立後getLength()也是非零的
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
/*
* 同步點:用來快速定位記錄(鍵值對)的邊界的一個特殊標識,在讀取SequenceFile文件時,能夠經過
* SequenceFile.Reader.sync()方法來搜索這個同步點,便可快速找到記錄的起始偏移量
*
* 加入同步點的順序文件能夠做爲MapReduce的輸入,因爲訪類順序文件容許切分,因此該文件的不一樣部分能夠
* 由不一樣的map任務單獨處理
*
* 在每條記錄(鍵值對)寫入前,插入一個同步點,這樣是方便讀取時,快速定位每記錄的起始邊界(若是讀取的
* 起始位置不是記錄邊界,則會拋異常SequenceFile.Reader.next()方法會拋異常)
*
* 在真正項目中,可能不是在每條記錄寫入前都加上這個邊界同步標識,而是以業務數據爲單位(多條記錄)加入
* ,這裏只是爲了測試,因此每條記錄前都加上了
*/
writer.sync();
// 只能在文件末尾附加健值對
writer.append(key, value);
}
} finally {
// SequenceFile.Writer實現了java.io.Closeable,能夠關閉流
IOUtils.closeStream(writer);
}
}
}
寫入後在操做系統中打開顯示亂的:
從上面能夠看出這種文件的前面會寫入一些元數據信息:鍵的Class、值的Class,以及壓縮等信息
若是使用Hadoop來看,則仍是能夠正常顯示的,由於該命令會給咱們反序列化後再展現出來:
publicclass SequenceFileReadDemo {
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/sequence/seq1";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {
// 經過SequenceFile.Reader實例進行讀
reader = new SequenceFile.Reader(fs, path, conf);
/*
* 經過reader.getKeyClass()方法從SequenceFile文件頭的元信息中讀取鍵的class類型
* 經過reader.getValueClass()方法從SequenceFile文件頭的元信息中讀取值的class類型
* 而後經過ReflectionUtils工具反射獲得Key與Value類型實例
*/
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
// 返回當前位置,今後位置讀取下一健值對
long position = reader.getPosition();
// 讀取下一健值對,並分別存入key與value變量中,若是到文件尾,則返回false
while (reader.next(key, value)) {
// 若是讀取的記錄(鍵值對)前有邊界同步特殊標識時,則打上*
String syncSeen = reader.syncSeen() ? "*" : "";
// position爲當前輸入鍵值對的起始偏移量
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next
// record下一對健值對起始偏移量
}
System.out.println();
//設置讀取的位置,注:必定要是鍵值對起始偏移量,即記錄的邊界位置,不然拋異常
reader.seek(228);
System.out.print("[" + reader.getPosition() + "]");
reader.next(key, value);
System.out.println(key + " " + value + " [" + reader.getPosition() + "]");
//這個方法與上面seek不一樣,傳入的位置參數不須要是記錄的邊界起始偏移的準確位置,根據邊界同步特殊標記能夠自動定位到記錄邊界,這裏從223位置開始向後搜索第一個同步點
reader.sync(223);
System.out.print("[" + reader.getPosition() + "]");
reader.next(key, value);
System.out.println(key + " " + value + " [" + reader.getPosition() + "]");
} finally {
IOUtils.closeStream(reader);
}
}
}
hadoop fs –text命令除能夠顯示純文本文件,還能夠以文本形式顯示SequenceFile文件、MapFile文件、gzip壓縮文件,該命令能夠自動力檢測出文件的類型,根據檢測出的類型將其轉換爲相應的文本。
對於SequenceFile文件、MapFile文件,會調用Key與Value的toString方法來顯示成文本,因此要重寫好自定義的Writable類的toString()方法
MapReduce是對一個或多個順序文件進行排序(或合併)最好的方法。MapReduce自己是並行的,並就能夠指定reducer的數量(即分區數),如指定1個reducer,則只會輸出一個文件,這樣就能夠將多個文件合併成一個排序文件了。
除了本身寫這樣一個簡單的排序合併MapReduce外,咱們能夠直接使用Haddop提供的官方實例來完成排序合併,如將前面寫章節中產生的順序文件從新升級排序(原輸出爲降序):
[root@hadoop-master /root/hadoop-2.7.2/share/hadoop/mapreduce]# hadoop jar ./hadoop-mapreduce-examples-2.7.2.jar sort-r 1 -inFormat org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat -outKey org.apache.hadoop.io.IntWritable -outValue org.apache.hadoop.io.Text /sequence/seq1 /sequence/seq1_sorted
[root@hadoop-master /root/hadoop-2.7.2/share/hadoop/mapreduce]# hadoop fs -text /sequence/seq1_sorted/part-r-00000
1 Nine, 九
2 Seven, 七
3 Five, 五
4 Three, 三
5 One, 一
6 Nine, 九
7 Seven, 七
8 Five, 五
9 Three, 三
10 One, 一
System.out.println("sort [-r <reduces>] " + //reduces的數量
"[-inFormat <input format class>] " +
"[-outFormat <output format class>] " +
"[-outKey <output key class>] " +
"[-outValue <output value class>] " +
"[-totalOrder <pcnt> <num samples> <max splits>] " +
"<input> <output>");
注:官方提供的Sort示例除了排序合併順序文件外,還能夠合併普通的文本文件,下面是它的部分源碼:
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(num_reduces);
job.setInputFormatClass(inputFormatClass);
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(outputKeyClass);
job.setOutputValueClass(outputValueClass);
順序文件由文件頭Header、隨後的一條或多條記錄Record、以及記錄間邊界同步點特殊標識符Sync(可選):
此圖爲壓縮前和記錄壓縮Record compression後的順序文件的內部結構
順序文件的前三個字節爲SEQ(順序文件代碼),緊隨其後的一個字節表示順序文件的版本號,文件頭還包括其餘字段,例如鍵和值的名稱、數據壓縮細節、用戶定義的元數據,此外,還包含了一些同步標識,用於快速定位到記錄的邊界。
每一個文件都有一個隨機生成的同步標識,存儲在文件頭中。同步標識位於順序文件中的記錄與記錄之間,同步標識的額外存儲開銷要求小於1%,因此沒有必要在每條記錄末尾添加該標識,特別是比較短的記錄
記錄的內部結構取決因而否啓用壓縮,SeqeunceFile支持兩種格式的數據壓縮,分別是:記錄壓縮record compression和塊壓縮block compression。
record compression如上圖所示,是對每條記錄的value進行壓縮
默認狀況是不啓用壓縮,每條記錄則由記錄長度(字節數)Record length、健長度Key length、鍵Key和值Value組成,長度字段佔4字節
記錄壓縮(Record compression)格式與無壓縮狀況基本相同,只不過記錄的值是用文件頭中定義的codec壓縮的,注,鍵沒有被壓縮(指記錄壓縮方式的Key是不會被壓縮的,而若是是塊壓縮方式的話,整個記錄的各個部分信息都會被壓縮,請看下面塊壓縮)
塊壓縮(Block compression)是指一次性壓縮多條記錄,由於它能夠利用記錄間的類似性進行壓縮,因此比單條記錄壓縮方式要好,塊壓縮效率更高。block compression是將一連串的record組織到一塊兒,統一壓縮成一個block:
上圖:採用塊壓縮方式以後,順序文件的內部結構,記錄的各個部分都會被壓縮,不僅是Value部分
能夠不斷向數據塊中壓縮記錄,直到塊的字節數不小於io.seqfile.compress.blocksize(core-site.xml)屬性中設置的字節數,默認爲1MB:
<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
<description>The minimum block size for compression in block compressed SequenceFiles.
</description>
</property>
每個新塊的開始處都須要插入同步標識,block數據塊的存儲格式:塊所包含的記錄數(vint,1~5個字節,不壓縮)、每條記錄Key長度的集合(Key長度集合表示將全部Key長度信息是放在一塊兒進行壓縮)、每條記錄Key值的集合(全部Key放在一塊兒再起壓縮)、每條記錄Value長度的集合(全部Value長度信息放在一塊兒再進行壓縮)和每條記錄Value值的集合(全部值放在一塊兒再進行壓縮)
MapFile是已經排過序的SequenceFile,它有索引,索引存儲在另外一單獨的index文件中,因此能夠按鍵進行查找,注:MapFile並未實現java.util.Map接口
MapFile是對SequenceFile的再次封裝,分爲索引與數據兩部分:
publicclass MapFile {
/** The name of the index file. */
publicstaticfinal String INDEX_FILE_NAME = "index";
/** The name of the data file. */
publicstaticfinal String DATA_FILE_NAME = "data";
publicstaticclass Writer implements java.io.Closeable {
private SequenceFile.Writer data;
private SequenceFile.Writer index;
/** Append a key/value pair to the map. The key must be greater or equal
* to the previous key added to the map. 在Append時,Key的值必定要大於或等於前面的已加入的值,即升序,不然拋異常*/
publicsynchronizedvoid append(WritableComparable key, Writable val)
throws IOException {
...
publicstaticclass Reader implements java.io.Closeable {
// the data, on disk
private SequenceFile.Reader data;
private SequenceFile.Reader index;
...
與SequenceFile同樣,也是使用append方法在文件末寫入,並且鍵要是WritableComparable類型的具備比較能力的Writable,值與SequenceFile同樣也是Writable類型便可
privatestaticfinal String[] DATA = { "One, 一", "Three, 三", "Five, 五", "Seven, 七", "Nine, 九" };
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/map";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
IntWritable key = new IntWritable();
Text value = new Text();
MapFile.Writer writer = null;
try {
/*
* 注:在建立writer時與SequenceFile不太同樣,這裏傳進去的是URI,而不是具體文件的Path,
* 這是由於MapFile會生成兩個文件,一個是data文件,一個是index文件,能夠查看MapFile源碼:
* //The name of the index file.
* public static final String INDEX_FILE_NAME = "index";
* //The name of the data file.
* public static final String DATA_FILE_NAME = "data";
*
* 因此不須要具體的文件路徑,只傳入URI便可,且傳入的URI只到目錄級別,即便包含文件名也會看做目錄
*/
writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
for (int i = 0; i < 1024; i++) {
key.set(i);
value.set(DATA[i % DATA.length]);
// 注:append時,key的值要大於等前面已加入的鍵值對
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
[root@localhost /root]# hadoop fs -ls /map
Found 2 items
-rw-r--r-- 3 Administrator supergroup 430 2016-05-01 10:24 /map/data
-rw-r--r-- 3 Administrator supergroup 203 2016-05-01 10:24 /map/index
會在map目錄下建立兩個文件data與index文件,這兩個文件都是SequenceFile
[root@localhost /root]# hadoop fs -text /map/data | head
0 One, 一
1 Three, 三
2 Five, 五
3 Seven, 七
4 Nine, 九
5 One, 一
6 Three, 三
7 Five, 五
8 Seven, 七
9 Nine, 九
[root@localhost /root]# hadoop fs -text /map/index
0 128
128 4013
256 7918
384 11825
512 15730
640 19636
768 23541
896 27446
Index文件存儲了部分鍵(上面顯示的第一列)及在data文件中的起使偏移量(上面顯示的第二列)。從index輸出能夠看到,默認狀況下只有每隔128個鍵纔有一個包含在index文件中,固然這個間隔是能夠調整的,可調用MapFile.Writer實例的setIndexInterval()方法來設置(或者經過io.map.index.interval屬性配置也可)。增長索引間隔大小能夠有效減小MapFile存儲索引所須要的內存,相反,若是減少間隔則能夠提升查詢效率。由於索引index文件只保留一部分鍵,因此MapFile不可以提供枚舉或計算全部的鍵的方法,惟一的辦法是讀取整個data文件
下面能夠根據index的索引seek定位到相應位置後讀取相應記錄:
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/map/data";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
reader.seek(4013);
System.out.print("[" + reader.getPosition() + "]");
reader.next(key, value);
System.out.println(key + " " + value + " [" + reader.getPosition() + "]");
} finally {
IOUtils.closeStream(reader);
}
}
[4013]128 Seven, 七 [4044]
MapFile遍歷文件中全部記錄與SequenceFile同樣:先建一個MapFile.Reader實例,而後調用next()方法,直到返回爲false到文件尾:
/** Read the next key/value pair in the map into <code>key</code> and
* <code>val</code>. Returns true if such a pair exists and false when at
* the end of the map */
publicsynchronizedboolean next(WritableComparable key, Writable val)
throws IOException {
經過調用get()方法能夠隨機訪問文件中的數據:
/** Return the value for the named key, or null if none exists. */
publicsynchronized Writable get(WritableComparable key, Writable val)
throws IOException {
根據指定的key查找記錄,若是返回null,說明沒有相應的條目,若是找到相應的key,則將該鍵對應的值存入val參變量中
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/map/data";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
MapFile.Reader reader = null;
try {
//構造時,路徑只須要傳入目錄便可,不能到data文件
reader = new MapFile.Reader(fs, "hdfs://hadoop-master:9000/map", conf);
IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
key.set(255);
//根據給定的key查找相應的記錄
reader.get(key, value);
System.out.println(key + " " + value);// 255 One, 一
} finally {
IOUtils.closeStream(reader);
}
}
get()時,MapFile.Reader首先將index文件讀入內存,接着對內存中的索引進行二分查找,最後在index中找到小於或等於搜索索引的鍵255,這裏即爲128,對應的data文件中的偏移量爲4013,而後從這個位置順序讀取每條記錄,拿出Key一個個與255進行對比,這裏很不幸運,須要比較128(由io.map.index.interval決定)次直到找到鍵255爲止。
getClosest()方法與get()方法相似,只不過它返回的是與指定鍵匹配的最接近的鍵,而不是在不匹配的返回null,更準確地說,若是MapFile包含指定的鍵,則返回對應的條目;不然,返回MapFile中的第一個大於(或小於,由相應的boolean參數指定)指定鍵的鍵
大型MapFile的索引全加載到內存會佔據大量內存,若是不想將整個index加載到內存,不須要修改索引間隔以後再重建索引,而是在讀取索引時設置io.map.index.skip屬性(編程時可經過Configuration來設定)來加載必定比例的索引鍵,該屬性一般設置爲0,意味着加載index時不跳過索引鍵所有加載;若是設置爲1,則表示加載index時每次跳過索引鍵中的一個,這樣索引會減半;若是設置爲2,則表示加載index時每次讀取索引時跳過2個鍵,這樣只加載索引的三分一的鍵,以此類推,設置的值越大,節省大量內存,但增長搜索時間
l SetFile是一個特殊的MapFile,用於只存儲Writable鍵的集合,鍵必須升序添加:
publicclass SetFile extends MapFile {
publicstaticclass Writer extends MapFile.Writer {
/** Append a key to a set. The key must be strictly greater than the
* previous key added to the set. */
publicvoid append(WritableComparable key) throws IOException{
append(key, NullWritable.get());//只存鍵。因爲調用MapFile.Writer.append()方法實現,因此鍵也只能升序添加
}
. . .
/** Provide access to an existing set file. */
publicstaticclass Reader extends MapFile.Reader {
/** Read the next key in a set into <code>key</code>. Returns
* true if such a key exists and false when at the end of the set. */
publicboolean next(WritableComparable key)
throws IOException {
return next(key, NullWritable.get());//也只讀取鍵
}
l ArrayFile也是一個特殊的MapFile,鍵是一個整型,表示數組中的元素索引,而值是一個Writable值
publicclass ArrayFile extends MapFile {
/** Write a new array file. */
publicstaticclass Writer extends MapFile.Writer {
private LongWritable count = new LongWritable(0);
/** Append a value to the file. */
publicsynchronizedvoid append(Writable value) throws IOException {
super.append(count, value); // add to map 鍵是元素索引
count.set(count.get()+1); // increment count 每添加一個元素後,索引加1
}
. . .
/** Provide access to an existing array file. */
publicstaticclass Reader extends MapFile.Reader {
private LongWritable key = new LongWritable();
/** Read and return the next value in the file. */
publicsynchronized Writable next(Writable value) throws IOException {
return next(key, value) ? value : null;//只返回值
}
/** Returns the key associated with the most recent call to {@link
* #seek(long)}, {@link #next(Writable)}, or {@link
* #get(long,Writable)}. */
publicsynchronizedlong key() throws IOException {//若是知道是第幾個元素,則是能夠調用此方法
returnkey.get();
}
/** Return the <code>n</code>th value in the file. */
publicsynchronized Writable get(long n, Writable value)//根據數組元素索引取值
throws IOException {
key.set(n);
return get(key, value);
}
l BloomMapFile文件構建在MapFile的基礎之上:
publicclass BloomMapFile {
publicstaticfinal String BLOOM_FILE_NAME = "bloom";
publicstaticclass Writer extends MapFile.Writer {
惟一不一樣之處就是,除了data與index兩個文件外,還增長了一個bloom文件,該bloom文件主要包含一張二進制的過濾表,該過濾表能夠提升key-value的查詢效率。在每一次寫操做完成時,會更新這個過濾表,其實現源代碼以下:
publicclass BloomMapFile {
publicstaticclass Writer extends MapFile.Writer {
publicsynchronizedvoid append(WritableComparable key, Writable val)
throws IOException {
super.append(key, val);
buf.reset();
key.write(buf);
bloomKey.set(byteArrayForBloomKey(buf), 1.0);
bloomFilter.add(bloomKey);
}
它有兩個調優參數,一個是io.mapfile.bloom.size,指出map文件中大概有多少個條目;另外一個是io.mapfile.bloom.error.rate , BloomMapFile中使用布隆過濾器失敗比率. 若是減小這個值,使用的內存會成指數增加。
VERSION: 過濾器的版本號;
nbHash: 哈希函數的數量;
hashType: 哈希函數的類型;
vectorSize: 過濾表的大小;
nr: 該BloomFilter可記錄key的最大數量;
currentNbRecord: 最後一個BloomFilter記錄key的數量;
numer: BloomFilter的數量;
vectorSet: 過濾表;
前提是SequenceFile裏是按鍵升序存放的,這樣才能夠爲它建立index文件
publicclass MapFileFixer {
publicstaticvoid main(String[] args) throws Exception {
String mapUri = "hdfs://hadoop-master:9000/sequence2map";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
Path map = new Path(mapUri);
//若是data文件名不是data也是能夠的,但這裏爲默認的data,因此指定MapFile.DATA_FILE_NAME便可
Path mapData = new Path(map, MapFile.DATA_FILE_NAME);
// Get key and value types from data sequence file
SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
Class keyClass = reader.getKeyClass();
Class valueClass = reader.getValueClass();
reader.close();
// Create the map file index file
long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
System.out.printf("Created MapFile %s with %d entries\n", map, entries);
}
}
fix()方法一般用於重建已損壞的索引,若是要將某個SequenceFile轉換爲MapFile,則通常通過如下幾步:
一、 保證SequenceFile裏的數據是按鍵升序存放的,不然使用MapReduce任務對文件進行一次輸入輸出,就會自動排序合併,如:
//建立兩個SequenceFile
publicclass SequenceFileCreate {
privatestaticfinal String[] DATA = { "One, 一", "Three, 三", "Five, 五", "Seven, 七", "Nine, 九" };
publicstaticvoid main(String[] args) throws IOException {
String uri = "hdfs://hadoop-master:9000/sequence/seq1";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
Path path2 = new Path("hdfs://hadoop-master:9000/sequence/seq2");
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null, writer2 = null;
try {
//建立第一個SequenceFile
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
for (int i = 0; i < 10; i++) {
key.set(10 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
//建立第二個SequenceFile
writer2 = SequenceFile.createWriter(fs, conf, path2, key.getClass(), value.getClass());
for (int i = 10; i < 20; i++) {
key.set(30 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer2.getLength(), key, value);
writer2.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
IOUtils.closeStream(writer2);
}
}
}
//將前面生成的兩個SequenceFile排序合併成一個SequenceFile文件
publicclass SequenceFileCovertMapFile {
publicstaticclass Mapper extends
org.apache.hadoop.mapreduce.Mapper<IntWritable, Text, IntWritable, Text> {
@Override
publicvoid map(IntWritable key, Text value, Context context) throws IOException,
InterruptedException {
context.write(key, value);
System.out.println("key=" + key + " value=" + value);
}
}
publicstaticclass Reducer extends
org.apache.hadoop.mapreduce.Reducer<IntWritable, Text, IntWritable, Text> {
@Override
publicvoid reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
for (Text value : values) {
context.write(key, value);
}
}
}
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "hadoop-master:9001");
Job job = Job.getInstance(conf, "SequenceFileCovert");
job.setJarByClass(SequenceFileCovertMapFile.class);
job.setJobName("SequenceFileCovert");
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
// 注意這裏要設置輸入輸出文件格式爲SequenceFile
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);//默認就是1,一個Reduce就只輸出一個文件,這樣就將多個輸入文件合併成一個文件了
SequenceFileInputFormat.addInputPath(job, new Path("hdfs://hadoop-master:9000/sequence"));
SequenceFileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop-master:9000/sequence2map"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
二、 將SequenceFile文件名修改成data(hadoop fs -mv /sequence2map/part-r-00000 /sequence2map/data)
三、 使用最前面的MapFileFixer程序建立index
org.apache.hadoop.conf.Configuration類是用來讀取特定格式XML配置文件的
<?xml version="1.0"?>
<configuration>
<property>
<name>color</name>
<value>yellow</value>
<description>Color</description>
</property>
<property>
<name>size</name>
<value>10</value>
<description>Size</description>
</property>
<property>
<name>weight</name>
<value>heavy</value>
<final>true</final> <!--該屬性不能被後面加進來的同名屬性覆蓋-->
<description>Weight</description>
</property>
<property>
<name>size-weight</name>
<value>${size},${weight}</value><!—配置屬性能夠引用其餘屬性或系統屬性-->
<description>Size and weight</description>
</property>
</configuration>
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");//若有多個XML,能夠多添調用此方法添加,相同屬性後面會覆蓋前面的,除非前面是final屬性
System.out.println(conf.get("color"));//yellow
System.out.println(conf.get("size"));//10
System.out.println(conf.get("breadth", "wide"));//wide 若是不存在breadth配置項,則返回後面給定的wide默認值
System.out.println(conf.get("size-weight"));//10,heavy
系統屬性的優先級高於XML配置文件中定義的屬性,但仍是不能覆蓋final爲true的屬性:
System.setProperty("size", "14");//系統屬性
System.out.println(conf.get("size-weight"));//14,heavy
系統屬性還能夠經過JVM參數 -Dproperty=value 來設置
雖然能夠經過系統屬性來覆蓋XML配置文件中非final屬性,但若是XML中不存在該屬性,則僅配置系統屬性後,經過Configuration