這兩個關於NameNode的問題其實很是地經典,不只有不少細節可詢,並且也是面試的一個高頻問題,因此特地獨立出來一篇。元數據管理會結合源碼來說,而雙緩衝雖然暫時沒去翻源碼,可是咱們能夠藉由一個簡單的實現去向你們好好地說明。後面也會對這段源碼進行一些修改操做來讓它更爲高效。那話很少說我們就開始吧java
由於直接看源碼你們可能接受不了,因此咱們先來聊聊雙緩衝機制。node
Namenode裏面的元數據是以兩種狀態進行存儲的:面試
第一狀態便是存儲在內存裏面,也就是剛剛所提到的目錄樹,它就是一個list,在內存裏面更新元數據速度是很快的。可是若是僅僅只在內存裏存放元數據,數據是不太安全的。安全
因此咱們在磁盤上也會存儲一份元數據,但是此時問題就出現了,咱們須要把數據寫進磁盤,這個性能確定是不太好的呀。可NameNode做爲整個集羣的老大,在hadoop上進行hive,HBASE,spark,flink等計算,這些數據都會不停給NameNode施壓寫元數據,一天下來一億條元數據都是可能的,因此NameNode的設計確定是要支持超高併發的,但是寫磁盤這操做是很是很是慢的,一秒幾十或者最多幾百都已經封頂了,那如今咋辦?併發
並且在此基礎上,hadoop會給每個元數據信息的修改賦予一個事務ID號,保證操做都是有序的。這也是出於數據的安全考慮。這樣整個系統要求的內存會很是大,因此這關乎一個hadoop的優化問題,在以後將會說起。分佈式
這個對理解起來其實十分有幫助,但願你們能跟着思路走ide
咱們先設計一條元數據信息出來高併發
public class EditLog{
//事務的ID
public long taxid;
public String log;
public EditLog(long taxid, String log) {
this.taxid = taxid;
this.log = log;
}
@Override
public String toString() {
return "EditLog [taxid=" + taxid + ", log=" + log + "]";
}
}
複製代碼
代碼其實不難,分爲5塊oop
① 定義了兩個緩衝區currentBuffer(有序隊列)和syncBuffer性能
② 一個write方法負責寫入元數據
③ 一個flush方法把元數據寫入到磁盤上,這裏我只用了一個打印語句模擬了一下寫入磁盤,寫入完成後清空syncBuffer的數據
④ 一個exchange方法來交換currentBuffer和syncBuffer
⑤ 還有一個getMaxTaxid方法獲取到正在同步數據的內存裏面事務ID的最大ID,這個方法的做用稍後說明
這5塊除了最後的獲取ID,應該你們都知道是幹嗎用的了吧,那行,以後就會揭曉
public class DoubleBuffer{
//寫數據,有序隊列
LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
//用來把數據持久化到磁盤上面的內存
LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();
/**
* 寫元數據信息
* @param editLog
*/
public void write(EditLog editLog){
currentBuffer.add(editLog);
}
/**
* 把數據寫到磁盤
*/
public void flush() {
for(EditLog editLog:syncBuffer) {
//模擬將數據寫到磁盤
System.out.println(editLog);
}
syncBuffer.clear();
}
/**
* 交換currentBuffer和syncBuffer
*/
public void exchange() {
LinkedList<EditLog> tmp=currentBuffer;
currentBuffer=syncBuffer;
syncBuffer=tmp;
}
/**
* 獲取到正在同步數據的內存裏面事務ID的最大ID
*/
public long getMaxTaxid() {
return syncBuffer.getLast().taxid;
}
}
複製代碼
那我如今要保證這個寫操做(這裏的寫操做是客戶端向bufCurrent寫)的順序,因此咱們在這裏會使用synchronized來加鎖,而後經過taxid++順序處理。而後new出一個元數據對象,把對象寫進磁盤
long taxid=0L;//
DoubleBuffer doubleBuffer=new DoubleBuffer();
//每一個線程本身擁有的副本
ThreadLocal<Long> threadLocal=new ThreadLocal<Long>();
private void logEdit(String log) {
synchronized (this) {
taxid++;
// 讓每一個線程裏面都擁有本身的事務ID號,做用後面會解釋
threadLocal.set(taxid);
EditLog editLog=new EditLog(taxid,log);
//往內存裏面寫東西
doubleBuffer.write(editLog);
}
// 此時鎖釋放
//將數據持久化到硬盤的方法
logFlush();
}
複製代碼
那有小夥伴就會有疑問了,都加了鎖了這運行的性能能好?但是你要知道這把鎖裏面doubleBuffer.write(editLog)這是往內存裏面寫東西的呀。因此這是沒有問題的,也能完美支持高併發
事先說起一下,這裏將會用到分段加鎖,好比此時咱們有3個線程,線程1進來logEdit,執行完write以後,馬上鎖就會被釋放,而後線程2馬上又能緊隨其後write,寫完又到線程3。
由於寫內存的速度是極快的,因此此時在還沒輪到**logFlush()方法(將數據持久化到硬盤的方法)**執行,咱們可能都已經都已經完成了3個數據往bufCurrent寫入的操做。
舒適提示:此時這邊的線程1將要進入到logFlush了,但是此時bufCurrent可能已經夾帶了線程1,2,3的數據了,如今我先作個假設,線程1,2,3寫入的元數據分別就是1,2,3,這句話很是重要!!!這句話很是重要!!!這句話很是重要!!!很是重要的事情說三遍,而後請看logFlush的解釋
//判斷此時後臺正在把數據同步到磁盤上
public boolean isSyncRunning =false;
//正在同步磁盤的內存塊裏面最大的一個ID號。
long maxtaxid=0L;
boolean isWait=false;
private void logFlush() {
synchronized(this) {
if(isSyncRunning) {
//獲取當前線程的是事務ID
long localTaxid=threadLocal.get();
if(localTaxid <= maxtaxid) {
return;
}
if(isWait) {
return;
}
isWait=true;
while(isSyncRunning) {
try {
//一直等待
//wait這個操做是釋放鎖
this.wait(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
isWait=false;
//
}
//代碼就走到這兒
//syncBuffer(1,2,3)
doubleBuffer.exchange();
//maxtaxid = 3
if(doubleBuffer.syncBuffer.size() > 0) {
maxtaxid=doubleBuffer.getMaxTaxid();
}
isSyncRunning=true;
}
//釋放鎖
//把數據持久化到磁盤,比較耗費性能的。
doubleBuffer.flush();
//分段加鎖
synchronized (this) {
//修改isSyncRunning的狀態
isSyncRunning=false;
//喚醒wait;
this.notifyAll();
}
複製代碼
思路很是清晰,如今咱們就來整理一下。
這個logFlush的方法的流程就是,我先使用一個boolean值 isSyncRunning 判斷此時後臺正在把數據同步到磁盤上,這個東西在客戶端沒有足夠數據寫進來以前一開始確定是false的,可是若是寫進bufCurrent的數據已經差很少了,那我就要把bufCurrent和syncBuffer交換,把 isSyncRunning 改爲true。此時記錄一下正在同步磁盤的內存塊裏面最大的一個ID號maxtaxid(後面須要使用)。而後讓本來的bufCurrent往磁盤上寫數據。 在寫入完成後,isSyncRunning的值修改回false
若是如今第二個線程夾帶着它的數據2進來了logFlush,此時寫入磁盤的操做尚未執行完成,那它就會先獲取當前線程的事務ID---localTaxid,若是當前的這個localTaxid小於我如今進行同步的事務ID的最大值(2<3),那就說明如今的這個線程2所夾帶的數據我已經在上一個線程操做中了。那我就直接無視(若是不理解爲啥無視,直接看下一段話)
這裏就要使用到我剛剛說了三遍很重要的事情了,上一個線程1進來的時候,bufCurrent已經夾帶了數據1,2,3,此時個人maxtaxid=3,線程2所夾帶的2,已是在處理中的數據了
可是若是localTaxid大於我如今進行同步的事務ID的最大值,可是此時又還有線程在同步元數據,那我就讓它等wait,此時我這邊一wait,那邊客戶端又能夠繼續往bufCurrent寫入元數據了。這裏代碼的邏輯是等待1s後,又從新去查看是否有線程正在同步元數據。而後在我同步元數據的操做後面,添加上喚醒這個wait的操做,由於在這一瞬間我同步結束後,若是這個線程仍然在wait,那豈不是在白等了,因此我這邊處理完了,馬上就喚醒它來繼續同步,不浪費你們時間。
public static void main(String[] args) {
FSEdit fs=new FSEdit();
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 100; j++) {
fs.logEdit("日誌");
}
}
}).start();
}
}
複製代碼
咱們隨便上個10W條跑跑看,彷佛是3到4秒就搞定了,並且生成的EditLog都是有序的
這個套路其實徹底是模仿了hadoop的源碼寫了一個大概的,後面咱們也會對這個地方的源碼進行修改。可是也是很是地接近了。
在這裏我也能夠說明有哪些地方的不足,好比咱們這樣操做內存頻繁地交換,那確定是會對性能產生必定影響的,因此咱們會在這塊設置一個合理的大小再進行交換。
分析NameNode對元數據的管理這個問題咱們的作法很簡單,先經過命令建立一個目錄,而後看HDFS的元數據是否隨之發生了變化
hadoop fs -mkdir /user/soft
複製代碼
按照這個思路,那咱們打開hadoop-src吧
如今經過一段Java代碼來建立目錄
public static void main(String[] args) throws IOException {
Configuration configuration=new Configuration();
FileSystem fileSystem=FileSystem.newInstance(configuration);
//建立目錄(分析的是元數據的管理流程)
fileSystem.mkdirs(new Path(""));
/**
* TODO 分析HDFS上傳文件的流程
* TODO 作一些重要的初始化工做
*/
FSDataOutputStream fsous=fileSystem.create(new Path("/user.txt"));
//TODO 完成上傳文件的流程
fsous.write("showMeYourDream".getBytes());
}
複製代碼
那我如今已經寫好在這裏了,接下來就開始分析
此時咱們就懂了,這個mkdirs方法是被這個DistributedFileSystem(分佈式文件系統)類給實現了,那咱們就過去它那裏去找找看吧
因此如今咱們能夠直接找出NameNode,好好看看它的mkdirs方法了
繼續點進去FSDirMkdirOp.mkdirs
代碼很長,有一點小複雜,因此咱們分步驟說明一下首先是第一個關鍵詞FSDirectory,這個是管理元數據的目錄樹(fsimage),元數據就是在NameNode的內存裏面
並且這裏還要注意,咱們的元數據是有兩份的,一份是在磁盤中的 fsimage + edit log,由FSNameSystem來進行管理,還有一份是內存裏面的fsimage,由這個FSDirectory來管理
那這種東西我口說無憑啊,那我是怎麼知道的,那固然仍是hadoop的開發者告訴個人啦,點進去FSDirectory
若是我直接把原有的註釋扔到百度翻譯,結果是這樣的 固然這裏的記憶翻譯是錯的,翻譯應該是內存。但是後面關於FSNamesystem的說明已經很直白了,就是它 把咱們的元數據記錄信息是持久化到磁盤上面的。你看,這我不會騙你的而這個FSDirectory的對象是如何管理一個目錄的呢,咱們要經過FSDirectory的代碼說明
回到FSDirMkdirOp中的mkdirs
這裏的lastINode是個啥意思呢,好比咱們如今已經存在的目錄是/user/hive/warehouse,咱們須要建立的目錄是:/user/hive/warehouse/data/mytable,那我就是在data以前的那些目錄都不須要本身再建立了呀。因此首先找到 最後一個INode,其實就是warehouse(也就是lastINode)。 從這個INode開始建立INodeDirectory(data),再建立INodeDirectory(mytable) 以後就是這個邏輯,nonExisting就是指咱們仍未建立的那部分INode,對應剛剛的例子就是/data/mytable。而後進行判斷,若是length大於1(對於這個例子length就是大於1的,由於length指代的是要建立的Inode個數,例子中有data和mytable兩個INode) 其實不管是建立1個仍是多個,都是進去的同一個方法createChildrenDirectories,在這個方法裏面使用了for循環,無論你是幾個,我都用同一套邏輯,完成代碼複用而已,那咱們如今看到了,建立邏輯就是那麼一句話createSingleDirectory而已,點進去 繼續點unprotectedMkdir 以後就是建立出來一個名稱叫作dir(/data/mytable)的目錄,而後把這個dir添加到本來/user/hive/warehouse的末尾處到如今爲止,目錄樹fsimage就完成了更新。
DistributedFileSystem實現了mkdirs方法,跳轉到DFSClient,在DFSClient中看到了這個mkdirs實際上是調用了Namenode的mkdirs,因此跳到Namenode的mkdirs,
fsimage其實就是一個目錄樹,咱們的元數據是有兩份的,一份是在磁盤中的 fsimage + edit log,由FSNameSystem來進行管理,還有一份是內存裏面的fsimage,由這個FSDirectory來管理,還有就是目錄樹的每個節點都爲INode,這個INode有兩種形態,INodeFIle表明文件,INodeDirectory表明文件夾,建立目錄其實就是,先獲取到集羣本來已存在的目錄的最後一個INode,咱們稱之爲lastNode,而後經過一個for循環來將目錄拼接到這個lastNode的末尾。
字數算是較多,不過其實總得來講都不難理解。重要的位置基本都已經加粗了,也是但願對你們有所幫助。之後再被問到這倆問題,在明白這個套路的前提下,按照咱們的總結過程簡短地說出來便可。但願你們都能裝的一手好B,hhh。