The HBase read path looks fairly complex, mainly because it is full of Scanners of various kinds, as shown in the diagram below:
                        +---------------+
                        | RegionScanner |
                        +-------+-------+
                                |
        +-----------------------+-----------------------+
        |                       |                       |
+-------v------+        +-------v------+        +-------v------+
| StoreScanner |        | StoreScanner |        | StoreScanner |
+--------------+        +--+----+----+-+        +--------------+
                           |    |    |
     +---------------------+    |    +---------------------+
     |                          |                          |
+----v-------------+   +--------v---------+   +------------v----+
| StoreFileScanner |   | StoreFileScanner |   | MemStoreScanner |
+--------+---------+   +--------+---------+   +-----------------+
         |                      |
 +-------v------+       +-------v------+
 | HFileScanner |       | HFileScanner |
 +--------------+       +--------------+
In HBase, a table can have multiple Column Families, and in a Scan, reading each Column Family's data (each Column Family is backed by a Store) is the job of one StoreScanner object. Each Store's data consists of an in-memory MemStore plus HFile files on disk; correspondingly, the StoreScanner object employs one MemStoreScanner and N StoreFileScanners to do the actual reading.
Logically, reading a row takes two steps: merging the data of all of the row's column families (Stores), and, within each Store, merging the data of its MemStore and HFiles.
In the implementation, both steps are carried out with heaps. A RegionScanner reads through a heap built from the StoreScanners beneath it, held in the RegionScanner member variable KeyValueHeap storeHeap.
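The two-level merge described above can be sketched with plain Java collections. This is a toy model, not HBase's classes: each "scanner" iterates a sorted list, and a PriorityQueue of scanners, keyed by each scanner's head element, always pops the scanner whose head is smallest. The same merge routine is applied once per Store and then once for the Region:

```java
import java.util.*;

// Toy model of the two-level merge: each "scanner" exposes peek()/next()
// over a sorted list, and a heap of scanners always pops the scanner whose
// head element is smallest -- the same idea KeyValueHeap is built on.
public class HeapMergeDemo {
    static class ListScanner {
        private final Iterator<String> it;
        private String head;
        ListScanner(List<String> sorted) {
            it = sorted.iterator();
            head = it.hasNext() ? it.next() : null;
        }
        String peek() { return head; }
        String next() { String r = head; head = it.hasNext() ? it.next() : null; return r; }
    }

    // Merge several sorted scanners into one sorted output.
    static List<String> merge(List<ListScanner> scanners) {
        PriorityQueue<ListScanner> heap =
            new PriorityQueue<>(Comparator.comparing(ListScanner::peek));
        for (ListScanner s : scanners) if (s.peek() != null) heap.add(s);
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            ListScanner top = heap.poll();
            out.add(top.next());
            if (top.peek() != null) heap.add(top); // re-compete after advancing
        }
        return out;
    }

    public static void main(String[] args) {
        // Inner level: one "Store" merges its hfile + memstore.
        List<String> store1 = merge(Arrays.asList(
            new ListScanner(Arrays.asList("a", "d")),   // stand-in for an HFile
            new ListScanner(Arrays.asList("b", "e")))); // stand-in for the MemStore
        // Outer level: the region merges its Stores the same way.
        List<String> region = merge(Arrays.asList(
            new ListScanner(store1),
            new ListScanner(Arrays.asList("c", "f"))));
        if (!region.equals(Arrays.asList("a", "b", "c", "d", "e", "f")))
            throw new AssertionError(region);
        System.out.println(region);
    }
}
```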
The StoreScanners that make up this heap are obtained in the RegionScannerImpl constructor:
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
  Store store = stores.get(entry.getKey());
  // actually of type StoreScanner
  KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
  if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
      || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
  } else {
    joinedScanners.add(scanner);
  }
}
store.getScanner(scan, entry.getValue(), this.readPt) simply news up a StoreScanner; all the logic lives in the StoreScanner constructor.
The constructor essentially finds the relevant HFiles and the MemStore and then builds a heap. Note that this heap is per-StoreScanner: one heap for each StoreScanner, whose elements are the StoreFileScanners and MemStoreScanner corresponding to the HFiles and MemStore underneath.
Finding the relevant HFiles and MemStore happens in StoreScanner::getScannersNoCompaction(), which filters out unneeded HFiles based on the TimeRange and key range the request specifies, and also uses bloom filters to discard HFiles that cannot match. It then calls
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);
to seek each of these StoreFileScanners and the MemStoreScanner. The seek key is matcher.getStartKey(), constructed as follows:
return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP, Type.DeleteFamily);
Seeks operate on KeyValues. The semantics of a seek are: position at the given KeyValue, or, if it does not exist, at the next KeyValue after it. For example, suppose a column family named X has two columns a and b, and the file holds two rows with rowkeys aaa and bbb, as in the table below.
Column Family X:

| rowkey | column a | column b |
| aaa    | 1        | abc      |
| bbb    | 2        | def      |
If the HBase client sets the scan's start key to aaa, matcher.getStartKey() is initialized to (rowkey, family, qualifier, timestamp, type) = (aaa, X, null, LATEST_TIMESTAMP, Type.DeleteFamily). By the KeyValue comparison rules, this KeyValue is smaller than row aaa's first column a (because it has no qualifier), so seeking a StoreFileScanner with it lands on column a of row aaa.
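The comparison rules the example relies on can be shown with a simplified comparator: row ascending, then family, then qualifier (an empty qualifier sorts first), then timestamp descending (newer first), then type code descending. The type codes below mirror HBase's values (Minimum = 0, Put = 4, DeleteFamily = 14, Maximum = 255), but the class itself is a toy, not org.apache.hadoop.hbase.KeyValue:

```java
// Simplified KeyValue ordering used throughout this article's examples.
public class KvCompareDemo {
    static final int MINIMUM = 0, PUT = 4, DELETE_FAMILY = 14, MAXIMUM = 255;
    static final long LATEST_TIMESTAMP = Long.MAX_VALUE;

    static int compare(String r1, String f1, String q1, long ts1, int t1,
                       String r2, String f2, String q2, long ts2, int t2) {
        int c = r1.compareTo(r2);   if (c != 0) return c;
        c = f1.compareTo(f2);       if (c != 0) return c;
        c = q1.compareTo(q2);       if (c != 0) return c;
        c = Long.compare(ts2, ts1); if (c != 0) return c; // newer timestamps sort first
        return Integer.compare(t2, t1);                   // larger type code sorts first
    }

    public static void main(String[] args) {
        // The start key built for "scan from row aaa" -- empty qualifier,
        // LATEST_TIMESTAMP, Type.DeleteFamily -- sorts BEFORE row aaa's
        // first real cell (column a, a Put), so a seek lands exactly there.
        if (compare("aaa", "X", "", LATEST_TIMESTAMP, DELETE_FAMILY,
                    "aaa", "X", "a", 1L, PUT) >= 0)
            throw new AssertionError("start key should sort before the first cell");
        System.out.println("ok");
    }
}
```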
In reality, though,
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);
may not actually seek the StoreFileScanners; instead it may perform a lazy seek, deferring the real seek until it cannot be avoided. Lazy seek is covered in detail later.
The above yields the StoreScanners for every column family the scan touches; the following function then builds the heap from them:
protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
  this.storeHeap = new KeyValueHeap(scanners, region.comparator);
  if (!joinedScanners.isEmpty()) {
    this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
  }
}
KeyValueScanner is an interface representing a scanner that can iterate KeyValues outward; StoreFileScanner, MemStoreScanner and StoreScanner all implement it. The comparator here is of type KVScannerComparator and compares two KeyValueScanners; internally it uses KVComparator, which compares two KeyValues. As we will see, in this heap of KeyValueScanners, the scanner at the top is the one whose own top KeyValue is smallest.
The heap is represented by the class KeyValueHeap; let's look at what its constructor does:
KeyValueHeap(List<? extends KeyValueScanner> scanners,
    KVScannerComparator comparator) throws IOException {
  this.comparator = comparator;
  if (!scanners.isEmpty()) {
    // Build a priority queue (internally a heap) from the given KeyValueScanners
    this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(), this.comparator);
    for (KeyValueScanner scanner : scanners) {
      if (scanner.peek() != null) {
        this.heap.add(scanner);
      } else {
        scanner.close();
      }
    }
    // All elements have now been added to the heap.
    // Pop one KeyValueScanner off the top into the member variable current;
    // the top of this heap is then the top of current, and KeyValueHeap's
    // peek() simply returns current.peek().
    this.current = pollRealKV();
  }
}
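The peek()/next() protocol around `current` can be sketched as follows. This is a simplified model over strings (Lazy Seek omitted, so the place where the real code calls pollRealKV() is just a poll here): the scanner with the smallest head is held OUT of the PriorityQueue in `current`, peek() delegates to it, and next() advances it and then lets it re-compete against the heap:

```java
import java.util.*;

// Sketch of KeyValueHeap's peek()/next() protocol (Lazy Seek omitted).
public class KvHeapDemo {
    interface Scanner { String peek(); String next(); }

    static Scanner of(String... sorted) {
        Iterator<String> it = Arrays.asList(sorted).iterator();
        return new Scanner() {
            String head = it.hasNext() ? it.next() : null;
            public String peek() { return head; }
            public String next() {
                String r = head;
                head = it.hasNext() ? it.next() : null;
                return r;
            }
        };
    }

    private final PriorityQueue<Scanner> heap =
        new PriorityQueue<>(Comparator.comparing(Scanner::peek));
    private Scanner current; // plays the role of KeyValueHeap.current

    KvHeapDemo(Scanner... scanners) {
        for (Scanner s : scanners) if (s.peek() != null) heap.add(s);
        current = heap.poll(); // the real code calls pollRealKV() here
    }

    String peek() { return current == null ? null : current.peek(); }

    String next() {
        String r = current.next();
        Scanner top = heap.peek();
        // keep `current` only while its head is still the smallest
        if (current.peek() == null
            || (top != null && top.peek().compareTo(current.peek()) < 0)) {
            if (current.peek() != null) heap.add(current);
            current = heap.poll();
        }
        return r;
    }

    public static void main(String[] args) {
        KvHeapDemo h = new KvHeapDemo(of("a", "d"), of("b", "c"));
        StringBuilder out = new StringBuilder();
        while (h.peek() != null) out.append(h.next());
        if (!out.toString().equals("abcd")) throw new AssertionError(out);
        System.out.println(out);
    }
}
```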
Before looking at how pollRealKV() works, we need to look at Lazy Seek, an optimization introduced in HBase 0.94.
Before this optimization, reading a column family (Store) required seeking every HFile and the MemStore beneath it to the queried KeyValue (seek semantics: position at the KeyValue if it exists, otherwise at the KeyValue right after it). Suppose the Store holds three HFiles and a MemStore, in increasing time order [HFile1, HFile2, HFile3, MemStore]. Without lazy seek, all HFiles and the MemStore must be seeked; seeking an HFile is slow, often requiring the relevant block to be loaded into memory before positioning. With lazy seek, if the wanted KeyValue already exists in HFile3, then HFile1 and HFile2 need not be seeked at all, which is much faster. Roughly, the idea is that a request to seek to some KeyValue does not actually seek each StoreFileScanner; instead, each StoreFileScanner's peek is set to a fake (rowkey, family, qualifier, lastTimestampInStoreFile).
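The lazy-seek idea above can be modeled with a toy simulation (assumptions: keys are reduced to timestamps only, and the row/column comparison is ignored; none of these classes are HBase's). Each file scanner initially exposes a fake head keyed at the file's max timestamp instead of opening any blocks; a real seek happens only when that scanner actually reaches the heap top. With the sought cell living in the newest file, only one real seek occurs:

```java
import java.util.*;

// Toy model of Lazy Seek. Timestamps are compared descending
// (newer = smaller key), as in KVComparator.
public class LazySeekDemo {
    static class FileScanner implements Comparable<FileScanner> {
        final String name; final long maxTs; final Long realTs; // matching cell's ts, if any
        long head; boolean realSeekDone = false; static int realSeeks = 0;
        FileScanner(String name, long maxTs, Long realTs) {
            this.name = name; this.maxTs = maxTs; this.realTs = realTs;
            this.head = maxTs; // fake peek at lastTimestampInStoreFile
        }
        void enforceSeek() { // the expensive real seek
            realSeekDone = true; realSeeks++;
            head = realTs == null ? Long.MIN_VALUE : realTs;
        }
        public int compareTo(FileScanner o) { return Long.compare(o.head, head); }
    }

    public static void main(String[] args) {
        FileScanner f1 = new FileScanner("hfile1", 10, null);
        FileScanner f2 = new FileScanner("hfile2", 20, null);
        FileScanner f3 = new FileScanner("hfile3", 30, 25L); // seekKV lives here
        PriorityQueue<FileScanner> heap = new PriorityQueue<>(Arrays.asList(f1, f2, f3));

        // Same spirit as pollRealKV(): keep popping until the top scanner
        // has REALLY seeked and still wins against the rest of the heap.
        FileScanner kvScanner = heap.poll();
        while (!kvScanner.realSeekDone) {
            kvScanner.enforceSeek();
            FileScanner next = heap.peek();
            if (next == null || kvScanner.compareTo(next) < 0) break; // still the winner
            heap.add(kvScanner);
            kvScanner = heap.poll();
        }

        if (!kvScanner.name.equals("hfile3")) throw new AssertionError(kvScanner.name);
        if (FileScanner.realSeeks != 1) throw new AssertionError("expected exactly 1 real seek");
        System.out.println(kvScanner.name + " won after " + FileScanner.realSeeks + " real seek(s)");
    }
}
```

hfile1 and hfile2 keep their fake peeks and are never opened, which is exactly the saving the optimization targets.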
KeyValueHeap has two important methods, peek() and next(). Both return the heap top; the difference is that next() pops the top and then re-adjusts the heap (externally, the iterator advances), while peek() leaves the top in place. In the implementation, peek() is trivial: it just calls peek() on the heap's member variable current. Taking a StoreScanner's heap as the example, current is either a StoreFileScanner or a MemStoreScanner. So how exactly is current chosen, and how is Lazy Seek implemented?
Here is an example. Assume:
HBase has Lazy Seek enabled (it is on by default).
The Store holds three HFiles plus the MemStore, in time order [HFile1, HFile2, HFile3, MemStore].
The seek KeyValue is (rowkey, family, qualifier, timestamp), hereafter seekKV, and it exists only in HFile3, in none of the other HFiles nor the MemStore.
Inside seekScanners(), if this is a lazy seek, requestSeek(seekKV) is called on each scanner. Internally, requestSeek first runs a rowcol-type bloom filter check; if the filter shows that the row/column cannot be present in this file, the scanner's peek is set to the KeyValue built by createLastOnRowCol():
public KeyValue createLastOnRowCol() {
  return new KeyValue(
      bytes, getRowOffset(), getRowLength(),
      bytes, getFamilyOffset(), getFamilyLength(),
      bytes, getQualifierOffset(), getQualifierLength(),
      HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
}
As the code shows, the timestamp is set to the smallest value, meaning this KeyValue sorts after every KeyValue with the same rowkey, column family and qualifier. So when the enclosing StoreScanner takes its heap top, any real KeyValue with the same rowkey, column family and qualifier held by the other StoreFileScanners/MemStoreScanner pops out first. If the bloom filter does not rule the file out, requestSeek proceeds as follows:
realSeekDone = false;
long maxTimestampInFile = reader.getMaxTimestamp();
long seekTimestamp = kv.getTimestamp();
if (seekTimestamp > maxTimestampInFile) {
  // Create a fake key that is not greater than the real next key.
  // (Lower timestamps correspond to higher KVs.)
  // To understand this better, consider that we are asked to seek to a
  // higher timestamp than the max timestamp in this file. We know that
  // the next point when we have to consider this file again is when we
  // pass the max timestamp of this file (with the same row/column).
  cur = kv.createFirstOnRowColTS(maxTimestampInFile);
} else {
  enforceSeek();
}
Clearly, when the kv's ts is greater than the largest ts in the HFile, seekKV itself cannot be in this HFile, but KeyValues with the same rowkey, family and qualifier at other timestamps may be. So care is needed when setting the fake heap top: it must not be greater than any KeyValue with the same rowkey, family and qualifier that may really exist in this HFile. Hence:
public KeyValue createFirstOnRowColTS(long ts) {
  return new KeyValue(
      bytes, getRowOffset(), getRowLength(),
      bytes, getFamilyOffset(), getFamilyLength(),
      bytes, getQualifierOffset(), getQualifierLength(),
      ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
}
In the type comparison, Type.Maximum compares smallest, so the KeyValue produced this way is guaranteed not to be greater than any KeyValue with the same rowkey, family and qualifier that may actually exist in this HFile. At the same time, seekKV is saved into the StoreFileScanner member variable delayedSeekKV so that it can be retrieved when the real seek is eventually performed.
What if seekKV's ts is smaller than the current HFile's maxTimestamp? Could we use a fake KeyValue with ts = LATEST_TIMESTAMP? No: it would pop out before real KeyValues present in other HFiles, breaking the ordering. In that case the only option is a real seek: enforceSeek() performs the actual seek and then sets realSeekDone to true.
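The reasoning about the two fake keys can be checked with a toy comparator over (timestamp, type) only, same row/family/qualifier assumed; the type codes mirror HBase's (Minimum = 0, Put = 4, Maximum = 255) but the class is illustrative, not HBase's KeyValue:

```java
// Why the fake keys use Type.Maximum / Type.Minimum: after row, family,
// qualifier and timestamp (newer-first), the comparator orders LARGER type
// codes first. So at the same coordinates, (ts, Type.Maximum) sorts no later
// than any real cell at that ts, and (OLDEST_TIMESTAMP, Type.Minimum) sorts
// after every real cell of that row/column.
public class FakeKeyDemo {
    static final int MINIMUM = 0, PUT = 4, MAXIMUM = 255;

    static int compare(long ts1, int t1, long ts2, int t2) {
        int c = Long.compare(ts2, ts1);              // newer timestamps sort first
        return c != 0 ? c : Integer.compare(t2, t1); // larger type code sorts first
    }

    public static void main(String[] args) {
        long maxTsInFile = 20;
        // createFirstOnRowColTS(maxTsInFile) -> (ts=20, Type.Maximum):
        // not greater than a real Put at ts=20 in the file ...
        if (compare(maxTsInFile, MAXIMUM, 20, PUT) > 0)
            throw new AssertionError("fake first key must not exceed real cells");
        // ... while createLastOnRowCol -> (ts=OLDEST, Type.Minimum):
        // sorts after every real cell of the row/column.
        if (compare(0, MINIMUM, 1, PUT) <= 0)
            throw new AssertionError("fake last key must follow real cells");
        System.out.println("ok");
    }
}
```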
Back to the example: because HFile3's lastTimestampInStoreFile is the largest, HFile3's StoreFileScanner peek is fetched first (by the KeyValue comparison rules, a KeyValue with a larger timestamp is smaller). At that point we check whether this KeyValueScanner has actually seeked (a StoreFileScanner tracks this with the boolean realSeekDone; a MemStoreScanner always reports true). Here no real seek has happened yet, so the actual seek is now performed, positioning at the seekKV present in HFile3. Then seekKV is compared with HFile2's peek; seekKV is clearly smaller (since its timestamp > lastTimestampInStoreFile of HFile2), so the StoreScanner's peek operation returns seekKV.
As described above, current is selected by pollRealKV(). The reason this function contains a while loop is precisely the Lazy Seek optimization; its logic is exactly the "take the StoreScanner heap top" procedure walked through in the example, and its return value is assigned to current.
protected KeyValueScanner pollRealKV() throws IOException {
  KeyValueScanner kvScanner = heap.poll();
  if (kvScanner == null) {
    return null;
  }
  while (kvScanner != null && !kvScanner.realSeekDone()) {
    if (kvScanner.peek() != null) {
      kvScanner.enforceSeek();
      KeyValue curKV = kvScanner.peek();
      if (curKV != null) {
        KeyValueScanner nextEarliestScanner = heap.peek();
        if (nextEarliestScanner == null) {
          // The heap is empty. Return the only possible scanner.
          return kvScanner;
        }
        // Compare the current scanner to the next scanner. We try to avoid
        // putting the current one back into the heap if possible.
        KeyValue nextKV = nextEarliestScanner.peek();
        if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
          // We already have the scanner with the earliest KV, so return it.
          return kvScanner;
        }
        // Otherwise, put the scanner back into the heap and let it compete
        // against all other scanners (both those that have done a "real
        // seek" and a "lazy seek").
        heap.add(kvScanner);
      } else {
        // Close the scanner because we did a real seek and found out there
        // are no more KVs.
        kvScanner.close();
      }
    } else {
      // Close the scanner because it has already run out of KVs even before
      // we had to do a real seek on it.
      kvScanner.close();
    }
    kvScanner = heap.poll();
  }
  return kvScanner;
}
A MemStore flush to the filesystem, or a completed compaction, changes the Store's set of HFiles.
After each batch of mutate operations, HRegion::isFlushSize(newSize) checks whether the current HRegion's memstores need to be flushed. This boils down to checking whether the total size of all memstores in the HRegion exceeds hbase.hregion.memstore.flush.size (default 128MB). If a flush is needed, the request is queued to the background flush thread (MemStoreFlusher), which handles it along the call path HRegion::flushcache() -> internalFlushcache(...) -> StoreFlushContext.flushCache(...) -> StoreFlushContext.commit(...) -> HStore::updateStorefiles(). This logic is covered in the write-up on HBase snapshot internals and is not repeated here; only the last step, updateStorefiles(), matters. It takes the HStore-level write lock, inserts the newly produced HFiles into the StoreEngine, releases the write lock, then clears the snapshot, and finally calls notifyChangedReadersObservers(), as follows:
this.lock.writeLock().lock();
try {
  this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
  this.memstore.clearSnapshot(set);
} finally {
  // We need the lock, as long as we are updating the storeFiles
  // or changing the memstore. Let us release it before calling
  // notifyChangeReadersObservers. See HBASE-4485 for a possible
  // deadlock scenario that could have happened if continue to hold
  // the lock.
  this.lock.writeLock().unlock();
}
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
The key piece is notifyChangedReadersObservers(); here is the code:
private void notifyChangedReadersObservers() throws IOException {
  for (ChangedReadersObserver o : this.changedReaderObservers) {
    o.updateReaders();
  }
}
Each observer is in fact a StoreScanner: every newly opened StoreScanner registers itself in this observer set inside the Store, and when the Store's HFile set changes, the registered StoreScanners are simply notified. Concretely, updateReaders() takes the StoreScanner's lock, saves the heap top at that moment into the member variable lastTop, sets the StoreScanner's internal heap to null (this.heap = null), and releases the lock. On the StoreScanner side, next/seek/reseek each first call checkReseek() to test whether this.heap is null. If it is null, the Store's HFile set has changed, so resetScannerStack(lastTop) is called to seek all the Store's StoreFileScanners/MemStoreScanner to lastTop and rebuild the StoreScanner's heap. The checkReseek() code:
protected boolean checkReseek() throws IOException {
  if (this.heap == null && this.lastTop != null) {
    resetScannerStack(this.lastTop);
    if (this.heap.peek() == null
        || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
      LOG.debug("Storescanner.peek() is changed where before = "
          + this.lastTop.toString() + ",and after = " + this.heap.peek());
      this.lastTop = null;
      return true;
    }
    this.lastTop = null; // gone!
  }
  // else dont need to reseek
  return false;
}
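The notification protocol above can be sketched end to end with a toy observer (assumptions: keys are plain strings, the "heap" is a simple iterator, and resetScannerStack is modeled as re-opening an iterator over the new file set seeked back to lastTop; none of these are HBase's classes):

```java
import java.util.*;

// Sketch of the change-notification protocol: when a flush/compaction swaps
// the HFile set, the Store calls updateReaders() on each registered scanner,
// which remembers its current head in lastTop and drops its heap; the next
// read lazily rebuilds the scanner stack seeked back to lastTop.
public class ChangedReadersDemo {
    interface ChangedReadersObserver { void updateReaders(); }

    static class ToyStoreScanner implements ChangedReadersObserver {
        List<String> files;      // stands in for the Store's HFile set (sorted keys)
        Iterator<String> heap;   // stands in for the KeyValueHeap
        String lastTop;
        int rebuilds = 0;

        ToyStoreScanner(List<String> files) { this.files = files; this.heap = files.iterator(); }

        public void updateReaders() {          // called after the file set changed
            lastTop = heap.hasNext() ? heap.next() : null; // remember where we were
            heap = null;                                   // force a rebuild on next read
        }

        String next() {
            checkReseek();
            return heap.hasNext() ? heap.next() : null;
        }

        private void checkReseek() {           // mirrors StoreScanner.checkReseek()
            if (heap == null && lastTop != null) {
                // "resetScannerStack(lastTop)": re-open over the NEW file set,
                // seeked back to lastTop
                List<String> resumed = new ArrayList<>(files);
                resumed.removeIf(k -> k.compareTo(lastTop) < 0);
                heap = resumed.iterator();
                lastTop = null;
                rebuilds++;
            }
        }
    }

    public static void main(String[] args) {
        ToyStoreScanner s = new ToyStoreScanner(new ArrayList<>(Arrays.asList("a", "b", "c")));
        if (!s.next().equals("a")) throw new AssertionError();
        s.files = new ArrayList<>(Arrays.asList("a", "b", "c", "d")); // a flush added a file
        s.updateReaders();
        // scanning resumes from where it stopped, now over the new file set
        if (!s.next().equals("b")) throw new AssertionError();
        if (s.rebuilds != 1) throw new AssertionError();
        System.out.println("resumed after reader change");
    }
}
```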