Hadoop源碼篇 --- 面試常問的Namenode元數據管理及雙緩衝機制

前言

這兩個關於NameNode的問題其實很是地經典,不只有不少細節可詢,並且也是面試的一個高頻問題,因此特地獨立出來一篇。元數據管理會結合源碼來說,而雙緩衝雖然暫時沒去翻源碼,可是咱們能夠藉由一個簡單的實現去向你們好好地說明。後面也會對這段源碼進行一些修改操做來讓它更爲高效。那話很少說我們就開始吧java

由於直接看源碼你們可能接受不了,因此咱們先來聊聊雙緩衝機制。node

1、Namenode的雙緩衝機制

1.1 問題場景

Namenode裏面的元數據是以兩種狀態進行存儲的:面試

第一狀態便是存儲在內存裏面,也就是剛剛所提到的目錄樹,它就是一個list,在內存裏面更新元數據速度是很快的。可是若是僅僅只在內存裏存放元數據,數據是不太安全的。安全

因此咱們在磁盤上也會存儲一份元數據,但是此時問題就出現了,咱們須要把數據寫進磁盤,這個性能確定是不太好的呀。可NameNode做爲整個集羣的老大,在hadoop上進行hive,HBASE,spark,flink等計算,這些數據都會不停給NameNode施壓寫元數據,一天下來一億條元數據都是可能的,因此NameNode的設計確定是要支持超高併發的,但是寫磁盤這操做是很是很是慢的,一秒幾十或者最多幾百都已經封頂了,那如今咋辦?併發

1.2 元數據的雙緩衝機制

首先咱們的客戶端(這裏指的是hive,hbase,spark···都不要緊)所產生的數據將會走兩個流程,第一個流程會向內存處寫入數據,這個過程很是快,也不難理解

這時候確定就不能直接寫內存了,畢竟咱們是明知道這東西很是慢的,真的要等它一條條數據寫到磁盤,那特麼咱們均可以雙手離開鼠標鍵盤下班走人了。那NameNode一個東西不行就整個集羣都不行了,那如今咱們該如何解決?

雙緩衝機制就是指咱們將會開闢兩份如出一轍的內存空間,一個爲bufCurrent,產生的數據會直接寫入到這個bufCurrent,而另外一個叫bufReady,在bufCurrent數據寫入(其實這裏可能不止一條數據,等下會說明)後,兩片內存就會exchange(交換)。而後以前的bufCurrent就負責往磁盤上寫入數據,以前的bufReady就繼續接收客戶端寫入的數據。其實就是將向磁盤寫數據的任務交給了後臺去作。這個作法,在JUC裏面也有用到

並且在此基礎上,hadoop會給每個元數據信息的修改賦予一個事務ID號,保證操做都是有序的。這也是出於數據的安全考慮。這樣整個系統要求的內存會很是大,因此這關乎一個hadoop的優化問題,在以後將會說起。分佈式

1.3 雙緩衝機制的代碼實現

這個對理解起來其實十分有幫助,但願你們能跟着思路走ide

1.3.1 模擬一個元數據信息類

咱們先設計一條元數據信息出來高併發

public class EditLog{
	//事務的ID
	public long taxid;
	public String log;
	
	public EditLog(long taxid, String log) {
		this.taxid = taxid;
		this.log = log;
	}

	@Override
	public String toString() {
		return "EditLog [taxid=" + taxid + ", log=" + log + "]";
	}
	
}
複製代碼

1.3.2 雙緩衝區

代碼其實不難,分爲5塊oop

① 定義了兩個緩衝區currentBuffer(有序隊列)和syncBuffer性能

② 一個write方法負責寫入元數據

③ 一個flush方法把元數據寫入到磁盤上,這裏我只用了一個打印語句模擬了一下寫入磁盤,寫入完成後清空syncBuffer的數據

④ 一個exchange方法來交換currentBuffer和syncBuffer

⑤ 還有一個getMaxTaxid方法獲取到正在同步數據的內存裏面事務ID的最大ID,這個方法的做用稍後說明

這5塊除了最後的獲取ID,應該你們都知道是幹嗎用的了吧,那行,以後就會揭曉

public class DoubleBuffer{
	//寫數據,有序隊列
	LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
	//用來把數據持久化到磁盤上面的內存
	LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();
	/**
	 * 寫元數據信息
	 * @param editLog
	 */
	public void write(EditLog editLog){
		currentBuffer.add(editLog);
		
	}
	/**
	 * 把數據寫到磁盤
	 */
	public void flush() {
		for(EditLog editLog:syncBuffer) {
			//模擬將數據寫到磁盤
			System.out.println(editLog);	
		}
		syncBuffer.clear();
		
	}
	/**
	 * 交換currentBuffer和syncBuffer
	 */
	public void exchange() {
		LinkedList<EditLog> tmp=currentBuffer;
		currentBuffer=syncBuffer;
		syncBuffer=tmp;
	}
	/**
	 * 獲取到正在同步數據的內存裏面事務ID的最大ID
	 */
	public long getMaxTaxid() {
		return syncBuffer.getLast().taxid;
	}
	
}
複製代碼

1.3.3 寫元數據日誌的核心方法

那我如今要保證這個寫操做(這裏的寫操做是客戶端向bufCurrent寫)的順序,因此咱們在這裏會使用synchronized來加鎖,而後經過taxid++順序處理。而後new出一個元數據對象,把對象寫進磁盤

long taxid=0L;//
DoubleBuffer doubleBuffer=new DoubleBuffer();
//每一個線程本身擁有的副本
ThreadLocal<Long> threadLocal=new ThreadLocal<Long>();
private void logEdit(String log) {
	synchronized (this) {
		taxid++;
		
		// 讓每一個線程裏面都擁有本身的事務ID號,做用後面會解釋
		threadLocal.set(taxid);
		EditLog editLog=new EditLog(taxid,log);
		//往內存裏面寫東西
		doubleBuffer.write(editLog);
		
	} 
	// 此時鎖釋放
	
	//將數據持久化到硬盤的方法
	logFlush();
	
}
複製代碼

那有小夥伴就會有疑問了,都加了鎖了這運行的性能能好?但是你要知道這把鎖裏面doubleBuffer.write(editLog)這是往內存裏面寫東西的呀。因此這是沒有問題的,也能完美支持高併發

事先說起一下,這裏將會用到分段加鎖,好比此時咱們有3個線程,線程1進來logEdit,執行完write以後,馬上鎖就會被釋放,而後線程2馬上又能緊隨其後write,寫完又到線程3。

由於寫內存的速度是極快的,因此此時在還沒輪到**logFlush()方法(將數據持久化到硬盤的方法)**執行,咱們可能都已經都已經完成了3個數據往bufCurrent寫入的操做。

舒適提示:此時這邊的線程1將要進入到logFlush了,但是此時bufCurrent可能已經夾帶了線程1,2,3的數據了,如今我先作個假設,線程1,2,3寫入的元數據分別就是1,2,3,這句話很是重要!!!這句話很是重要!!!這句話很是重要!!!很是重要的事情說三遍,而後請看logFlush的解釋

1.3.4 logFlush --- 將數據持久化到硬盤的方法

//判斷此時後臺正在把數據同步到磁盤上
public boolean isSyncRunning =false;
//正在同步磁盤的內存塊裏面最大的一個ID號。
long maxtaxid=0L;
boolean isWait=false;

private void logFlush() {
	synchronized(this) {
		if(isSyncRunning) {
			//獲取當前線程的是事務ID
			long localTaxid=threadLocal.get();
			if(localTaxid <= maxtaxid) {
				return;
			}
			if(isWait) {
				return;
			}
			isWait=true;
			while(isSyncRunning) {
				try {
					//一直等待
					//wait這個操做是釋放鎖
					this.wait(1000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
			isWait=false;
					
			//
		}
		//代碼就走到這兒
		//syncBuffer(1,2,3)  
		doubleBuffer.exchange();
		//maxtaxid = 3
		if(doubleBuffer.syncBuffer.size() > 0) {
			maxtaxid=doubleBuffer.getMaxTaxid();
		}
		isSyncRunning=true;
		
	}
	//釋放鎖
	//把數據持久化到磁盤,比較耗費性能的。
	doubleBuffer.flush();
	//分段加鎖
	synchronized (this) {
		//修改isSyncRunning的狀態
		isSyncRunning=false;
		//喚醒wait;
		this.notifyAll();
		
	}
複製代碼

思路很是清晰,如今咱們就來整理一下。

這個logFlush的方法的流程就是,我先使用一個boolean值 isSyncRunning 判斷此時後臺正在把數據同步到磁盤上,這個東西在客戶端沒有足夠數據寫進來以前一開始確定是false的,可是若是寫進bufCurrent的數據已經差很少了,那我就要把bufCurrent和syncBuffer交換,把 isSyncRunning 改爲true。此時記錄一下正在同步磁盤的內存塊裏面最大的一個ID號maxtaxid(後面須要使用)。而後讓本來的bufCurrent往磁盤上寫數據。 在寫入完成後,isSyncRunning的值修改回false

若是如今第二個線程夾帶着它的數據2進來了logFlush,此時寫入磁盤的操做尚未執行完成,那它就會先獲取當前線程的事務ID---localTaxid,若是當前的這個localTaxid小於我如今進行同步的事務ID的最大值(2<3),那就說明如今的這個線程2所夾帶的數據我已經在上一個線程操做中了。那我就直接無視(若是不理解爲啥無視,直接看下一段話)

這裏就要使用到我剛剛說了三遍很重要的事情了,上一個線程1進來的時候,bufCurrent已經夾帶了數據1,2,3,此時個人maxtaxid=3,線程2所夾帶的2,已是在處理中的數據了

可是若是localTaxid大於我如今進行同步的事務ID的最大值,可是此時又還有線程在同步元數據,那我就讓它等wait,此時我這邊一wait,那邊客戶端又能夠繼續往bufCurrent寫入元數據了。這裏代碼的邏輯是等待1s後,又從新去查看是否有線程正在同步元數據。而後在我同步元數據的操做後面,添加上喚醒這個wait的操做,由於在這一瞬間我同步結束後,若是這個線程仍然在wait,那豈不是在白等了,因此我這邊處理完了,馬上就喚醒它來繼續同步,不浪費你們時間。

1.3.5 編寫一個main方法跑跑看

public static void main(String[] args) {
	FSEdit fs=new FSEdit();
	for (int i = 0; i < 1000; i++) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				for (int j = 0; j < 100; j++) {
					fs.logEdit("日誌");
				}
				
			}
		}).start();
	}
}
複製代碼

咱們隨便上個10W條跑跑看,彷佛是3到4秒就搞定了,並且生成的EditLog都是有序的

這個套路其實徹底是模仿了hadoop的源碼寫了一個大概的,後面咱們也會對這個地方的源碼進行修改。可是也是很是地接近了。

在這裏我也能夠說明有哪些地方的不足,好比咱們這樣操做內存頻繁地交換,那確定是會對性能產生必定影響的,因此咱們會在這塊設置一個合理的大小再進行交換。

2、NameNode是如何管理元數據的

分析NameNode對元數據的管理這個問題咱們的作法很簡單,先經過命令建立一個目錄,而後看HDFS的元數據是否隨之發生了變化

hadoop fs -mkdir /user/soft
複製代碼

按照這個思路,那咱們打開hadoop-src吧

2.1 經過Java代碼先模擬建立目錄的操做

如今經過一段Java代碼來建立目錄

public static void main(String[] args) throws IOException {
	Configuration configuration=new Configuration();
	FileSystem fileSystem=FileSystem.newInstance(configuration);
	//建立目錄(分析的是元數據的管理流程)
	fileSystem.mkdirs(new Path(""));

	/**
	 * TODO 分析HDFS上傳文件的流程
	 * TODO 作一些重要的初始化工做
	 */
	FSDataOutputStream fsous=fileSystem.create(new Path("/user.txt"));
	
	//TODO 完成上傳文件的流程
	fsous.write("showMeYourDream".getBytes());
	
}
複製代碼

那我如今已經寫好在這裏了,接下來就開始分析

2.2 mkdirs的操做

點進去mkdirs方法,發現它跳轉到了 FileSystem.java 的mkdirs

繼續深刻,仍舊是在 FileSystem.java ,你會發現我們沒得再點了,這時候咱們只能讓hadoop的開發者告訴咱們了

仍舊是在 FileSystem.java ,此時咱們把位置拉取到源碼大約87行的位置,能夠看到這麼一段話,The local implementation is {@link LocalFileSystem} and distributed implementation is DistributedFileSystem.

粘貼到百度翻譯,本地實現是{@link LocalFileSystem},而且是分佈式的。實現是DistributedFileSystem 。

此時咱們就懂了,這個mkdirs方法是被這個DistributedFileSystem(分佈式文件系統)類給實現了,那咱們就過去它那裏去找找看吧

2.2.1 DistributedFileSystem的mkdirs

直接查找mkdirs方法,看到一個本地的mkdirs,繼續點進去

看起來仍是沒有咱們想要看的邏輯,咱們還得繼續點進去mkdirs

2.2.2 DFSClient的mkdirs

咱們暫時不關心這些什麼權限不權限的,咱們只關心建立的邏輯,繼續點primitiveMkdir

說過的有try就直接看try,看到這裏咱們就知道了,原來咱們服務端的建立文件目錄的操做其實就是調用了NameNode的mkdirs,而這裏的NameNode確定是一個咱們服務端的代理對象,不信咱們點一下namenode

DFSClient.java第261行左右,繼續點擊ClientProcotol

跳轉到這裏,而後咱們直接複製它的註釋到百度翻譯

那它其實不就是咱們的一個服務端代理對象嗎

因此如今咱們能夠直接找出NameNode,好好看看它的mkdirs方法了

2.2.3 NameNode的mkdirs

前面的if直接無視,看到return namesystem.mkdirs那,點進去

這裏的try裏面會先進行一個安全模式的判斷,若是集羣處於安全模式的話是集羣是隻能支持讀不能支持寫的,而後沒問題的話就會執行建立目錄的步驟,這時候有一個問題,就是若是你建立了目錄,是會產生相應的日誌的,因此後面會有一步對日誌的持久化操做。

繼續點進去FSDirMkdirOp.mkdirs

代碼很長,有一點小複雜,因此咱們分步驟說明一下

2.3 FSDirectory和FSNameSystem的關係

首先是第一個關鍵詞FSDirectory,這個是管理元數據的目錄樹(fsimage),元數據就是在NameNode的內存裏面

並且這裏還要注意,咱們的元數據是有兩份的,一份是在磁盤中的 fsimage + edit log,由FSNameSystem來進行管理,還有一份是內存裏面的fsimage,由這個FSDirectory來管理

那這種東西我口說無憑啊,那我是怎麼知道的,那固然仍是hadoop的開發者告訴個人啦,點進去FSDirectory

若是我直接把原有的註釋扔到百度翻譯,結果是這樣的

固然這裏的記憶翻譯是錯的,翻譯應該是內存。但是後面關於FSNamesystem的說明已經很直白了,就是它 把咱們的元數據記錄信息是持久化到磁盤上面的。你看,這我不會騙你的

而這個FSDirectory的對象是如何管理一個目錄的呢,咱們要經過FSDirectory的代碼說明

2.4 FSDirectory的結構及目錄樹的生成流程

這個rootDir就是根目錄,咱們點進去INodeDirectory

能夠看到它最重要的屬性children,這裏有個細節須要注意,咱們看到這個children是一個list,而這個list裏面存放的是INode這個類型的數據

這個INode是什麼東西?咱們知道咱們 平時磁盤上的文件夾,文件夾裏面既能夠是文件夾,也能夠是文件,那INode也是這麼去設計的,若是它的子目錄下面是文件夾,那就是用INodeDirectory來表示,而若是是文件,那就是INodeFile,明白是這麼設計的就行了

回到FSDirMkdirOp中的mkdirs

這裏的lastINode是個啥意思呢,好比咱們如今已經存在的目錄是/user/hive/warehouse,咱們須要建立的目錄是:/user/hive/warehouse/data/mytable,那我就是在data以前的那些目錄都不須要本身再建立了呀。因此首先找到 最後一個INode,其實就是warehouse(也就是lastINode)。 從這個INode開始建立INodeDirectory(data),再建立INodeDirectory(mytable)

以後就是這個邏輯,nonExisting就是指咱們仍未建立的那部分INode,對應剛剛的例子就是/data/mytable。而後進行判斷,若是length大於1(對於這個例子length就是大於1的,由於length指代的是要建立的Inode個數,例子中有data和mytable兩個INode)

其實不管是建立1個仍是多個,都是進去的同一個方法createChildrenDirectories,在這個方法裏面使用了for循環,無論你是幾個,我都用同一套邏輯,完成代碼複用而已,那咱們如今看到了,建立邏輯就是那麼一句話createSingleDirectory而已,點進去

繼續點unprotectedMkdir

以後就是建立出來一個名稱叫作dir(/data/mytable)的目錄,而後把這個dir添加到本來/user/hive/warehouse的末尾處

到如今爲止,目錄樹fsimage就完成了更新。

流程總結

DistributedFileSystem實現了mkdirs方法,跳轉到DFSClient,在DFSClient中看到了這個mkdirs實際上是調用了Namenode的mkdirs,因此跳到Namenode的mkdirs,

fsimage其實就是一個目錄樹,咱們的元數據是有兩份的,一份是在磁盤中的 fsimage + edit log,由FSNameSystem來進行管理,還有一份是內存裏面的fsimage,由這個FSDirectory來管理,還有就是目錄樹的每個節點都爲INode,這個INode有兩種形態,INodeFIle表明文件,INodeDirectory表明文件夾,建立目錄其實就是,先獲取到集羣本來已存在的目錄的最後一個INode,咱們稱之爲lastNode,而後經過一個for循環來將目錄拼接到這個lastNode的末尾。

finally

字數算是較多,不過其實總得來講都不難理解。重要的位置基本都已經加粗了,也是但願對你們有所幫助。之後再被問到這倆問題,在明白這個套路的前提下,按照咱們的總結過程簡短地說出來便可。但願你們都能裝的一手好B,hhh。

相關文章
相關標籤/搜索