本文主要介紹 HBase 的寫入流程。HBase 的寫入主要有 put、delete、append 3 種操作。
Client API -> RPC -> server IPC -> write WAL -> write memStore -> flush(ifNeed)
- Table 類定義了 HBase Client 擁有的 API,HTable 是 Table 的實現類,看代碼可從 HTable 看起。Client 生成一個 Put 對象,然後調用 HTable 的 put 方法。
public void put(final Put put) throws IOException {
//請求RPC前進行驗證,驗證是否有須要提交的列,及提交內容是否超過設定大小(默認大小爲10485760(10M))。 驗證失敗拋出異常。 validatePut(put); ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(), this.rpcControllerFactory.newController(), put.getPriority()) { @Override protected Void rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); //使用probubuf協議序列化,並提交RPC請求 doMutate(request); return null; } }; rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, this.operationTimeoutMs); }
客戶端在提交 RPC 請求之前會進行一次校驗,校驗內容爲:1) 是否有列,即 put.isEmpty();2) Put 中每個 Cell 的大小是否超過設定值(默認最大值爲 10485760(10M),客戶端配置參數爲 MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize")。校驗不通過時拋出異常 throw new IllegalArgumentException("No columns to insert") 或者 throw new IllegalArgumentException("KeyValue size too large")。
// validate for well-formedness public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
//校驗內容1).是否有提交列 if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); }
//校驗內容2).是否超過客戶端默認配置10485760,超過拋出異常 if (maxKeyValueSize > 0) { for (List<Cell> list : put.getFamilyCellMap().values()) { for (Cell cell : list) { if (KeyValueUtil.length(cell) > maxKeyValueSize) { throw new IllegalArgumentException("KeyValue size too large"); } } } } }
客戶端的 RPC 請求由 ClientServiceCallable.doMutate 提交,RPC 通訊使用的是谷歌的 protobuf 協議。服務器端 IPC 實現類爲 RSRpcServices 類,由 mutate 方法實現。
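在 mutate 方法中,會將 protobuf 流反序列化爲 PUT、APPEND、INCREMENT、DELETE 對象。在進行 PUT 對象處理時,會檢查請求是否帶有條件(request.hasCondition(),即 checkAndPut 請求):如果帶有條件,則在協處理器存在時先後調用 preCheckAndPut / postCheckAndPut 鉤子,並通過 region.checkAndMutate 執行;如果沒有條件,即直接調用 HRegion 的 put 方法進行處理。RSRpcServices.mutate 部分實現代碼如下: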
case PUT:
  // Deserialize the protobuf mutation into a Put object.
  Put put = ProtobufUtil.toPut(mutation, cellScanner);
  // The server enforces its own cell size limit as well.
  // NOTE(review): the article states the default equals the client's 10485760 (10 MB) --
  // not visible in this excerpt, confirm against the server configuration.
  checkCellSizeLimit(region, put);
  // Throws an exception when violated
  spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
  quota.addMutation(put);
  // Does the request carry a condition (i.e. is it a check-and-put)? Note this tests the
  // request, not whether a coprocessor is installed; coprocessor hooks are consulted inside.
  if (request.hasCondition()) {
    Condition condition = request.getCondition();
    byte[] row = condition.getRow().toByteArray();
    byte[] family = condition.getFamily().toByteArray();
    byte[] qualifier = condition.getQualifier().toByteArray();
    CompareOperator compareOp = CompareOperator.valueOf(condition.getCompareType().name());
    ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
    // Fall back to "all time" when the condition does not restrict the time range.
    TimeRange timeRange = condition.hasTimeRange() ? ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
    // Pre-hook: a non-null result from the coprocessor is used as the outcome and
    // skips the region's own checkAndMutate below.
    if (region.getCoprocessorHost() != null) {
      processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, compareOp, comparator, put);
    }
    if (processed == null) {
      boolean result = region.checkAndMutate(row, family, qualifier, compareOp, comparator, timeRange, put);
      // Post-hook: the coprocessor may replace the result.
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndPut(row, family, qualifier, compareOp, comparator, put, result);
      }
      processed = result;
    }
  } else {
    // No condition: hand the Put directly to HRegion.put().
    region.put(put);
    processed = Boolean.TRUE;
  }
  break;
void checkResources() throws RegionTooBusyException { // If catalog region, do not impose resource constraints or block updates. //若是操做的是Meta表,則不處理 if (this.getRegionInfo().isMetaRegion()) return; //當Region的MemSize大於blockingMemStoreSize時,進行一次flush(requestFlush0(FlushLifeCycleTracker.DUMMY);),本次flush是阻塞的,其它寫入請求先暫停 MemStoreSize mss = this.memStoreSizing.getMemStoreSize(); //region size = onHead size + offHead size (不是dataSize) // blockingMemStoreSize =flushSize(默認值128M) * mult (默認值4) if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) { blockedRequestsCount.increment(); requestFlush(); // Don't print current limit because it will vary too much. The message is used as a key // over in RetriesExhaustedWithDetailsException processing. throw new RegionTooBusyException("Over memstore limit=" + org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(this.blockingMemStoreSize) + ", regionName=" + (this.getRegionInfo() == null? "unknown": this.getRegionInfo().getEncodedName()) + ", server=" + (this.getRegionServerServices() == null? "unknown": this.getRegionServerServices().getServerName())); } } // blockingMemStoreSize 值初始化 void setHTableSpecificConf() { //flushSize能夠在建表時在htableDescriptor制定 if (this.htableDescriptor == null) return; long flushSize = this.htableDescriptor.getMemStoreFlushSize(); //未指定時讀取配配hbase.hregion.memstore.flush.size. // 默認值DEFAULT_MEMSTORE_FLUSH_SIZE = 1024 * 1024 * 128L 即128M if (flushSize <= 0) { flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); } this.memstoreFlushSize = flushSize; //hbase.hregion.memstore.block.multiplier // 默認值 DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER = 4 long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); //若是超過blockingMemStoreSize則進行阻塞處理 this.blockingMemStoreSize = this.memstoreFlushSize * mult; }
檢查完 region 的 memStore 之後,即進入寫入流程。忽略掉其它一些檢查,進入 doMiniBatchMutate 方法(核心操作)。
步驟1: 對 BatchOperation 對象上鎖,返回的是一個表示正在處理中的對象 MiniBatchOperationInProgress
步驟2: 更新所有操作對象的時間戳,確保是最新的
步驟3: 初始化或構造 WAL edit 對象
步驟4: 將 WALEdits 對象提交並持久化(即寫 WAL)
步驟5: 寫 memStore
步驟6: 完成寫入操作
/**
 * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
 * In here we also handle replay of edits on region recover
 * (i.e. replaying WAL edits while a region is being recovered also goes through this method).
 *
 * <p>The method returns nothing; the per-operation outcome is recorded in
 * {@code batchOp.retCodeDetails} and progress in {@code batchOp.nextIndexToProcess}.
 *
 * @param batchOp the batch to take the next mini-batch from
 * @throws IOException declared by the signature; the finally clause still releases locks
 *     and marks the processed operations as failed
 */
private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
  boolean success = false;
  WALEdit walEdit = null;
  WriteEntry writeEntry = null;
  boolean locked = false;
  // We try to set up a batch in the range [batchOp.nextIndexToProcess,lastIndexExclusive)
  MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
  /** Keep track of the locks we hold so we can release them in finally clause */
  List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
  try {
    // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
    // locked rows
    // Locks rows of the BatchOperation; the returned MiniBatchOperationInProgress represents
    // the part of the batch that is currently being processed.
    miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);

    // We've now grabbed as many mutations off the list as we can
    // Ensure we acquire at least one.
    if (miniBatchOp.getReadyToWriteCount() <= 0) {
      // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
      return;
    }

    // Take the updates read lock; it is released in the finally clause when 'locked' is set.
    lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
    locked = true;

    // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
    // We should record the timestamp only after we have acquired the rowLock,
    // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
    long now = EnvironmentEdgeManager.currentTime();
    batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);

    // STEP 3. Build WAL edit
    // Initializes/constructs the WAL edit objects for this mini-batch.
    List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);

    // STEP 4. Append the WALEdits to WAL and sync.
    // Submits and persists the WALEdits (i.e. the WAL write).
    for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
      Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
      walEdit = nonceKeyWALEditPair.getSecond();
      NonceKey nonceKey = nonceKeyWALEditPair.getFirst();

      // Null or empty edits are not appended to the WAL.
      if (walEdit != null && !walEdit.isEmpty()) {
        writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now, nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
      }

      // Complete mvcc for all but last writeEntry (for replay case)
      if (it.hasNext() && writeEntry != null) {
        mvcc.complete(writeEntry);
        writeEntry = null;
      }
    }

    // STEP 5. Write back to memStore
    // NOTE: writeEntry can be null here
    writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);

    // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
    // complete mvcc for last writeEntry
    batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
    // Cleared so the finally clause does not complete the same entry again.
    writeEntry = null;
    success = true;
  } finally {
    // Call complete rather than completeAndWait because we probably had error if walKey != null
    if (writeEntry != null) mvcc.complete(writeEntry);

    if (locked) {
      this.updatesLock.readLock().unlock();
    }
    releaseRowLocks(acquiredRowLocks);

    // If no mini-batch was built, the whole batch is treated as processed.
    final int finalLastIndexExclusive = miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
    final boolean finalSuccess = success;
    // Record SUCCESS or FAILURE for each visited operation, depending on whether the
    // try block ran to completion.
    batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
      batchOp.retCodeDetails[i] = finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
      return true;
    });

    batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);

    // Advance the batch cursor past the operations handled in this call.
    batchOp.nextIndexToProcess = finalLastIndexExclusive;
  }
}
至此HBASE的put, delete流程即算完畢。