Recently, as the project has gone deeper, I have noticed a drawback of the Hive metastore: its metadata operations are coupled with the code that operates on the physical cluster, which makes it very hard to extend. For example, create_table validates and creates the table path at the same time, as in the following code:
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
  if (tbl.getSd().getLocation() == null
      || tbl.getSd().getLocation().isEmpty()) {
    tblPath = wh.getTablePath(
        ms.getDatabase(tbl.getDbName()), tbl.getTableName());
  } else {
    if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
      LOG.warn("Location: " + tbl.getSd().getLocation()
          + " specified for non-external table:" + tbl.getTableName());
    }
    tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
  }
  tbl.getSd().setLocation(tblPath.toString());
}

if (tblPath != null) {
  if (!wh.isDir(tblPath)) {
    if (!wh.mkdirs(tblPath, true)) {
      throw new MetaException(tblPath
          + " is not a directory or unable to create one");
    }
    madeDir = true;
  }
Perhaps this is why the metastore cannot unify all metadata. Viewed at a high level, the Hive metastore code is essentially CRUD on metadata. As we saw in the last walkthrough, HiveMetaStore's init method creates three kinds of listeners (MetaStorePreEventListener, MetaStoreEventListener and MetaStoreEndFunctionListener) to listen for and record every event. At the same time it also news up a Warehouse for the physical filesystem operations.
public void init() throws MetaException {
  rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
  initListeners = MetaStoreUtils.getMetaStoreListeners(
      MetaStoreInitListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
  for (MetaStoreInitListener singleInitListener: initListeners) {
    MetaStoreInitContext context = new MetaStoreInitContext();
    singleInitListener.onInit(context);
  }

  String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
      HiveAlterHandler.class.getName());
  alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
      alterHandlerName), hiveConf);
  wh = new Warehouse(hiveConf);
  // ...
}
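To make the hook mechanism concrete, here is a minimal sketch of a custom init listener. The class name MyInitListener and the log line are mine; the constructor and onInit signatures follow the MetaStoreInitListener API as used above, though they may differ slightly between Hive versions. It would be registered by listing its fully qualified class name in hive.metastore.init.hooks (METASTORE_INIT_HOOKS in the code above).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreInitContext;
import org.apache.hadoop.hive.metastore.MetaStoreInitListener;
import org.apache.hadoop.hive.metastore.api.MetaException;

// Hypothetical listener: registered via hive.metastore.init.hooks, it is
// instantiated by getMetaStoreListeners() and invoked once from init().
public class MyInitListener extends MetaStoreInitListener {

  public MyInitListener(Configuration config) {
    super(config);
  }

  @Override
  public void onInit(MetaStoreInitContext context) throws MetaException {
    // Runs once while the metastore starts up, before any metadata calls.
    System.out.println("metastore init hook fired");
  }
}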
Next, starting from the metadata lifecycle, let's look at the lifecycle of a Partition. In HiveMetaStoreClient, add_partition is the entry point; it is the operation used when we insert overwrite into a partition keyed by some column of the table, for example dt=20170830. There is also add_partitions: after creating a partitioned table and loading data, multiple partition paths are created. Below we take add_partitions as the example:
public int add_partitions(List<Partition> new_parts)
    throws InvalidObjectException, AlreadyExistsException, MetaException,
    TException {
  return client.add_partitions(new_parts);
}

@Override
public List<Partition> add_partitions(
    List<Partition> parts, boolean ifNotExists, boolean needResults)
    throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
  if (parts.isEmpty()) {
    return needResults ? new ArrayList<Partition>() : null;
  }
  Partition part = parts.get(0);
  AddPartitionsRequest req = new AddPartitionsRequest(
      part.getDbName(), part.getTableName(), parts, ifNotExists);
  req.setNeedResult(needResults);
  AddPartitionsResult result = client.add_partitions_req(req);
  return needResults ? filterHook.filterPartitions(result.getPartitions()) : null;
}
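Before diving into the server side, here is a minimal caller-side sketch of how these client methods might be used. The database demo, table logs, dt column and path suffix are made-up values, and the HiveConf is assumed to point at a running metastore; a real caller would build the partition's StorageDescriptor according to its own layout.

import java.util.Arrays;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

public class AddPartitionDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical usage: register one dt=20170830 partition of demo.logs.
    HiveMetaStoreClient msc = new HiveMetaStoreClient(new HiveConf());
    Table table = msc.getTable("demo", "logs");

    Partition p = new Partition();
    p.setDbName("demo");
    p.setTableName("logs");
    p.setValues(Arrays.asList("20170830"));   // value of the dt partition column

    // Reuse the table's storage descriptor, pointed at the partition directory.
    StorageDescriptor sd = new StorageDescriptor(table.getSd());
    sd.setLocation(table.getSd().getLocation() + "/dt=20170830");
    p.setSd(sd);

    msc.add_partitions(Arrays.asList(p));     // routed to add_partitions_req on the server
    msc.close();
  }
}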
The client here is a ThriftHiveMetastore.Iface; on the server side it is implemented by HiveMetaStore, which is created and set up through its init method. The partitions are then wrapped into an AddPartitionsRequest. That class is really just the partition's attributes, but the benefit of wrapping them is that later code does not have to look up the partition's DbName, TableName and so on again; everything is packaged once and the object is used directly afterwards. Next, let's look at what client.add_partitions_req leads to. Fair warning: the code below is long, so we will go through it bit by bit.
private List<Partition> add_partitions_core(
    RawStore ms, String dbName, String tblName, List<Partition> parts, boolean ifNotExists)
    throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
  logInfo("add_partitions");
  boolean success = false;
  // Ensures that the list doesn't have dups, and keeps track of directories we have created.
  Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>();
  List<Partition> result = new ArrayList<Partition>();
  List<Partition> existingParts = null;
  Table tbl = null;
  try {
    ms.openTransaction();
    tbl = ms.getTable(dbName, tblName);
    if (tbl == null) {
      throw new InvalidObjectException("Unable to add partitions because "
          + "database or table " + dbName + "." + tblName + " does not exist");
    }

    if (!parts.isEmpty()) {
      firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
    }

    for (Partition part : parts) {
      if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
        throw new MetaException("Partition does not belong to target table "
            + dbName + "." + tblName + ": " + part);
      }
      boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
      if (!shouldAdd) {
        if (existingParts == null) {
          existingParts = new ArrayList<Partition>();
        }
        existingParts.add(part);
        LOG.info("Not adding partition " + part + " as it already exists");
        continue;
      }
      boolean madeDir = createLocationForAddedPartition(tbl, part);
      if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
        // Technically, for ifNotExists case, we could insert one and discard the other
        // because the first one now "exists", but it seems better to report the problem
        // upstream as such a command doesn't make sense.
        throw new MetaException("Duplicate partitions in the list: " + part);
      }
      initializeAddedPartition(tbl, part, madeDir);
      result.add(part);
    }
    if (!result.isEmpty()) {
      success = ms.addPartitions(dbName, tblName, result);
    } else {
      success = true;
    }
    success = success && ms.commitTransaction();
  } finally {
    if (!success) {
      ms.rollbackTransaction();
      for (Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
        if (e.getValue()) {
          wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
          // we just created this directory - it's not a case of pre-creation, so we nuke
        }
      }
      fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
    } else {
      fireMetaStoreAddPartitionEvent(tbl, result, null, true);
      if (existingParts != null) {
        // The request has succeeded but we failed to add these partitions.
        fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
      }
    }
  }
  return result;
}
Step by step:
1. ms.openTransaction(): as mentioned last time, this guarantees the atomicity of the operation. Then tbl = ms.getTable(dbName, tblName);
2. The whole Table object is fetched by dbName and tableName.
3. firePreEvent records the pre-add event.
4. The partitions are looped over; startAddPartition checks whether each partition already exists in the metastore.
5. createLocationForAddedPartition creates the partition directory, and then initializeAddedPartition mainly copies the table's param info onto the partition. This relates to Hive's table schema; the param extension info ultimately ends up in an extension table along the lines of meta_partition_param.
6. Once the physical operations are done, ms.addPartitions(dbName, tblName, result) writes the metadata into the metastore.
7. If the same partition appears twice in the list, a MetaException is thrown, and at the end the directories that were already created are removed. Look back at the code above: a Map is created first, Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>(); the partition is used as the key and whether mkdir actually created the directory as the value. In the finally block, when the transaction fails, the value is checked so that only the directories created by this call are deleted. A stripped-down sketch of this create-then-roll-back pattern follows the list.
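Stripped of the metastore specifics, the pattern in the finally block is just "remember which directories we created, and remove only those if the transaction fails". A minimal sketch, with a plain java.nio filesystem standing in for the Warehouse and a fake commit flag standing in for ms.addPartitions + commitTransaction:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CreateThenRollbackSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical partition directories; in the metastore these come from
    // createLocationForAddedPartition().
    List<Path> partitionDirs = Arrays.asList(
        Paths.get("/tmp/demo/dt=20170830"),
        Paths.get("/tmp/demo/dt=20170831"));

    // key: partition path, value: did *we* create the directory (madeDir)?
    Map<Path, Boolean> addedPartitions = new HashMap<>();
    boolean success = false;
    try {
      for (Path dir : partitionDirs) {
        boolean madeDir = !Files.isDirectory(dir);
        if (madeDir) {
          Files.createDirectories(dir);
        }
        addedPartitions.put(dir, madeDir);
      }
      success = commitMetadata();           // stands in for the metadata transaction
    } finally {
      if (!success) {
        for (Map.Entry<Path, Boolean> e : addedPartitions.entrySet()) {
          if (e.getValue()) {               // only delete directories this call created
            Files.deleteIfExists(e.getKey());
          }
        }
      }
    }
  }

  private static boolean commitMetadata() {
    return false;                           // pretend the metadata transaction failed
  }
}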
Delete and query are too simple to cover, so on to alter_partition: client.alter_partition(dbName, tblName, newPart). I will also skip the client-side call, which passes in the dbName, the tblName and the new partition; HiveMetaStore then invokes its rename_partition method:
@Override
public void rename_partition(final String db_name, final String tbl_name,
    final List<String> part_vals, final Partition new_part)
    throws InvalidOperationException, MetaException, TException {
  // Call rename_partition without an environment context.
  rename_partition(db_name, tbl_name, part_vals, new_part, null);
}

private void rename_partition(final String db_name, final String tbl_name,
    final List<String> part_vals, final Partition new_part,
    final EnvironmentContext envContext)
    throws InvalidOperationException, MetaException, TException {
  startTableFunction("alter_partition", db_name, tbl_name);

  if (LOG.isInfoEnabled()) {
    LOG.info("New partition values:" + new_part.getValues());
    if (part_vals != null && part_vals.size() > 0) {
      LOG.info("Old Partition values:" + part_vals);
    }
  }

  Partition oldPart = null;
  Exception ex = null;
  try {
    firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));

    if (part_vals != null && !part_vals.isEmpty()) {
      MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
          partitionValidationPattern);
    }

    oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);

    // Only fetch the table if we actually have a listener
    Table table = null;
    for (MetaStoreEventListener listener : listeners) {
      if (table == null) {
        table = getMS().getTable(db_name, tbl_name);
      }
      AlterPartitionEvent alterPartitionEvent =
          new AlterPartitionEvent(oldPart, new_part, table, true, this);
      alterPartitionEvent.setEnvironmentContext(envContext);
      listener.onAlterPartition(alterPartitionEvent);
    }
  } catch (InvalidObjectException e) {
    ex = e;
    throw new InvalidOperationException(e.getMessage());
  } catch (AlreadyExistsException e) {
    ex = e;
    throw new InvalidOperationException(e.getMessage());
  } catch (Exception e) {
    ex = e;
    if (e instanceof MetaException) {
      throw (MetaException) e;
    } else if (e instanceof InvalidOperationException) {
      throw (InvalidOperationException) e;
    } else if (e instanceof TException) {
      throw (TException) e;
    } else {
      throw newMetaException(e);
    }
  } finally {
    endFunction("alter_partition", oldPart != null, ex, tbl_name);
  }
  return;
}
Let's continue:
1. startTableFunction is mainly for bookkeeping; it counts and logs the call.
2. new_part.getValues() returns the partition's concrete column values; for dt=20170830, that is the 20170830 part.
3. validatePartitionNameCharacters then checks that the partition name is valid.
4. alterHandler.alterPartition then performs the actual change to the partition. But why is the return value named oldPart when the partition has already been modified? (In fact it returns the partition as it was before the change, so that the AlterPartitionEvent above can carry both the old and the new partition.) Stepping into it, we find that it calls updatePartColumnStats:
private void updatePartColumnStats(RawStore msdb, String dbName, String tableName,
    List<String> partVals, Partition newPart)
    throws MetaException, InvalidObjectException {
  dbName = HiveStringUtils.normalizeIdentifier(dbName);
  tableName = HiveStringUtils.normalizeIdentifier(tableName);
  String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
  String newTableName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());

  Table oldTable = msdb.getTable(dbName, tableName);
  if (oldTable == null) {
    return;
  }

  try {
    String oldPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), partVals);
    String newPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), newPart.getValues());
    if (!dbName.equals(newDbName) || !tableName.equals(newTableName)
        || !oldPartName.equals(newPartName)) {
      msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals, null);
    } else {
      Partition oldPartition = msdb.getPartition(dbName, tableName, partVals);
      if (oldPartition == null) {
        return;
      }
      if (oldPartition.getSd() != null && newPart.getSd() != null) {
        List<FieldSchema> oldCols = oldPartition.getSd().getCols();
        if (!MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())) {
          updatePartColumnStatsForAlterColumns(msdb, oldPartition, oldPartName, partVals, oldCols, newPart);
        }
      }
    }
  } catch (NoSuchObjectException nsoe) {
    LOG.debug("Could not find db entry." + nsoe);
    //ignore
  } catch (InvalidInputException iie) {
    throw new InvalidObjectException("Invalid input to update partition column stats." + iie);
  }
}
5. Warehouse.makePartName assembles the old and new partition expressions, for example the old one dt=20180830 and a new one such as dataPart=20180830 (see the sketch after this list).
6. Then comes a check: if the new expression (or the db/table name) differs from the old one, the existing column statistics are deleted; otherwise, if the columns have changed, updatePartColumnStatsForAlterColumns updates the column-stats metadata.
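Since Warehouse.makePartName is a static helper, it is easy to see what those "expressions" look like. A minimal sketch, where the partition column dt and the value are made-up; in updatePartColumnStats the keys come from oldTable.getPartitionKeys() and the values from partVals / newPart.getValues():

import java.util.Arrays;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class MakePartNameDemo {
  public static void main(String[] args) throws MetaException {
    // Hypothetical partition key: a single string column named dt.
    FieldSchema dt = new FieldSchema("dt", "string", "partition column");

    String partName = Warehouse.makePartName(
        Arrays.asList(dt), Arrays.asList("20180830"));
    System.out.println(partName);   // prints: dt=20180830
  }
}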
And that is it for now. It is late, off to bed; I still have to drag myself to work tomorrow, hahaha~