hbase源碼系列(十三)緩存機制MemStore與Block Cache

這一章講hbase的緩存機制,這裏面涉及的內容也是比較多,呵呵,我理解中的緩存是保存在內存中的特定的便於檢索的數據結構就是緩存。html

以前在講put的時候,put是被添加到Store裏面,這個Store是個接口,實現是在HStore裏面,MemStore實際上是它底下的小子。算法

那它和Region Server、Region是什麼關係?數組

Region Server下面有若干個Region,每一個Region下面有若干的列族,每一個列族對應着一個HStore。緩存

HStore裏面有三個很重要的類,在這章的內容都會提到。數據結構

protected final MemStore memstore;
private final CacheConfig cacheConf;
final StoreEngine<?, ?, ?, ?> storeEngine;

MemStore是存儲着兩個有序的kv集合,kv進來先寫到裏面,超過閥值以後就會寫入硬盤。ide

CacheConf是針對HFileBlock的緩存,專門用來緩存快,默認是在讀的時候緩存塊,也能夠修改列族的參數,讓它在寫的時候也緩存,這個在數據模型定義的時候提到過。函數

StoreEngine是StoreFile的管理器,它管理着這個列族對應的全部StoreFiles。this

1. MemStore

memstore比較有意思,咱們先看它的add方法,這個是入口。spa

long add(final KeyValue kv) {
    this.lock.readLock().lock();
    try {
      KeyValue toAdd = maybeCloneWithAllocator(kv);
      return internalAdd(toAdd);
    } finally {
      this.lock.readLock().unlock();
    }
}

先把kv放到maybeCloneWithAllocator裏面複製出來一個新的kv,而後再走internalAdd的方法,爲啥要這麼搞呢?線程

1.1 MemStoreLAB

先看maybeCloneWithAllocator,咱們慢慢看,不要緊。

private KeyValue maybeCloneWithAllocator(KeyValue kv) {
    if (allocator == null) {
      return kv;
    }
    int len = kv.getLength();
    //從allocator當中分配出來len長度的非堆空間
    Allocation alloc = allocator.allocateBytes(len);
    if (alloc == null) {
      // 太大了,allocator決定不給它分配
      return kv;
    }
    //用allocator生成的空間,new一個kv出來
    assert alloc != null && alloc.getData() != null;
    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
    newKv.setMvccVersion(kv.getMvccVersion());
    return newKv;
 }

allocator是何許人也,它是一個MemStoreLAB,它是幹啥的呀,這個讓人很糾結呀?

public Allocation allocateBytes(int size) {
    // 若是申請的size比maxAlloc大,就不分了
    if (size > maxAlloc) {
      return null;
    }

    while (true) {
      Chunk c = getOrMakeChunk();

      // 給它分配個位置,返回數組的起始位置
      int allocOffset = c.alloc(size);
      if (allocOffset != -1) {
        // 用一個數據結構Allocation來描述這個,它主要包括兩個信息,1:數組的引用,2:數據在數組當中的起始位置
        return new Allocation(c.data, allocOffset);
      }

      // 空間不足了,釋放掉它
      tryRetireChunk(c);
    }
}
View Code

下面看看getOrMakeChunk看看是啥狀況,挺疑惑的東西。

private Chunk getOrMakeChunk() {
    while (true) {
      // 當前的Chunk不爲空,就取當前的
      Chunk c = curChunk.get();
      if (c != null) {
        return c;
      }
      // 這裏還有個Chunk的Pool,默認是沒有的,走的是new Chunk這條路徑
      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
      if (curChunk.compareAndSet(null, c)) {
        // curChunk是爲空的話,就設置爲c,而後加到chunkQueue裏面
        c.init();
        this.chunkQueue.add(c);
        return c;
      } else if (chunkPool != null) {
        // 先放回去,待會兒再拿出來
        chunkPool.putbackChunk(c);
      }      
    }
}
View Code

Chunk是一個持有一個byte[]數組的數據結構,屬性以下。

static class Chunk {
    /* 實際數據保存的地方,被不停地分配 */
    private byte[] data;
    private static final int UNINITIALIZED = -1;
    private static final int OOM = -2;
    /* 下一個chunk的起始位置,也是上一個chunk的結束位置 */
    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

    /** 分配給了多少個kv */
    private AtomicInteger allocCount = new AtomicInteger();

    /** Chunk的大小 */
    private final int size;

好吧,咱們如今清楚了,它是給每一個kv的數據又從新找了個地方混,從註釋上面講這個Chunk未初始化,沒有被分配內存,因此開銷小。不太理解這個東西,人家以前也是在byte數組裏面混,只不顧挪了個窩了,莫非是爲了減小內存碎片?尼瑪,還真被我說中了,在我之前的資料裏面有《調優》

無論怎麼樣吧,把多個小的kv寫到一個連續的數組裏面多是好點好處吧,下面講一下它的相關參數吧。

/** 可分配的最大值,超過這個值就不給它分配了,默認值是256K */
hbase.hregion.memstore.mslab.max.allocation 默認值是256  * 1024
/** 每一個Chunk的大小,默認是2M */
hbase.hregion.memstore.mslab.chunksize 默認值是2048 * 1024

那咱們繼續講講這個MemStoreChunkPool吧,它默認是不被開啓的,由於它的參數hbase.hregion.memstore.chunkpool.maxsize默認是0 (只容許輸入0->1的數值),它是經過堆內存的最大值*比例來計算得出來的結果。

它能夠承受的最大的Chunk的數量是這麼計算的 MaxCount = MemStore內存限制 * Chunkpool.Maxsize / Chunksize。

MemStore的內存最大最小值分別是0.35 --> 0.4,這個在我以前的博客裏面也有。

hbase.regionserver.global.memstore.upperLimit 
hbase.regionserver.global.memstore.lowerLimit

還有這個參數hbase.hregion.memstore.chunkpool.initialsize須要設置,默認又是0,輸入0->1的數值,MaxCount乘以它就設置初始的Chunk大小。

沒試過開啓這個Pool效果是否會好,它是依附在MemStore裏面的,它設置過大了,最直接的影響就是,另外兩個集合的空間就小了。

1.2 有序集合

分配完Chunk以後,乾的是這個函數,就是添加到一個有序集合當中kvset。

private long internalAdd(final KeyValue toAdd) {
    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
    //把時間戳範圍加到內部去
    timeRangeTracker.includeTimestamp(toAdd);
    this.size.addAndGet(s);
    return s;
}

MemStore裏面有兩個有序的集合,kvset和snapshot,KeyValueSkipListSet的內部實現是ConcurrentNavigableMap。

volatile KeyValueSkipListSet kvset;
volatile KeyValueSkipListSet snapshot;

它們的排序規則上一章已經說過了,排過序的在搜索的時候方便查找,這裏爲何還有一個snapshot呢?snapshot是一個和它同樣的東西,咱們都知道MemStore是要flush到文件生成StoreFile的,那我不能寫文件的時候讓別人都無法讀了吧,那怎麼辦,先把它拷貝到snapshot當中,這個時間很短,複製完了就能夠訪問kvset,實際flush的以後,咱們flush掉snapshot當中的kv就能夠啦。

2. CacheConfig

在看這個以前,先推薦看一下個人另一篇文章《緩存機制以及能夠利用SSD做爲存儲的BucketCache》,不然後面有不少概念,你看不懂的。

這裏咱們主要關注的是LruBlockCache和BucketCache,至於他們的使用,請參照上面的博客設置,這裏再也不介紹哦。

CacheConfig是一個HStore一個,屬性是根據列族定製的,好比是否常駐內存,可是它內存用來緩存塊的BlockCache是Region Server全局共享的的globalBlockCache,在new一個CacheConfig的時候,它會調用instantiateBlockCache方法返回一個BlockCache緩存Block的,若是已經存在globalBlockCache,就直接返回,沒有才會從新實例化一個globalBlockCache。

這裏還分堆上內存和直接分配的內存,堆上的內存的參數hfile.block.cache.size默認是0.25。

2.1 DoubleCache

直接分配的內存,要經過設置JVM參數-XX:MaxDirectMemorySize來設置,設置了這個以後咱們還須要設置hbase.offheapcache.percentage(默認是0)來設置佔直接分配內存的比例。

offHeapCacheSize =offheapcache.percentage * DirectMemorySize

這裏咱們還真不能設置它,由於若是設置了它的話,它會把new一個DoubleCache出來,它是LruBlockCache和SlabCache的合體,以前我提到的那篇文章裏面說到SlabCache是一個只能存固定大小的Block大小的Cache,比較垃圾。

2.2 LruBlockCache

若是offHeapCacheSize <= 0,就走下面的邏輯,這裏我就簡單陳述一下了,代碼沒啥可貼的。

LruBlockCache和BucketCache的合做方式有兩種,一種是BucketCache做爲二級緩存使用,好比SSD,一種是在內存當中,它倆各佔比列0.1和0.9,仍是建議上SSD作二級緩存,其實也不貴。

無論如何,BlockCache這塊的總大小是固定的,是由這個參數決定hfile.block.cache.size,默認它是0.25,因此LruBlockCache最大也就是0.25的最大堆內存。

在LruBlockCache當中還分了三種優先級的緩存塊,分別是SINGLE、MULTI、MEMORY,比列分別是0.2五、0.五、0.25,當快要滿的時候,要把塊剔除出內存的時候,就要遍歷全部的塊了,而後計算他們的分別佔的比例,剔除的代碼還挺有意思。

     PriorityQueue<BlockBucket> bucketQueue =
        new PriorityQueue<BlockBucket>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;
      long bytesFreed = 0;

      BlockBucket bucket;
      while((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if(overflow > 0) {
          //把要釋放的空間bytesToFree分給3個bucket,3個分完
          long bucketBytesToFree = Math.min(overflow,
            (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

搞了一個優先級隊列,先從SINGLE的開刀、SINGLE不行了,再拿MULTI開刀,最後是MEMORY。bytesToFree是以前計算好的,要釋放的大小=當前值-最小值。

在咱們設置列族參數的時候,有一個InMemory的參數,若是設置了它就是MEMORY,若是沒設置,就是SINGLE,SINGLE類型的一旦被訪問過以後,立馬變成高富帥的MULTI,可是沒有但願變成MEMORY。

這裏以前百度的一個哥麼問我,Meta表的塊會不會一直被保存在MEMORY當中呢,這塊的代碼寫得讓人有點兒鬱悶的,它是按照列族的參數設置的,可是我怎麼去找Meta表的列族設置啊,啊被我找到了,在代碼裏面寫着的。

public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
      TableName.META_TABLE_NAME,
      new HColumnDescriptor[] {
          new HColumnDescriptor(HConstants.CATALOG_FAMILY)
              // 保持10個版本是爲了幫助調試
              .setMaxVersions(10)
              .setInMemory(true)
              .setBlocksize(8 * 1024)
              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
              // 不使用BloomFilter
.setBloomFilterType(BloomType.NONE) });

能夠看出來Meta表的塊只有8K,常駐內存,不使用BloomFilter,容許集羣間複製。

再吐槽一下hbase這個Lru算法吧,作得挺粗糙的,它記錄了每一個Block塊的訪問次數,可是它並無按照這個來排序,就是簡單的依賴哈希值來排序。

Tips:江湖傳言一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大於等於heapsize * 0.8,不然HBase不能正常啓動,想一想也是,hbase是內存大戶,內存稍有不夠就掛掉,你們要當心設置這個緩存的參數。

 2.3 BucketCache

原來這塊的圖在上面的那篇文章已經提到了,我就再也不重複了,以前沒看的請必定要看,那邊有很詳細的圖解,我這裏只是講點我瞭解的實現。

咱們能夠從兩個方法裏面看LruBlockCache和BucketCache的關係,一個是getBlock,一個是evictBlock,先看evictBlock。

protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
  //從map裏面刪除 map.remove(block.getCacheKey());
if (evictedByEvictionProcess && victimHandler != null) { boolean wait = getCurrentSize() < acceptableSize(); boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
   //保存到victimHandler裏面 victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(), inMemory, wait); }
return block.heapSize()
}

在把block剔除出內存以後,就把塊加到victimHandler裏面,這個victimHandler就是BucketCache,在CacheConfig實例化LruBlockCache以後就用setVictimCache方法傳進去的。

看完這個咱們再看getBlock。

public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
    CachedBlock cb = map.get(cacheKey);
    if(cb == null) {if (victimHandler != null)
        return victimHandler.getBlock(cacheKey, caching, repeat);
      return null;
    }
    return cb.getBuffer();
}

 先從map中取,若是找不到就從victimHandler中取得。

從上面兩個方法,咱們能夠看出來BucketCache是LruBlockCache的二級緩存,它不要了纔會存到BucketCache當中,取得時候也是,找不到了纔想起人家來。

好,咱們如今進入到BucketCache裏面看看,它裏面有幾個重要的屬性。

// Store/read block data
IOEngine ioEngine;
// 內存map
private ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
// 後備隊列,質保存塊的索引信息,好比offset, length
private ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

這裏怎麼又來了兩個,一個內存的,一個後備隊裏的,這個是有區別的RAMQueueEntry當中直接保存了塊的buffer數據,BucketEntry只是保存了起始位置和長度。

下面咱們看看這個流程吧,仍是老規矩,先看入口,再看出口,入口在哪裏,前面的代碼中提到了,入口在cacheBlockWithWait方法。

    //已經有就不加啦
    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
      return;
    //寫入一級緩存
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    ramCache.put(cacheKey, re);
    //用哈希值給計算出一個隨機的隊列來
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    //把實體也插入到寫入隊列
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);

 

能夠看得出來在這個方法當中,先把塊寫入到ramCache當中,而後再插入到一個隨機的寫入隊列,寫入線程有3個,每一個寫入線程持有一個寫入隊列,線程的數量由參數hbase.bucketcache.writer.threads控制。

咱們看看這個WriterThread的run方法吧。

     List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            //從inputQueue拿出來放到entries,而後再對entries操做
            entries.add(inputQueue.take());
            inputQueue.drainTo(entries);
          } catch (InterruptedException ie) {
            if (!cacheEnabled) break;
          }
          doDrain(entries);
        }

 

那咱們要關注的就是doDrain的方法了,在這個方法裏面,它主要乾了4件事情。

一、把ramCache當中的實體給剔除出來轉換成BucketEntry,並切入到ioEngine。

二、ioEngine同步,ioEngine包括3種(file,offheap,heap),第一種就是寫入SSD,用的是FileChannel,後兩種是寫入到一個ByteBufferArray

三、把BucketEntry添加到backingMap

四、若是空間不足的話,調用freeSpace清理空間,清理空間的方法和LruBlockCache的方法相似。

這裏面的Bucket它也不是一個具體的東西,它裏面記住的也是起始位置,使用了多少次的這些參數,因此說它是一個邏輯上的,而不是物理上的分配的一塊隨機的地址。

final private static class Bucket {
    //基準起始位置
    private long baseOffset;
    //每一個item分配的大小
    private int itemAllocationSize; 
    //對應的在bucketSizeInfos中的位置
    private int sizeIndex;
    //總容量
    private int itemCount;
    private int freeList[];
    //空閒的數量
    private int freeCount;
    //已經使用的數量
    private int usedCount;
}

 

咱們是否是能夠這麼理解:就是當咱們不須要某個塊的時候咱們不用去物理的刪除它,只須要不斷的重用它裏面的空間就能夠了,而不須要管怎麼刪除、釋放等相關內容。

BucketSizeInfo是負責管理這些Bucket的,它管理着3個隊列,同時它能夠動態根據需求,new一些新的不一樣大小的Bucket出來,也能夠把現有的Bucket變動它的大小,Bucket的大小最小是5K,最大是513K。

final class BucketSizeInfo {
    // Free bucket means it has space to allocate a block;
    // Completely free bucket means it has no block.
    private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
    private int sizeIndex;
}

 

sizeIndex是啥意思?是在BucketSizeInfo的數組裏面的位置,它的大小都是有固定的值的,不能多也不能少,這裏就不詳細介紹了。咱們直接看WriteToCache這個方法吧,好驗證一下以前的想法。
    //序列化長度 = 數據長度 + 額外的序列化的長度16個字節
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized...
      if (len == 0) return null;
      //bucketAllocator給分配點空間
      long offset = bucketAllocator.allocateBlock(len);
      //生成一個實體
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
      //設置Deserializer,具體的實如今HFileBlock當中
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
        if (data instanceof HFileBlock) {
          ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
          //先寫入數據信息,再寫入頭信息
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          //若是不是HFileBlock的話,把數據序列化到bb當中,而後寫入到IOEngine
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // 出錯了就釋放掉這個這個塊
        bucketAllocator.freeBlock(offset);
        throw ioe;
    }

 

這裏咱們看這一句就能夠了ioEngine.write(sliceBuf, offset);  在寫入ioEngine的時候是要傳這個offset的,也正好驗證了我以前的想法,因此BucketAllocator.allocateBlock的分配管理這塊就很關鍵了。

關於怎麼分配這塊,仍是留個能人講吧,我是講很差了。

相關文章
相關標籤/搜索