Hbase寫入hdfs源碼分析

版權聲明:本文由熊訓德原創文章,轉載請註明出處: 
文章原文連接:https://www.qcloud.com/community/article/258node

來源:騰雲閣 https://www.qcloud.com/communityapache

 

本文檔從源碼角度分析了,hbase做爲dfs client寫入hdfs的hadoop sequence文件最終刷盤落地的過程。
以前在《wal線程模型源碼分析》中描述wal的寫過程時說過會寫入hadoop sequence文件,hbase爲了保證數據的安全性,通常都是寫入同爲hadoop生態的hdfs(Hadoop Distribute File System)中。append的最終結果是使用write.append()寫入,而sync()則是使用write.sync()刷盤。這時其實並未真正的結束,爲了保障數據安全性,hdfs可會根據用戶的配置寫到多個datanode節點中,無論是HFile仍是FSHLog都不單單是簡單的寫入或刷入(flush)了真正的存儲節點--DataNode中,其中涉及到數據流(WALEntry)如何安全有序且高效地寫到datanode文件中,而flush又是具體如何作的,這個文檔就將從源碼上分析hbase的「寫」操做到了wirter.append()和writer.sync()後具體發生了什麼,如何落地的。緩存

下圖是《Hbase權威指南》中描述Hbase底層存儲結構的頂層結構圖。能夠看到Hbase將處理HFile文件(memstore生成)和HLog文件(WAL生成)這兩種文件都將有HRegionServer管理,當真正存儲到HDFS中時,會使用DFS Client做爲hdfs的客戶端把大批量的這兩種數據流寫到多個DataNode節點中。
安全

在文檔 《wal線程模型源碼分析》中爲了突出重點說明wal線程模型,並未具體說明writer.append()和writer.sync()中writer實例是什麼,在FSHLog中被volatile關鍵字修飾聲明爲一個WALProvider.Writer類型的接口:

其實它的實現類是ProtobufLogWriter,這個類也在org.apache.hadoop.hbase.regionserver.wal包中,在wal包中是做爲wal向datanode的writer,它在FSHLog是使用工廠模式createWriterInstance()實例化,而後調用init()方法初始化:架構


從源碼中能夠看到真正寫實例是FSDataOutputStream,它用於向新生成的文件中寫入數據,就像前面敘述的,在ProtobufLogWriter的init()方法中被初始化:
app

在這裏咱們僅僅討論使用hdfs做爲hbase的文件系統,也便是init參數中fs(System)是DistributedFileSystem的實例。在其createNonRecursive的實現的參數除了path參數指明須要在hdfs建立的文件路徑比較重要之外,還有一個replication參數也很重要,這個參數說明了備份數量也便是寫datanode份數。DistributedFileSystem中dfs是DFSClient的實例引用,也即最開始那張架構圖中所指的DFS Client。hbase使用DFSClient的create方法經過RPC調用向hdfs的namenode建立一個文件並構造了輸出流DFSOutputStream實例,這個方法另一個重點就啓動了一個pipeline,具體調用是streamer.start(),這個pipleline是hbase向hdfs的多個datanode管道寫的實現。雖然這裏分析的是wal的寫入過程,可是其實keyvalue寫到memstore,再寫到HFile後也是採用這種方式管道寫(pipeline)的方式實現。ide




經過rpc調用NameNode的create函數,調用namesystem.startFile函數,其又調用startFileInternal函數,它建立一個新的文件,狀態爲under construction,沒有任何data block與之對應。於此同時建立成功後會返回一個DFSOutputStream類型的實例,在FSDataOutputStream中被稱做wrappedStream,該對象負責處理datanode和namenode之間的通信。函數


hdfs的文件結構,HDFS一個文件由多個block(默認64MB)構成。這裏經過註釋能夠看到HDFS在進行block讀寫的時候是以packet(默認每一個packet爲64K)爲單位進行的。每個packet由若干個chunk(默認512Byte)組成。Chunk是進行數據校驗的基本單位,對每個chunk生成一個校驗和(默認4Byte)並將校驗和進行存儲。oop

分析到這,已經能夠看出hbase文件寫入hdfs的過程並無特別,hdfs就把hbase當作hdfs的client而後封裝成chunk再組裝成packet,再向datanode批量寫數據。爲了保證數據有序的傳輸,使用了數據發送隊列dataqueue和待確認隊列ackqueue,並使用兩個線程DFSOutputStream$DataStreamer和DFSOutputStream$DataStreamer$ResponseProcessor(在run()中)分別來發送數據到對應block和確認數據是否到達。源碼分析

還有另一個重點就是hbase是如何把數據到datanode的磁盤的。

在此,咱們又要回到ProtobufLogWriter類中由於writer.sync()最終調用的就是ProtobufLogWriter的writer方法,它的源碼以下:

其中,output在就是以前分析過的FSDataOutputStream的實例,在sync()方法中調用了FSDataOutputStream的flush和hflush,其實flush什麼都沒作(noop,源碼中也說明了),hflush()則會調用也是前面提過的DFSOutputStrem類的hflush方法,hflush方法將Client緩存的全部數據(packet)當即發送給Datanodes,並阻塞直到它們寫入成功爲止。hflush以後,能夠確保Client端故障不會致使數據丟失,但若是Datanodes失效仍有丟失數據的可能,當FSDataOutputStream關閉時也會額外的執行一次flush操做:

就像註釋中所解釋的同樣,hflush是同步的只能保證能讓新reader能看到,可是並不能保證其真正的持久化到了每一個datanode中,也即沒真的調用posix中的fsync()系統調用。它只是將client端寫入的數據刷到每一個DataNode的OS緩存(store)中,若是每一個副本所在的DataNode同時crash時(例如機房斷電)將會致使數據丟失。

hdfs給客戶端還提供了另一種語義hsync:client端全部的數據都發送到副本的每一個datanode上,而且datanode上的每一個副本都完成了posix中fsync的調用,也就是說操做系統已經把數據刷到磁盤上(固然磁盤也可能緩衝數據);須要注意的是當調用fsync時只有當前的block會刷到磁盤中,要想每一個block都刷到磁盤,必須在建立流時傳入Sync標示。

hbase當前選擇的是hflush語義。這兩種語義都調用的flushOrsync方法,其中hflush調用的isSync傳入false,而hsync傳入的是true。

這個方法的主要做用就是把還在緩存(buffered )中的數據刷到datanodes中,

其中最終要的幾個方法就是flushBuffer(),waitAndQueueCurrentPackket()和waitForAckedSeqno(),調用waitAndQueueCurrentPacket()將當前Package放到發送隊列中waitForAckedSeqno()等待發送package的確認包,而其原理和寫數據同樣,就是把數據封裝成chunk在把chunk一個個填充到package,再以pipeline的方式一個個寫到datanode再根據是否有sync標識刷盤。


waitForAckedSeqno()就是用於等待ackqueue中的ack回來,並被喚醒。

相關文章
相關標籤/搜索