Hive metastore總體代碼分析及詳解

  從上一篇對Hive metastore表結構的簡要分析出發,我再根據數據設計的實體對象,對整個代碼結構進行總結。那麼咱們先打開metadata的目錄,其目錄結構:

  能夠看到,整個hivemeta的目錄包含metastore(客戶端與服務端調用邏輯)、events(事件目錄,包含table生命週期中的檢查、權限認證等listener實現)、hooks(這裏的hooks僅包含了jdo connection的相關接口)、parser(對於表達式樹的解析)、spec(partition的相關代理類)、tools(jdo execute相關方法)以及txn和model。接下來咱們對整個metadata逐一進行代碼分析及註釋:

  還沒有把包打開,就已經有不少類?是否是感受懼怕很想死?我也想死,我們繼續。一開始,咱們可能以爲一團亂麻、心情煩躁,這是啥玩意兒啊。冷靜下來,咱們從Hive這個大類開始看,由於它是metastore元數據調用的入口。整個生命週期的分析流程爲:HiveMetaStoreClient客戶端的建立及加載、HiveMetaStore服務端的建立及加載、createTable、dropTable、alterTable、createPartition、dropPartition、alterPartition。固然,這只是完整metadata的一小部分。

  一、HiveMetaStoreClient客戶端的建立及加載

  那麼咱們從Hive這個類一點點開始看:

 1   private HiveConf conf = null;
 2   private IMetaStoreClient metaStoreClient;
 3   private UserGroupInformation owner;
 4 
 5   // metastore calls timing information
 6   private final Map<String, Long> metaCallTimeMap = new HashMap<String, Long>();
 7 
 8   private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
 9     @Override
10     protected synchronized Hive initialValue() {
11       return null;
12     }
13 
14     @Override
15     public synchronized void remove() {
16       if (this.get() != null) {
17         this.get().close();
18       }
19       super.remove();
20     }
21   };

  這裏聲明的有HiveConf對象、metaStoreClient、操做用戶組UserGroupInformation以及調用時間Map(metaCallTimeMap),這個map用來記錄每個動做的運行時長。同時維護了一個ThreadLocal變量hiveDB,若是其中的db爲空、須要刷新(needsRefresh爲true),或者當前用戶不是該db的owner,就會從新建立一個Hive對象,代碼以下:

 1   public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
 2     Hive db = hiveDB.get();
 3     if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
 4       if (db != null) {
 5         LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
 6           ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
 7       }
 8       closeCurrent();
 9       c.set("fs.scheme.class", "dfs");
10       Hive newdb = new Hive(c);
11       hiveDB.set(newdb);
12       return newdb;
13     }
14     db.conf = c;
15     return db;
16   }

  隨後咱們會發現,在Hive類加載時(static代碼塊),便已經將function進行註冊。什麼是function呢?經過上次的表結構分析,能夠理解爲全部udf等jar包的元數據存儲。代碼以下:

 1   // register all permanent functions. need improvement
 2   static {
 3     try {
 4       reloadFunctions();
 5     } catch (Exception e) {
 6       LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
 7     }
 8   }
 9 
10   public static void reloadFunctions() throws HiveException {
    //獲取 Hive對象,用於後續方法的調用
11 Hive db = Hive.get();
    //經過遍歷每個dbName
12 for (String dbName : db.getAllDatabases()) {
      //經過dbName查詢掛在該db下的全部function的信息。
13 for (String functionName : db.getFunctions(dbName, "*")) { 14 Function function = db.getFunction(dbName, functionName); 15 try {
     //這裏的register即是將查詢到的function的數據註冊到Registry類中的一個Map<String,FunctionInfo>中,以便計算引擎在調用時,沒必要再次查詢數據庫。
16 FunctionRegistry.registerPermanentFunction( 17 FunctionUtils.qualifyFunctionName(functionName, dbName), function.getClassName(), 18 false, FunctionTask.toFunctionResource(function.getResourceUris())); 19 } catch (Exception e) { 20 LOG.warn("Failed to register persistent function " + 21 functionName + ":" + function.getClassName() + ". Ignore and continue."); 22 } 23 } 24 } 25 }

  調用getMSC()方法,進行MetaStoreClient客戶端的建立,代碼以下:

 1  1   private IMetaStoreClient createMetaStoreClient() throws MetaException {
 2  2   
 3     //這裏實現接口HiveMetaHookLoader
 4  3     HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
 5  4         @Override
 6  5         public HiveMetaHook getHook(
 7  6           org.apache.hadoop.hive.metastore.api.Table tbl)
 8  7           throws MetaException {
 9  8 
10  9           try {
11 10             if (tbl == null) {
12 11               return null;
13 12             }
14          //根據tble的kv屬性加載不一樣storage的實例,好比hbase、redis等等拓展存儲,做爲外部表進行存儲
15 13             HiveStorageHandler storageHandler =
16 14               HiveUtils.getStorageHandler(conf,
17 15                 tbl.getParameters().get(META_TABLE_STORAGE));
18 16             if (storageHandler == null) {
19 17               return null;
20 18             }
21 19             return storageHandler.getMetaHook();
22 20           } catch (HiveException ex) {
23 21             LOG.error(StringUtils.stringifyException(ex));
24 22             throw new MetaException(
25 23               "Failed to load storage handler:  " + ex.getMessage());
26 24           }
27 25         }
28 26       };
29 27     return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
30 28         SessionHiveMetaStoreClient.class.getName());
31 29   }

  二、HiveMetaStore服務端的建立及加載

  在HiveMetaStoreClient初始化時,會根據配置建立與HiveMetaStore的鏈接:本地模式下直接初始化HMSHandler,遠程模式下則經過open方法鏈接服務端,代碼以下:

 1   public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
 2     throws MetaException {
 3 
 4     this.hookLoader = hookLoader;
 5     if (conf == null) {
 6       conf = new HiveConf(HiveMetaStoreClient.class);
 7     }
 8     this.conf = conf;
 9     filterHook = loadFilterHooks();
10    //根據hive-site.xml中的hive.metastore.uris配置,若是配置該參數,則認爲是遠程鏈接,不然爲本地鏈接
11     String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
12     localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
13     if (localMetaStore) {
      //本地鏈接直接鏈接HiveMetaStore
16       client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
17       isConnected = true;
18       snapshotActiveConf();
19       return;
20     }
21 
22     //獲取配置中的重試次數及timeout時間
23     retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
24     retryDelaySeconds = conf.getTimeVar(
25         ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
26 
27     //拼接metastore uri
28     if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
29       String metastoreUrisString[] = conf.getVar(
30           HiveConf.ConfVars.METASTOREURIS).split(",");
31       metastoreUris = new URI[metastoreUrisString.length];
32       try {
33         int i = 0;
34         for (String s : metastoreUrisString) {
35           URI tmpUri = new URI(s);
36           if (tmpUri.getScheme() == null) {
37             throw new IllegalArgumentException("URI: " + s
38                 + " does not have a scheme");
39           }
40           metastoreUris[i++] = tmpUri;
41 
42         }
43       } catch (IllegalArgumentException e) {
44         throw (e);
45       } catch (Exception e) {
46         MetaStoreUtils.logAndThrowMetaException(e);
47       }
48     } else {
49       LOG.error("NOT getting uris from conf");
50       throw new MetaException("MetaStoreURIs not found in conf file");
51     }
 52     //調用open方法建立鏈接
53     open();
54   }

  從上面代碼中能夠看出,若是咱們是遠程鏈接,須要配置hive-site.xml中的hive.metastore.uris,是否是很熟悉?假如你的client與server不在同一臺機器,就須要配置該參數進行遠程鏈接。那麼咱們繼續往下面看,建立鏈接的open方法:

  1   private void open() throws MetaException {
  2     isConnected = false;
  3     TTransportException tte = null;
     //是否使用Sasl
4 boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
     //If true, the metastore Thrift interface will use TFramedTransport. When false (default) a standard TTransport is used.
5 boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
     //If true, the metastore Thrift interface will use TCompactProtocol. When false (default) TBinaryProtocol will be used 具體他們之間的區別咱們後續再討論
6 boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
     //獲取socket timeout時間
7 int clientSocketTimeout = (int) conf.getTimeVar( 8 ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); 9 10 for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { 11 for (URI store : metastoreUris) { 12 LOG.info("Trying to connect to metastore with URI " + store); 13 try { 14 transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); 15 if (useSasl) { 16 // Wrap thrift connection with SASL for secure connection. 17 try {
           //建立HadoopThriftAuthBridge client
18 HadoopThriftAuthBridge.Client authBridge = 19 ShimLoader.getHadoopThriftAuthBridge().createClient(); 20          //權限認證相關 21 // check if we should use delegation tokens to authenticate 22 // the call below gets hold of the tokens if they are set up by hadoop 23 // this should happen on the map/reduce tasks if the client added the 24 // tokens into hadoop's credential store in the front end during job 25 // submission. 26 String tokenSig = conf.get("hive.metastore.token.signature"); 27 // tokenSig could be null 28 tokenStrForm = Utils.getTokenStrForm(tokenSig); 29 if(tokenStrForm != null) { 30 // authenticate using delegation tokens via the "DIGEST" mechanism 31 transport = authBridge.createClientTransport(null, store.getHost(), 32 "DIGEST", tokenStrForm, transport, 33 MetaStoreUtils.getMetaStoreSaslProperties(conf)); 34 } else { 35 String principalConfig = 36 conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL); 37 transport = authBridge.createClientTransport( 38 principalConfig, store.getHost(), "KERBEROS", null, 39 transport, MetaStoreUtils.getMetaStoreSaslProperties(conf)); 40 } 41 } catch (IOException ioe) { 42 LOG.error("Couldn't create client transport", ioe); 43 throw new MetaException(ioe.toString()); 44 } 45 } else if (useFramedTransport) { 46 transport = new TFramedTransport(transport); 47 } 48 final TProtocol protocol;
         //後續詳細說明二者的區別(由於俺還沒看,哈哈)
49 if (useCompactProtocol) { 50 protocol = new TCompactProtocol(transport); 51 } else { 52 protocol = new TBinaryProtocol(transport); 53 }
         //建立ThriftHiveMetastore client
54 client = new ThriftHiveMetastore.Client(protocol); 55 try { 56 transport.open(); 57 isConnected = true; 58 } catch (TTransportException e) { 59 tte = e; 60 if (LOG.isDebugEnabled()) { 61 LOG.warn("Failed to connect to the MetaStore Server...", e); 62 } else { 63 // Don't print full exception trace if DEBUG is not on. 64 LOG.warn("Failed to connect to the MetaStore Server..."); 65 } 66 } 67       //用戶組及用戶的加載 68 if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){ 69 // Call set_ugi, only in unsecure mode. 70 try { 71 UserGroupInformation ugi = Utils.getUGI(); 72 client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames())); 73 } catch (LoginException e) { 74 LOG.warn("Failed to do login. set_ugi() is not successful, " + 75 "Continuing without it.", e); 76 } catch (IOException e) { 77 LOG.warn("Failed to find ugi of client set_ugi() is not successful, " + 78 "Continuing without it.", e); 79 } catch (TException e) { 80 LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. " 81 + "Continuing without it.", e); 82 } 83 } 84 } catch (MetaException e) { 85 LOG.error("Unable to connect to metastore with URI " + store 86 + " in attempt " + attempt, e); 87 } 88 if (isConnected) { 89 break; 90 } 91 } 92 // Wait before launching the next round of connection retries. 93 if (!isConnected && retryDelaySeconds > 0) { 94 try { 95 LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt."); 96 Thread.sleep(retryDelaySeconds * 1000); 97 } catch (InterruptedException ignore) {} 98 } 99 } 100 101 if (!isConnected) { 102 throw new MetaException("Could not connect to meta store using any of the URIs provided." + 103 " Most recent failure: " + StringUtils.stringifyException(tte)); 104 } 105 106 snapshotActiveConf(); 107 108 LOG.info("Connected to metastore."); 109 }

  本篇先把protocol的原理放置一邊。從代碼中能夠看出,HiveMetaStore服務端的客戶端代理是經過ThriftHiveMetastore建立的,它自己是一個class,但其中定義了接口Iface、AsyncIface,這樣作的好處是利於繼承實現。那麼接下來,咱們看一下HMSHandler的初始化。若是是本地調用,直接調用newRetryingHMSHandler,便會直接進行HMSHandler的初始化。代碼以下:

1     public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
2       super(name);
3       hiveConf = conf;
4       if (init) {
5         init();
6       }
7     }

  下來咱們繼續看它的init方法:

 1     public void init() throws MetaException {
      //獲取與數據交互的實現類className,該類爲objectStore,是RawStore的實現,負責JDO與數據庫的交互。
2 rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
      //加載Listeners,來自hive.metastore.init.hooks,可自行實現並加載
3 initListeners = MetaStoreUtils.getMetaStoreListeners( 4 MetaStoreInitListener.class, hiveConf, 5 hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS)); 6 for (MetaStoreInitListener singleInitListener: initListeners) { 7 MetaStoreInitContext context = new MetaStoreInitContext(); 8 singleInitListener.onInit(context); 9 } 10     //初始化alter的實現類 11 String alterHandlerName = hiveConf.get("hive.metastore.alter.impl", 12 HiveAlterHandler.class.getName()); 13 alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass( 14 alterHandlerName), hiveConf);
      //初始化warehouse
15 wh = new Warehouse(hiveConf); 16     //建立默認db以及用戶,同時加載currentUrl 17 synchronized (HMSHandler.class) { 18 if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) { 19 createDefaultDB(); 20 createDefaultRoles(); 21 addAdminUsers(); 22 currentUrl = MetaStoreInit.getConnectionURL(hiveConf); 23 } 24 } 25     //計數信息的初始化 26 if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) { 27 try { 28 Metrics.init(); 29 } catch (Exception e) { 30 // log exception, but ignore inability to start 31 LOG.error("error in Metrics init: " + e.getClass().getName() + " " 32 + e.getMessage(), e); 33 } 34 } 35     //Listener的PreListener的初始化 36 preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class, 37 hiveConf, 38 hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS)); 39 listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf, 40 hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS)); 41 listeners.add(new SessionPropertiesListener(hiveConf)); 42 endFunctionListeners = MetaStoreUtils.getMetaStoreListeners( 43 MetaStoreEndFunctionListener.class, hiveConf, 44 hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS)); 45     //針對partitionName的正則校驗,可自行設置,根據hive.metastore.partition.name.whitelist.pattern進行設置 46 String partitionValidationRegex = 47 hiveConf.getVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN); 48 if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) { 49 partitionValidationPattern = Pattern.compile(partitionValidationRegex); 50 } else { 51 partitionValidationPattern = null; 52 } 53 54 long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS); 55 if (cleanFreq > 0) { 56 // In default config, there is no timer. 57 Timer cleaner = new Timer("Metastore Events Cleaner Thread", true); 58 cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq); 59 } 60 }

  它初始化了與數據庫交互的rawStore的實現類、物理操做的warehouse以及Event與Listener。從而經過接口調用相關meta生命週期方法進行表的操做。

  三、createTable

 從createTable方法開始。上代碼:

 1   public void createTable(String tableName, List<String> columns, List<String> partCols,
 2                           Class<? extends InputFormat> fileInputFormat,
 3                           Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
 4                           Map<String, String> parameters) throws HiveException {
 5     if (columns == null) {
 6       throw new HiveException("columns not specified for table " + tableName);
 7     }
 8 
 9     Table tbl = newTable(tableName);
    //SD表屬性,設置該表的input及output class名,在計算引擎計算時,拉取相應的ClassName 經過反射進行input及output類的加載
10 tbl.setInputFormatClass(fileInputFormat.getName()); 11 tbl.setOutputFormatClass(fileOutputFormat.getName()); 12   
    //封裝FileSchema對象,該爲每一個column的名稱及字段類型,並加入到sd對象的的column屬性中 13 for (String col : columns) { 14 FieldSchema field = new FieldSchema(col, STRING_TYPE_NAME, "default"); 15 tbl.getCols().add(field); 16 } 17
    //若是在建立表時,設置了分區信息,好比dt字段爲該分區。則進行分區信息的記錄,最終寫入Partition表中 18 if (partCols != null) { 19 for (String partCol : partCols) { 20 FieldSchema part = new FieldSchema(); 21 part.setName(partCol); 22 part.setType(STRING_TYPE_NAME); // default partition key 23 tbl.getPartCols().add(part); 24 } 25 }
    //設置序列化的方式
26 tbl.setSerializationLib(LazySimpleSerDe.class.getName());
    //設置分桶信息
27 tbl.setNumBuckets(bucketCount); 28 tbl.setBucketCols(bucketCols);
    //設置table額外添加的kv信息
29 if (parameters != null) { 30 tbl.setParamters(parameters); 31 } 32 createTable(tbl); 33 }

  從代碼中能夠看到,Hive構造了一個Table對象,該對象能夠當作是一個model,包含了幾乎全部以Tbls表爲主表、以table_id爲外鍵的關聯表屬性(具體可參考hive metastore表結構)。封裝完畢後再進行createTable的調用,接下來的調用以下:

  public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
    try {
    //這裏再次獲取SessionState中的CurrentDataBase進行setDbName(安全)
if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) { tbl.setDbName(SessionState.get().getCurrentDatabase()); }
    //這裏主要對每個column屬性進行校驗,好比是否有非法字符等等
if (tbl.getCols().size() == 0 || tbl.getSd().getColsSize() == 0) { tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(), tbl.getDeserializer())); }
    //該方法對table屬性中的input、output以及column屬性的校驗 tbl.checkValidity();
if (tbl.getParameters() != null) { tbl.getParameters().remove(hive_metastoreConstants.DDL_TIME); } org.apache.hadoop.hive.metastore.api.Table tTbl = tbl.getTTable();
    //這裏開始進行權限認證,牽扯到的即是咱們再hive中配置的 hive.security.authorization.createtable.user.grants、hive.security.authorization.createtable.group.grants、
    //hive.security.authorization.createtable.role.grants配置參數,來自於hive本身封裝的用戶、角色、組的概念。
PrincipalPrivilegeSet principalPrivs
= new PrincipalPrivilegeSet(); SessionState ss = SessionState.get(); if (ss != null) { CreateTableAutomaticGrant grants = ss.getCreateTableGrants(); if (grants != null) { principalPrivs.setUserPrivileges(grants.getUserGrants()); principalPrivs.setGroupPrivileges(grants.getGroupGrants()); principalPrivs.setRolePrivileges(grants.getRoleGrants()); tTbl.setPrivileges(principalPrivs); } }
   //經過客戶端連接服務端進行table的建立 getMSC().createTable(tTbl); }
catch (AlreadyExistsException e) { if (!ifNotExists) { throw new HiveException(e); } } catch (Exception e) { throw new HiveException(e); } }

  那麼下來,咱們來看一下被調用的HiveMetaStoreClient中的createTable方法,代碼以下:

 1   public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
 2       InvalidObjectException, MetaException, NoSuchObjectException, TException {
    //這裏獲取HiveMetaHook對象,針對不一樣的存儲引擎進行建立前的加載及驗證
3 HiveMetaHook hook = getHook(tbl); 4 if (hook != null) { 5 hook.preCreateTable(tbl); 6 } 7 boolean success = false; 8 try {      //隨即調用HiveMetaStore進行服務端與數據庫的建立交互 10 create_table_with_environment_context(tbl, envContext); 11 if (hook != null) { 12 hook.commitCreateTable(tbl); 13 } 14 success = true; 15 } finally {
      //若是建立失敗的話,進行回滾操做
16 if (!success && (hook != null)) { 17 hook.rollbackCreateTable(tbl); 18 } 19 } 20 }

  這裏簡要說下Hook的做用:HiveMetaHook爲接口,接口方法包括preCreateTable、rollbackCreateTable、preDropTable等等操做,它的實現爲不一樣存儲類型的預建立加載及驗證,以及失敗回滾等動做。代碼以下:

 1 public interface HiveMetaHook {
 2   /**
 3    * Called before a new table definition is added to the metastore
 4    * during CREATE TABLE.
 5    *
 6    * @param table new table definition
 7    */
 8   public void preCreateTable(Table table)
 9     throws MetaException;
10 
11   /** 
12    * Called after failure adding a new table definition to the metastore
13    * during CREATE TABLE.
14    *
15    * @param table new table definition
16    */
17   public void rollbackCreateTable(Table table)
18     throws MetaException;
35   public void preDropTale(Table table)
36     throws MetaException;
...............................

  隨後,咱們再看一下HiveMetaStore服務端的createTable方法,以下:

 1  private void create_table_core(final RawStore ms, final Table tbl,
2
final EnvironmentContext envContext) 3 throws AlreadyExistsException, MetaException, 4 InvalidObjectException, NoSuchObjectException { 5     //名稱正則校驗,校驗是否含有非法字符 6 if (!MetaStoreUtils.validateName(tbl.getTableName())) { 7 throw new InvalidObjectException(tbl.getTableName() 8 + " is not a valid object name"); 9 }
      //該段代碼屬於校驗代碼,對column的名稱、column type類型及partitionKey的名稱進行校驗
10 String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols()); 11 if (validate != null) { 12 throw new InvalidObjectException("Invalid column " + validate); 13 }
14 if (tbl.getPartitionKeys() != null) { 15 validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys()); 16 if (validate != null) { 17 throw new InvalidObjectException("Invalid partition column " + validate); 18 } 19 } 20 SkewedInfo skew = tbl.getSd().getSkewedInfo(); 21 if (skew != null) { 22 validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames()); 23 if (validate != null) { 24 throw new InvalidObjectException("Invalid skew column " + validate); 25 } 26 validate = MetaStoreUtils.validateSkewedColNamesSubsetCol( 27 skew.getSkewedColNames(), tbl.getSd().getCols()); 28 if (validate != null) { 29 throw new InvalidObjectException("Invalid skew column " + validate); 30 } 31 } 32 33 Path tblPath = null; 34 boolean success = false, madeDir = false; 35 try {
       //建立前的事件調用,metastore已實現的listner事件包含DummyPreListener、AuthorizationPreEventListener、AlternateFailurePreListener以及MetaDataExportListener。
       //這些Listener是幹嗎的呢?詳細解釋由分析meta設計模式時,詳細說明。
36 firePreEvent(new PreCreateTableEvent(tbl, this)); 37
       //打開事務 38 ms.openTransaction(); 39
       //若是db不存在的狀況下,則拋異常 40 Database db = ms.getDatabase(tbl.getDbName()); 41 if (db == null) { 42 throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist"); 43 } 44      45  // 校驗該db下,table是否存在 46 if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) { 47 throw new AlreadyExistsException("Table " + tbl.getTableName() 48 + " already exists"); 49 } 50      // 若是該表不爲視圖表,則組裝完整的tbleParth ->
fs.getUri().getScheme()+fs.getUri().getAuthority()+path.toUri().getPath())
 51         if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {  52           if (tbl.getSd().getLocation() == null
 53               || tbl.getSd().getLocation().isEmpty()) {  54             tblPath = wh.getTablePath(  55  ms.getDatabase(tbl.getDbName()), tbl.getTableName());  56           } else {
          //若是該表不是外部表,同時tbl的kv中storage_handler爲空時,則只是警告
57 if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) { 58 LOG.warn("Location: " + tbl.getSd().getLocation() 59 + " specified for non-external table:" + tbl.getTableName()); 60 } 61 tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation())); 62 }
        //將拼接完的tblPath set到sd的location中
63 tbl.getSd().setLocation(tblPath.toString()); 64 } 65      //建立table的路徑 66 if (tblPath != null) { 67 if (!wh.isDir(tblPath)) { 68 if (!wh.mkdirs(tblPath, true)) { 69 throw new MetaException(tblPath 70 + " is not a directory or unable to create one"); 71 } 72 madeDir = true; 73 } 74 }
       // hive.stats.autogather 配置判斷
75 if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) && 76 !MetaStoreUtils.isView(tbl)) { 77 if (tbl.getPartitionKeysSize() == 0) { // Unpartitioned table 78 MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir); 79 } else { // Partitioned table with no partitions. 80 MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true); 81 } 82 } 83 84 // set create time 85 long time = System.currentTimeMillis() / 1000; 86 tbl.setCreateTime((int) time); 87 if (tbl.getParameters() == null || 88 tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) { 89 tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); 90 }
       //執行createTable數據庫操做
91 ms.createTable(tbl); 92 success = ms.commitTransaction(); 93 94 } finally { 95 if (!success) { 96 ms.rollbackTransaction();
         //若是因爲某些緣由沒有建立,則進行已建立表路徑的刪除
97 if (madeDir) { 98 wh.deleteDir(tblPath, true); 99 } 100 }
       //進行create完成時的listener類發送 好比 noftify通知
101 for (MetaStoreEventListener listener : listeners) { 102 CreateTableEvent createTableEvent = 103 new CreateTableEvent(tbl, success, this); 104 createTableEvent.setEnvironmentContext(envContext); 105 listener.onCreateTable(createTableEvent); 106 } 107 } 108 }

  這裏的listener後續會詳細說明,那麼咱們繼續垂直往下看,這裏的 ms.createTable方法。ms即是RawStore接口對象,這個接口對象包含了全部生命週期的統一方法調用,部分代碼以下:

 1   public abstract Database getDatabase(String name)
 2       throws NoSuchObjectException;
 3 
 4   public abstract boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException;
 5 
 6   public abstract boolean alterDatabase(String dbname, Database db) throws NoSuchObjectException, MetaException;
 7 
 8   public abstract List<String> getDatabases(String pattern) throws MetaException;
 9 
10   public abstract List<String> getAllDatabases() throws MetaException;
11 
12   public abstract boolean createType(Type type);
13 
14   public abstract Type getType(String typeName);
15 
16   public abstract boolean dropType(String typeName);
17 
18   public abstract void createTable(Table tbl) throws InvalidObjectException,
19       MetaException;
20 
21   public abstract boolean dropTable(String dbName, String tableName)
22       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException;
23 
24   public abstract Table getTable(String dbName, String tableName)
25       throws MetaException;
26   ..................

  那麼下來咱們來看一下具體怎麼實現的,首先hive metastore會經過調用getMS()方法,獲取本地線程中的RawStore的實現,代碼以下:

 1     public RawStore getMS() throws MetaException {
     //獲取本地線程中已存在的RawStore
2 RawStore ms = threadLocalMS.get();
     //若是不存在,則建立該對象的實現,並加入到本地線程中
3 if (ms == null) { 4 ms = newRawStore(); 5 ms.verifySchema(); 6 threadLocalMS.set(ms); 7 ms = threadLocalMS.get(); 8 } 9 return ms; 10 }

  看到這裏,是否是很想看看newRawStore它幹嗎啦?那麼咱們繼續:

 1   public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName,
 2       int id) throws MetaException {
 3   //經過反射,建立baseClass,隨後再進行該實現對象的建立
 4     Class<? extends RawStore> baseClass = (Class<? extends RawStore>) MetaStoreUtils.getClass(
 5         rawStoreClassName);
 6 
 7     RawStoreProxy handler = new RawStoreProxy(hiveConf, conf, baseClass, id);
 8 
 9     // Look for interfaces on both the class and all base classes.
10     return (RawStore) Proxy.newProxyInstance(RawStoreProxy.class.getClassLoader(),
11         getAllInterfaces(baseClass), handler);
12   }

  那麼問題來了,rawStoreClassName從哪裏來呢?它是在HiveMetaStore進行初始化時加載的,來源於HiveConf中的METASTORE_RAW_STORE_IMPL配置參數,也就是RawStore的實現類ObjectStore。好了,既然RawStore的實現類已經建立,那麼咱們繼續深刻ObjectStore,代碼以下:

  

 1   @Override
 2   public void createTable(Table tbl) throws InvalidObjectException, MetaException {
 3     boolean commited = false;
 4     try {
     //建立事務
5 openTransaction();
      //這裏再次進行db 、table的校驗,代碼再也不貼出來,具體爲何又要作一次校驗,還須要深刻思考
6 MTable mtbl = convertToMTable(tbl);
     //這裏的pm爲ObjectStore建立時init的JDO PersistenceManager對象。這裏即是提交Table對象的地方,具體可研究下JDO model對象與數據庫的交互
7 pm.makePersistent(mtbl);
     //封裝權限用戶、角色、組對象並寫入
8 PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges(); 9 List<Object> toPersistPrivObjs = new ArrayList<Object>(); 10 if (principalPrivs != null) { 11 int now = (int)(System.currentTimeMillis()/1000); 12 13 Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges(); 14 putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER); 15 16 Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges(); 17 putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP); 18 19 Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges(); 20 putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE); 21 } 22 pm.makePersistentAll(toPersistPrivObjs); 23 commited = commitTransaction(); 24 } finally {
      //若是失敗則回滾
25 if (!commited) { 26 rollbackTransaction(); 27 } 28 } 29 }

  四、dropTable

   二話不說,從Hive類中上代碼:

1   public void dropTable(String tableName, boolean ifPurge) throws HiveException {
    //這裏Hive 將dbName與TableName合併成一個數組
2 String[] names = Utilities.getDbTableName(tableName); 3 dropTable(names[0], names[1], true, true, ifPurge); 4 }

  爲何要進行這樣的處理呢,實際上是由於 drop table的時候 咱們的sql語句會是drop table dbName.tableName 或者是drop table tableName,這裏進行tableName和DbName的組裝,若是爲drop table tableName,則獲取當前session中的dbName,代碼以下:

 1   public static String[] getDbTableName(String dbtable) throws SemanticException {
    //獲取當前Session中的DbName
2 return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable); 3 } 4 5 public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException { 6 if (dbtable == null) { 7 return new String[2]; 8 } 9 String[] names = dbtable.split("\\."); 10 switch (names.length) { 11 case 2: 12 return names;
     //若是長度爲1,則從新組裝
13 case 1: 14 return new String [] {defaultDb, dbtable}; 15 default: 16 throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable); 17 } 18 }

  隨後經過getMSC()調用HiveMetaStoreClient中的dropTable,代碼以下:

 1   public void dropTable(String dbname, String name, boolean deleteData,
 2       boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
 3       NoSuchObjectException, UnsupportedOperationException {
 4     Table tbl;
 5     try {
     //經過dbName與tableName獲取正個Table對象,也就是經過dbName與TableName獲取該Table存儲的全部元數據
6 tbl = getTable(dbname, name); 7 } catch (NoSuchObjectException e) { 8 if (!ignoreUnknownTab) { 9 throw e; 10 } 11 return; 12 }
    //根據table type來判斷是否爲IndexTable,若是爲索引表則不容許刪除  
13 if (isIndexTable(tbl)) { 14 throw new UnsupportedOperationException("Cannot drop index tables"); 15 }
    //這裏的getHook 與create時getHook一致,獲取對應table存儲的hook
16 HiveMetaHook hook = getHook(tbl); 17 if (hook != null) { 18 hook.preDropTable(tbl); 19 } 20 boolean success = false; 21 try {
      //調用HiveMetaStore服務端的dropTable方法
22 drop_table_with_environment_context(dbname, name, deleteData, envContext); 23 if (hook != null) { 24 hook.commitDropTable(tbl, deleteData); 25 } 26 success=true; 27 } catch (NoSuchObjectException e) { 28 if (!ignoreUnknownTab) { 29 throw e; 30 } 31 } finally { 32 if (!success && (hook != null)) { 33 hook.rollbackDropTable(tbl); 34 } 35 } 36 }

  下面咱們重點看下服務端HiveMetaStore幹了些什麼,代碼以下:

 1    private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
 2         final boolean deleteData, final EnvironmentContext envContext,
 3         final String indexName) throws NoSuchObjectException,
 4         MetaException, IOException, InvalidObjectException, InvalidInputException {
 5       boolean success = false;
 6       boolean isExternal = false;
 7       Path tblPath = null;
 8       List<Path> partPaths = null;
 9       Table tbl = null;
10       boolean ifPurge = false;
11       try {
12         ms.openTransaction();
13         // 獲取正個Table的對象屬性
14         tbl = get_table_core(dbname, name);
15         if (tbl == null) {
16           throw new NoSuchObjectException(name + " doesn't exist");
17         }
       //若是sd數據爲空,則認爲該表數據損壞
18 if (tbl.getSd() == null) { 19 throw new MetaException("Table metadata is corrupted"); 20 } 21 ifPurge = isMustPurge(envContext, tbl); 22 23 firePreEvent(new PreDropTableEvent(tbl, deleteData, this));        //判斷若是該表存在索引,則須要先刪除該表的索引 25 boolean isIndexTable = isIndexTable(tbl); 26 if (indexName == null && isIndexTable) { 27 throw new RuntimeException( 28 "The table " + name + " is an index table. Please do drop index instead."); 29 }        //若是不是索引表,則刪除索引元數據 31 if (!isIndexTable) { 32 try { 33 List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE); 34 while (indexes != null && indexes.size() > 0) { 35 for (Index idx : indexes) { 36 this.drop_index_by_name(dbname, name, idx.getIndexName(), true); 37 } 38 indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE); 39 } 40 } catch (TException e) { 41 throw new MetaException(e.getMessage()); 42 } 43 }
       //判斷是否爲外部表
44 isExternal = isExternal(tbl); 45 if (tbl.getSd().getLocation() != null) { 46 tblPath = new Path(tbl.getSd().getLocation()); 47 if (!wh.isWritable(tblPath.getParent())) { 48 String target = indexName == null ? "Table" : "Index table"; 49 throw new MetaException(target + " metadata not deleted since " + 50 tblPath.getParent() + " is not writable by " + 51 hiveConf.getUser()); 52 } 53 } 54 56 checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge); 57 //獲取全部partition的location path 這裏有個奇怪的地方,爲何不將Table對象直接傳入,而是又在該方法中從新getTable,同時校驗上級目錄的讀寫權限 58 partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath, 59 tbl.getPartitionKeys(), deleteData && !isExternal); 60      //調用ObjectStore進行meta數據的刪除 61 if (!ms.dropTable(dbname, name)) { 62 String tableName = dbname + "." + name; 63 throw new MetaException(indexName == null ? "Unable to drop table " + tableName: 64 "Unable to drop index table " + tableName + " for index " + indexName); 65 } 66 success = ms.commitTransaction(); 67 } finally { 68 if (!success) { 69 ms.rollbackTransaction(); 70 } else if (deleteData && !isExternal) {         //刪除物理partition 73 deletePartitionData(partPaths, ifPurge); 74 //刪除Table路徑 75 deleteTableData(tblPath, ifPurge); 76 // ok even if the data is not deleted 77
       //Listener 處理
78 for (MetaStoreEventListener listener : listeners) { 79 DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this); 80 dropTableEvent.setEnvironmentContext(envContext); 81 listener.onDropTable(dropTableEvent); 82 } 83 } 84 return success; 85 }

   咱們繼續深刻ObjectStore中的dropTable,會發現再一次經過dbName與tableName獲取整個Table對象,隨後逐一刪除。也許代碼並非同一個人寫的,也可能是出於安全性考慮?不少能夠經過接口傳入的Table對象,都從新獲取了,這樣會不會加劇數據庫的負擔呢?ObjectStore代碼以下:

 1   public boolean dropTable(String dbName, String tableName) throws MetaException,
 2     NoSuchObjectException, InvalidObjectException, InvalidInputException {
 3     boolean success = false;
 4     try {
 5       openTransaction();
      //從新獲取Table對象
6 MTable tbl = getMTable(dbName, tableName); 7 pm.retrieve(tbl); 8 if (tbl != null) { 9 //下列代碼查詢並刪除全部的權限 10 List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName); 11 if (tabGrants != null && tabGrants.size() > 0) { 12 pm.deletePersistentAll(tabGrants); 13 }
      
14 List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName, 15 tableName); 16 if (tblColGrants != null && tblColGrants.size() > 0) { 17 pm.deletePersistentAll(tblColGrants); 18 } 19 20 List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName); 21 if (partGrants != null && partGrants.size() > 0) { 22 pm.deletePersistentAll(partGrants); 23 } 24 25 List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName, 26 tableName); 27 if (partColGrants != null && partColGrants.size() > 0) { 28 pm.deletePersistentAll(partColGrants); 29 } 30 // delete column statistics if present 31 try {
        //刪除column統計表數據
32 deleteTableColumnStatistics(dbName, tableName, null); 33 } catch (NoSuchObjectException e) { 34 LOG.info("Found no table level column statistics associated with db " + dbName + 35 " table " + tableName + " record to delete"); 36 } 37      //刪除mcd表數據 38 preDropStorageDescriptor(tbl.getSd()); 39 //刪除整個Table對象相關表數據 40 pm.deletePersistentAll(tbl); 41 } 42 success = commitTransaction(); 43 } finally { 44 if (!success) { 45 rollbackTransaction(); 46 } 47 } 48 return success; 49 }

  五、AlterTable

  下來咱們看下AlterTable,AlterTable包含的邏輯較多,由於牽扯到物理存儲上的路徑修改等,那麼咱們來一點點查看。仍是從Hive類中開始,上代碼:

 1   public void alterTable(String tblName, Table newTbl, boolean cascade)
 2       throws InvalidOperationException, HiveException {
 3     String[] names = Utilities.getDbTableName(tblName);
 4     try {
 5       //刪除table kv中的DDL_TIME,由於要alterTable,因此該時間會被改變
 6       if (newTbl.getParameters() != null) {
 7         newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
 8       }
      //進行相關校驗,包含dbName、tableName、column、inputOutClass、outputClass的校驗等,若是校驗不經過則拋出HiveException
9 newTbl.checkValidity();
      //調用alterTable
10 getMSC().alter_table(names[0], names[1], newTbl.getTTable(), cascade); 11 } catch (MetaException e) { 12 throw new HiveException("Unable to alter table. " + e.getMessage(), e); 13 } catch (TException e) { 14 throw new HiveException("Unable to alter table. " + e.getMessage(), e); 15 } 16 }

  HiveMetaStoreClient在這一步並沒有作額外處理,因此咱們直接來看HiveMetaStore服務端作了些什麼:

 1     private void alter_table_core(final String dbname, final String name, final Table newTable,
 2         final EnvironmentContext envContext, final boolean cascade)
 3         throws InvalidOperationException, MetaException {
 4       startFunction("alter_table", ": db=" + dbname + " tbl=" + name
 5           + " newtbl=" + newTable.getTableName());
 6 
 7       //更新DDL_Time
 8       if (newTable.getParameters() == null ||
 9           newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
10         newTable.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
11             .currentTimeMillis() / 1000));
12       }
13       boolean success = false;
14       Exception ex = null;
15       try {
       //獲取已有Table的整個對象
16 Table oldt = get_table_core(dbname, name);
       //進行Event處理
17 firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
       //進行alterTable處理,後面詳細說明
18 alterHandler.alterTable(getMS(), wh, dbname, name, newTable, cascade); 19 success = true; 20     
       //進行Listener處理 21 for (MetaStoreEventListener listener : listeners) { 22 23 AlterTableEvent alterTableEvent = 24 new AlterTableEvent(oldt, newTable, success, this); 25 alterTableEvent.setEnvironmentContext(envContext); 26 listener.onAlterTable(alterTableEvent); 27 } 28 } catch (NoSuchObjectException e) { 29 // thrown when the table to be altered does not exist 30 ex = e; 31 throw new InvalidOperationException(e.getMessage()); 32 } catch (Exception e) { 33 ex = e; 34 if (e instanceof MetaException) { 35 throw (MetaException) e; 36 } else if (e instanceof InvalidOperationException) { 37 throw (InvalidOperationException) e; 38 } else { 39 throw newMetaException(e); 40 } 41 } finally { 42 endFunction("alter_table", success, ex, name); 43 } 44 }

  那麼,咱們重點看下alterHandler具體所作的事情,在這以前簡要說下alterHandler的初始化,它是在HiveMetaStore init時獲取的hive.metastore.alter.impl參數的className,也就是HiveAlterHandler的name,那麼具體,咱們來看下它alterTable時的實現,前方高能,當心火燭:)

  1   public void alterTable(RawStore msdb, Warehouse wh, String dbname,
  2       String name, Table newt, boolean cascade) throws InvalidOperationException, MetaException {
  3     if (newt == null) {
  4       throw new InvalidOperationException("New table is invalid: " + newt);
  5     }
  6    //校驗新的tableName是否合法
  7     if (!MetaStoreUtils.validateName(newt.getTableName())) {
  8       throw new InvalidOperationException(newt.getTableName()
  9           + " is not a valid object name");
 10     }
     //校驗新的column Name type是否合法
11 String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols()); 12 if (validate != null) { 13 throw new InvalidOperationException("Invalid column " + validate); 14 } 15 16 Path srcPath = null; 17 FileSystem srcFs = null; 18 Path destPath = null; 19 FileSystem destFs = null; 20 21 boolean success = false; 22 boolean moveData = false; 23 boolean rename = false; 24 Table oldt = null; 25 List<ObjectPair<Partition, String>> altps = new ArrayList<ObjectPair<Partition, String>>(); 26 27 try { 28 msdb.openTransaction();
      //這裏直接轉換小寫,能夠看出代碼不是一個人寫的
29 name = name.toLowerCase(); 30 dbname = dbname.toLowerCase(); 31 32 //校驗新的tableName是否存在 33 if (!newt.getTableName().equalsIgnoreCase(name) 34 || !newt.getDbName().equalsIgnoreCase(dbname)) { 35 if (msdb.getTable(newt.getDbName(), newt.getTableName()) != null) { 36 throw new InvalidOperationException("new table " + newt.getDbName() 37 + "." + newt.getTableName() + " already exists"); 38 } 39 rename = true; 40 } 41 42 //獲取老的table對象 43 oldt = msdb.getTable(dbname, name); 44 if (oldt == null) { 45 throw new InvalidOperationException("table " + newt.getDbName() + "." 46 + newt.getTableName() + " doesn't exist"); 47 } 48     //alter Table時獲取METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES配置項(這裏傳入的默認值爲false),若是爲true,則校驗新老column的type是否兼容,不兼容時拋出InvalidOperationException 49 if (HiveConf.getBoolVar(hiveConf, 50 HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, 51 false)) { 52 // Throws InvalidOperationException if the new column types are not 53 // compatible with the current column types. 54 MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange( 55 oldt.getSd().getCols(), newt.getSd().getCols()); 56 } 57     //cascade參數是由調用Hive altertable方法傳過來的,也就是引擎調用時參數的設置,這裏用來查看是否須要alterPartition信息 58 if (cascade) { 59 //校驗新的column是否與老的column一致,如不一致,說明進行了column的添加或刪除操做 60 if(MetaStoreUtils.isCascadeNeededInAlterTable(oldt, newt)) {
        //根據dbName與tableName獲取整個partition的信息
61 List<Partition> parts = msdb.getPartitions(dbname, name, -1); 62 for (Partition part : parts) { 63 List<FieldSchema> oldCols = part.getSd().getCols(); 64 part.getSd().setCols(newt.getSd().getCols()); 65 String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
          //若是columns不一致,則刪除已有的column統計信息
66 updatePartColumnStatsForAlterColumns(msdb, part, oldPartName, part.getValues(), oldCols, part);
          //更新整個Partition的信息
67 msdb.alterPartition(dbname, name, part.getValues(), part); 68 } 69 } else { 70 LOG.warn("Alter table does not cascade changes to its partitions."); 71 } 72 } 73 74 //判斷parititonkey是否改變,也就是dt 或 hour等partName是否改變 76 boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(), 77 newt.getPartitionKeys()); 78     
      //若是已有表不是視圖表,同時發現老的partkey與新的partKey不一致,則報錯 79 if(!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){ 80 if (oldt.getPartitionKeys().size() != newt.getPartitionKeys().size() 81 || !partKeysPartiallyEqual) { 82 throw new InvalidOperationException( 83 "partition keys can not be changed."); 84 } 85 } 86       //若是是rename操做,該表不爲視圖表,新老location一致或者新的location爲空,而且已有的該表不爲外部表,說明用戶是想要把數據移動到新表名對應的location,那麼該操做
       // 爲alter table rename操做
91 if (rename 92 && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) 93 && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0 94 || StringUtils.isEmpty(newt.getSd().getLocation())) 95 && !MetaStoreUtils.isExternalTable(oldt)) { 96      //獲取新的location信息 97 srcPath = new Path(oldt.getSd().getLocation()); 98 srcFs = wh.getFs(srcPath); 99 100 // that means user is asking metastore to move data to new location 101 // corresponding to the new name 102 // get new location 103 Database db = msdb.getDatabase(newt.getDbName()); 104 Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath); 105 destPath = new Path(databasePath, newt.getTableName()); 106 destFs = wh.getFs(destPath); 107      //設置新的table location信息 用於後續更新動做 108 newt.getSd().setLocation(destPath.toString()); 109 moveData = true; 110        //校驗物理目標地址是否存在,若是存在則會override全部數據,是不容許的。 114 if (!FileUtils.equalsFileSystem(srcFs, destFs)) { 115 throw new InvalidOperationException("table new location " + destPath 116 + " is on a different file system than the old location " 117 + srcPath + ". This operation is not supported"); 118 } 119 try { 120 srcFs.exists(srcPath); // check that src exists and also checks 121 // permissions necessary 122 if (destFs.exists(destPath)) { 123 throw new InvalidOperationException("New location for this table " 124 + newt.getDbName() + "." + newt.getTableName() 125 + " already exists : " + destPath); 126 } 127 } catch (IOException e) { 128 throw new InvalidOperationException("Unable to access new location " 129 + destPath + " for table " + newt.getDbName() + "." 130 + newt.getTableName()); 131 } 132 String oldTblLocPath = srcPath.toUri().getPath(); 133 String newTblLocPath = destPath.toUri().getPath(); 134      135 //獲取old table中的全部partition信息 136 List<Partition> parts = msdb.getPartitions(dbname, name, -1); 137 for (Partition part : parts) { 138 String oldPartLoc = part.getSd().getLocation();
        //這裏,便開始新老partition地址的變換,修改partition元數據信息
139 if (oldPartLoc.contains(oldTblLocPath)) { 140 URI oldUri = new Path(oldPartLoc).toUri(); 141 String newPath = oldUri.getPath().replace(oldTblLocPath, newTblLocPath); 142 Path newPartLocPath = new Path(oldUri.getScheme(), oldUri.getAuthority(), newPath); 143 altps.add(ObjectPair.create(part, part.getSd().getLocation())); 144 part.getSd().setLocation(newPartLocPath.toString()); 145 String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues()); 146 try { 147 //existing partition column stats is no longer valid, remove them 148 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, part.getValues(), null); 149 } catch (InvalidInputException iie) { 150 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie); 151 } 152 msdb.alterPartition(dbname, name, part.getValues(), part); 153 } 154 }
       //更新stats相關信息
155 } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) && 156 (newt.getPartitionKeysSize() == 0)) { 157 Database db = msdb.getDatabase(newt.getDbName()); 158 // Update table stats. For partitioned table, we update stats in 159 // alterPartition() 160 MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true); 161 } 162 updateTableColumnStatsForAlterTable(msdb, oldt, newt); 163 // now finally call alter table 164 msdb.alterTable(dbname, name, newt); 165 // commit the changes 166 success = msdb.commitTransaction(); 167 } catch (InvalidObjectException e) { 168 LOG.debug(e); 169 throw new InvalidOperationException( 170 "Unable to change partition or table." 171 + " Check metastore logs for detailed stack." + e.getMessage()); 172 } catch (NoSuchObjectException e) { 173 LOG.debug(e); 174 throw new InvalidOperationException( 175 "Unable to change partition or table. Database " + dbname + " does not exist" 176 + " Check metastore logs for detailed stack." + e.getMessage()); 177 } finally { 178 if (!success) { 179 msdb.rollbackTransaction(); 180 } 181 if (success && moveData) {        //開始更新hdfs路徑,進行老路徑的rename到新路徑 ,調用fileSystem的rename操做 185 try { 186 if (srcFs.exists(srcPath) && !srcFs.rename(srcPath, destPath)) { 187 throw new IOException("Renaming " + srcPath + " to " + destPath + " failed"); 188 } 189 } catch (IOException e) { 190 LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e); 191 boolean revertMetaDataTransaction = false; 192 try { 193 msdb.openTransaction();
         //這裏會發現,又一次進行了alterTable元數據動做:由於上面HDFS的rename失敗了,而元數據事務已經提交,因此須要從新開啓事務,把table及partition的元數據還原成舊值
194 msdb.alterTable(newt.getDbName(), newt.getTableName(), oldt); 195 for (ObjectPair<Partition, String> pair : altps) { 196 Partition part = pair.getFirst(); 197 part.getSd().setLocation(pair.getSecond()); 198 msdb.alterPartition(newt.getDbName(), name, part.getValues(), part); 199 } 200 revertMetaDataTransaction = msdb.commitTransaction(); 201 } catch (Exception e1) { 202 // we should log this for manual rollback by administrator 203 LOG.error("Reverting metadata by HDFS operation failure failed During HDFS operation failed", e1); 204 LOG.error("Table " + Warehouse.getQualifiedName(newt) + 205 " should be renamed to " + Warehouse.getQualifiedName(oldt)); 206 LOG.error("Table " + Warehouse.getQualifiedName(newt) + 207 " should have path " + srcPath); 208 for (ObjectPair<Partition, String> pair : altps) { 209 LOG.error("Partition " + Warehouse.getQualifiedName(pair.getFirst()) + 210 " should have path " + pair.getSecond()); 211 } 212 if (!revertMetaDataTransaction) { 213 msdb.rollbackTransaction(); 214 } 215 } 216 throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name + 217 " failed to move data due to: '" + getSimpleMessage(e) + "' See hive log file for details."); 218 } 219 } 220 } 221 if (!success) { 222 throw new MetaException("Committing the alter table transaction was not successful."); 223 } 224 }

  六、createPartition
  在分區數據寫入以前,會先進行partition的元數據註冊及物理文件路徑的建立(內部表),Hive類代碼以下:

1   public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
2     try {
    //new出來一個Partition對象,傳入Table對象,調用Partition的構造方法來initialize Partition的信息
3 return new Partition(tbl, getMSC().add_partition( 4 Partition.createMetaPartitionObject(tbl, partSpec, null))); 5 } catch (Exception e) { 6 LOG.error(StringUtils.stringifyException(e)); 7 throw new HiveException(e); 8 } 9 }

  這裏的createMetaPartitionObject做用在於對傳入的Partition信息進行校驗,並封裝成Partition對象,代碼以下:

 1   public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject(
 2       Table tbl, Map<String, String> partSpec, Path location) throws HiveException {
 3     List<String> pvals = new ArrayList<String>();
    //遍歷整個PartCols,而且校驗partMap中是否一一對應
4 for (FieldSchema field : tbl.getPartCols()) { 5 String val = partSpec.get(field.getName()); 6 if (val == null || val.isEmpty()) { 7 throw new HiveException("partition spec is invalid; field " 8 + field.getName() + " does not exist or is empty"); 9 } 10 pvals.add(val); 11 } 12   //set相關的屬性信息,包括DbName、TableName、PartValues、以及sd信息 13 org.apache.hadoop.hive.metastore.api.Partition tpart = 14 new org.apache.hadoop.hive.metastore.api.Partition(); 15 tpart.setDbName(tbl.getDbName()); 16 tpart.setTableName(tbl.getTableName()); 17 tpart.setValues(pvals); 18 19 if (!tbl.isView()) { 20 tpart.setSd(cloneSd(tbl)); 21 tpart.getSd().setLocation((location != null) ? location.toString() : null); 22 } 23 return tpart; 24 }

  隨後HiveMetaStoreClient對該對象調用HiveMetaStore服務端的add_partition,並進行了深拷貝,這裏再也不詳細說明,那麼咱們直接看下服務端幹了什麼:

 1     private Partition add_partition_core(final RawStore ms,
 2         final Partition part, final EnvironmentContext envContext)
 3         throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
 4       boolean success = false;
 5       Table tbl = null;
 6       try {
 7         ms.openTransaction(); 
       //根據DbName、TableName獲取整個Table對象信息
8 tbl = ms.getTable(part.getDbName(), part.getTableName()); 9 if (tbl == null) { 10 throw new InvalidObjectException( 11 "Unable to add partition because table or database do not exist"); 12 } 13      //事件處理 14 firePreEvent(new PreAddPartitionEvent(tbl, part, this)); 15      //在建立Partition以前,首先會校驗元數據中該partition是否存在 16 boolean shouldAdd = startAddPartition(ms, part, false); 17 assert shouldAdd; // start would throw if it already existed here
       //建立Partition路徑 18 boolean madeDir = createLocationForAddedPartition(tbl, part); 19 try {
        //加載一些kv信息
20 initializeAddedPartition(tbl, part, madeDir);
        //寫入元數據
21 success = ms.addPartition(part); 22 } finally { 23 if (!success && madeDir) {
         //若是沒有成功,便刪除物理路徑
24 wh.deleteDir(new Path(part.getSd().getLocation()), true); 25 } 26 } 27 // we proceed only if we'd actually succeeded anyway, otherwise, 28 // we'd have thrown an exception 29 success = success && ms.commitTransaction(); 30 } finally { 31 if (!success) { 32 ms.rollbackTransaction(); 33 } 34 fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success); 35 } 36 return part; 37 }

  這裏說起一個設計上的點:從以前的表結構設計上看,沒有直接存儲PartName,而是將key與value單獨存在於kv表中,這裏咱們看下createLocationForAddedPartition:

 1     private boolean createLocationForAddedPartition(
 2         final Table tbl, final Partition part) throws MetaException {
 3       Path partLocation = null;
 4       String partLocationStr = null;
      //若是part的sd不爲null,則取出sd中的location信息賦給partLocationStr
5 if (part.getSd() != null) { 6 partLocationStr = part.getSd().getLocation(); 7 } 8     //若是爲null,則從新拼接part Location 9 if (partLocationStr == null || partLocationStr.isEmpty()) { 10 // set default location if not specified and this is 11 // a physical table partition (not a view) 12 if (tbl.getSd().getLocation() != null) {
        //若是table的location不爲null,則用table的路徑拼接partName,組成完整的Partition location
13 partLocation = new Path(tbl.getSd().getLocation(), Warehouse 14 .makePartName(tbl.getPartitionKeys(), part.getValues())); 15 } 16 } else { 17 if (tbl.getSd().getLocation() == null) { 18 throw new MetaException("Cannot specify location for a view partition"); 19 } 20 partLocation = wh.getDnsPath(new Path(partLocationStr)); 21 } 22 23 boolean result = false;
     //將location信息設置到part的sd中,若是該目錄還不存在則建立目錄
24 if (partLocation != null) { 25 part.getSd().setLocation(partLocation.toString()); 26 27 // Check to see if the directory already exists before calling 28 // mkdirs() because if the file system is read-only, mkdirs will 29 // throw an exception even if the directory already exists. 30 if (!wh.isDir(partLocation)) { 31 if (!wh.mkdirs(partLocation, true)) { 32 throw new MetaException(partLocation 33 + " is not a directory or unable to create one"); 34 } 35 result = true; 36 } 37 } 38 return result; 39 }

  總結:createPartition在服務端先獲取Table對象並校驗該partition是否已存在,而後建立partition目錄、寫入元數據;若是元數據寫入失敗,則刪除剛建立的目錄並回滾事務。

  七、dropPartition

  刪除partition就再也不從Hive開始了,咱們直接看HiveMetaStore服務端作了什麼:

 1     private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
 2       List<String> part_vals, final boolean deleteData, final EnvironmentContext envContext)
 3       throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
 4       InvalidInputException {
 5       boolean success = false;
 6       Path partPath = null;
 7       Table tbl = null;
 8       Partition part = null;
 9       boolean isArchived = false;
10       Path archiveParentDir = null;
11       boolean mustPurge = false;
12 
13       try {
14         ms.openTransaction();
       //根據dbName、tableName、part_values獲取整個part信息
15 part = ms.getPartition(db_name, tbl_name, part_vals);
       //獲取整個Table對象
16 tbl = get_table_core(db_name, tbl_name); 17 firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this)); 18 mustPurge = isMustPurge(envContext, tbl); 19 20 if (part == null) { 21 throw new NoSuchObjectException("Partition doesn't exist. " 22 + part_vals); 23 } 24      //這一片尚未深刻看Arrchived partition 25 isArchived = MetaStoreUtils.isArchived(part); 26 if (isArchived) { 27 archiveParentDir = MetaStoreUtils.getOriginalLocation(part); 28 verifyIsWritablePath(archiveParentDir); 29 checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, mustPurge); 30 } 31 if (!ms.dropPartition(db_name, tbl_name, part_vals)) { 32 throw new MetaException("Unable to drop partition"); 33 } 34 success = ms.commitTransaction(); 35 if ((part.getSd() != null) && (part.getSd().getLocation() != null)) { 36 partPath = new Path(part.getSd().getLocation()); 37 verifyIsWritablePath(partPath); 38 checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals, mustPurge); 39 } 40 } finally { 41 if (!success) { 42 ms.rollbackTransaction(); 43 } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) { 44 if (tbl != null && !isExternal(tbl)) { 45 if (mustPurge) { 46 LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash."); 47 } 48 else { 49 LOG.info("dropPartition() will move " + partPath + " to trash-directory."); 50 }
         //刪除partition
51 // Archived partitions have har:/to_har_file as their location. 52 // The original directory was saved in params 53 if (isArchived) { 54 assert (archiveParentDir != null); 55 wh.deleteDir(archiveParentDir, true, mustPurge); 56 } else { 57 assert (partPath != null); 58 wh.deleteDir(partPath, true, mustPurge); 59 deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge); 60 } 61 // ok even if the data is not deleted 62 } 63 } 64 for (MetaStoreEventListener listener : listeners) { 65 DropPartitionEvent dropPartitionEvent = 66 new DropPartitionEvent(tbl, part, success, deleteData, this); 67 dropPartitionEvent.setEnvironmentContext(envContext); 68 listener.onDropPartition(dropPartitionEvent); 69 } 70 } 71 return true; 72 }

  八、alterPartition

  alterPartition牽扯的校驗及文件目錄的修改,咱們直接從HiveMetaStore中的rename_partition中查看:

 1     private void rename_partition(final String db_name, final String tbl_name,
 2         final List<String> part_vals, final Partition new_part,
 3         final EnvironmentContext envContext)
 4         throws InvalidOperationException, MetaException,
 5         TException {
      //日誌記錄
6 startTableFunction("alter_partition", db_name, tbl_name); 7 8 if (LOG.isInfoEnabled()) { 9 LOG.info("New partition values:" + new_part.getValues()); 10 if (part_vals != null && part_vals.size() > 0) { 11 LOG.info("Old Partition values:" + part_vals); 12 } 13 } 14 15 Partition oldPart = null; 16 Exception ex = null; 17 try { 18 firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this)); 19      //校驗PartName的規範性 20 if (part_vals != null && !part_vals.isEmpty()) { 21 MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(), 22 partitionValidationPattern); 23 } 24      調用alterHandler的alterPartition進行partition物理上的rename,以及元數據修改 25 oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part); 26 27 // Only fetch the table if we actually have a listener 28 Table table = null; 29 for (MetaStoreEventListener listener : listeners) { 30 if (table == null) { 31 table = getMS().getTable(db_name, tbl_name); 32 } 33 AlterPartitionEvent alterPartitionEvent = 34 new AlterPartitionEvent(oldPart, new_part, table, true, this); 35 alterPartitionEvent.setEnvironmentContext(envContext); 36 listener.onAlterPartition(alterPartitionEvent); 37 } 38 } catch (InvalidObjectException e) { 39 ex = e; 40 throw new InvalidOperationException(e.getMessage()); 41 } catch (AlreadyExistsException e) { 42 ex = e; 43 throw new InvalidOperationException(e.getMessage()); 44 } catch (Exception e) { 45 ex = e; 46 if (e instanceof MetaException) { 47 throw (MetaException) e; 48 } else if (e instanceof InvalidOperationException) { 49 throw (InvalidOperationException) e; 50 } else if (e instanceof TException) { 51 throw (TException) e; 52 } else { 53 throw newMetaException(e); 54 } 55 } finally { 56 endFunction("alter_partition", oldPart != null, ex, tbl_name); 57 } 58 return; 59 }

  這裏咱們着重看一下,alterHandler.alterPartition方法,前方高能:

  1   public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
  2       final String name, final List<String> part_vals, final Partition new_part)
  3       throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
  4       MetaException {
  5     boolean success = false;
  6 
  7     Path srcPath = null;
  8     Path destPath = null;
  9     FileSystem srcFs = null;
 10     FileSystem destFs = null;
 11     Partition oldPart = null;
 12     String oldPartLoc = null;
 13     String newPartLoc = null;
 14 
 15     //修改新的partition的DDL時間
 16     if (new_part.getParameters() == null ||
 17         new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
 18         Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
 19       new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
 20           .currentTimeMillis() / 1000));
 21     }
 22    //根據dbName、tableName獲取整個Table對象
 23     Table tbl = msdb.getTable(dbname, name);
 24     //若是傳入的part_vals爲空或size爲0,說明修改的只是partition的其餘元數據信息而不牽扯到partKV,則直接調用msdb.alterPartition更新元數據
 25     if (part_vals == null || part_vals.size() == 0) {
 26       try {
 27         oldPart = msdb.getPartition(dbname, name, new_part.getValues());
 28         if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
 29           MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
 30         }
 31         updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
 32         msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
 33       } catch (InvalidObjectException e) {
 34         throw new InvalidOperationException("alter is not possible");
 35       } catch (NoSuchObjectException e){
 36         //old partition does not exist
 37         throw new InvalidOperationException("alter is not possible");
 38       }
 39       return oldPart;
 40     }
 41     //rename partition
 42     try {
 43       msdb.openTransaction();
 44       try {
       //獲取oldPart對象信息
45 oldPart = msdb.getPartition(dbname, name, part_vals); 46 } catch (NoSuchObjectException e) { 47 // this means there is no existing partition 48 throw new InvalidObjectException( 49 "Unable to rename partition because old partition does not exist"); 50 } 51 Partition check_part = null; 52 try {
       //用newPart的partValues去查詢元數據,看目標Partition是否已經存在
53 check_part = msdb.getPartition(dbname, name, new_part.getValues()); 54 } catch(NoSuchObjectException e) { 55 // this means there is no existing partition 56 check_part = null; 57 }
      //若是check_part不爲null,說明該part已經存在,則報already exists
58 if (check_part != null) { 59 throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." + 60 new_part.getValues()); 61 }
      //table的信息校驗
62 if (tbl == null) { 63 throw new InvalidObjectException( 64 "Unable to rename partition because table or database do not exist"); 65 } 66 67 //若是是外部表的分區變化了,那麼不須要操做文件系統,直接更新meta信息便可 68 if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { 69 new_part.getSd().setLocation(oldPart.getSd().getLocation()); 70 String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues()); 71 try { 72 //existing partition column stats is no longer valid, remove 73 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null); 74 } catch (NoSuchObjectException nsoe) { 75 //ignore 76 } catch (InvalidInputException iie) { 77 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie); 78 } 79 msdb.alterPartition(dbname, name, part_vals, new_part); 80 } else { 81 try {
         //獲取Table的文件路徑
82 destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name), 83 Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
         //拼接新的Partition的路徑
84 destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation())); 85 } catch (NoSuchObjectException e) { 86 LOG.debug(e); 87 throw new InvalidOperationException( 88 "Unable to change partition or table. Database " + dbname + " does not exist" 89 + " Check metastore logs for detailed stack." + e.getMessage()); 90 }
       //若是destPath不爲空,說明改變了文件路徑
91 if (destPath != null) { 92 newPartLoc = destPath.toString(); 93 oldPartLoc = oldPart.getSd().getLocation(); 94       //根據原有sd的路徑獲取老的part路徑信息 95 srcPath = new Path(oldPartLoc); 96 97 LOG.info("srcPath:" + oldPartLoc); 98 LOG.info("descPath:" + newPartLoc); 99 srcFs = wh.getFs(srcPath); 100 destFs = wh.getFs(destPath); 101 //查看srcFS與destFs是否爲同一個fileSystem 102 if (!FileUtils.equalsFileSystem(srcFs, destFs)) { 103 throw new InvalidOperationException("table new location " + destPath 104 + " is on a different file system than the old location " 105 + srcPath + ". This operation is not supported"); 106 } 107 try {
          //校驗老的partition路徑與新的partition路徑是否一致,同時新的partition路徑是否已經存在  
108 srcFs.exists(srcPath); // check that src exists and also checks 109 if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) { 110 throw new InvalidOperationException("New location for this table " 111 + tbl.getDbName() + "." + tbl.getTableName() 112 + " already exists : " + destPath); 113 } 114 } catch (IOException e) { 115 throw new InvalidOperationException("Unable to access new location " 116 + destPath + " for partition " + tbl.getDbName() + "." 117 + tbl.getTableName() + " " + new_part.getValues()); 118 } 119 new_part.getSd().setLocation(newPartLoc); 120 if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) { 121 MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true); 122 }
         //拼接oldPartName,而且刪除原有oldPart的column統計信息,而後寫入新的partition信息
123 String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues()); 124 try { 125 //existing partition column stats is no longer valid, remove 126 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null); 127 } catch (NoSuchObjectException nsoe) { 128 //ignore 129 } catch (InvalidInputException iie) { 130 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie); 131 } 132 msdb.alterPartition(dbname, name, part_vals, new_part); 133 } 134 } 135 136 success = msdb.commitTransaction(); 137 } finally { 138 if (!success) { 139 msdb.rollbackTransaction(); 140 } 141 if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) { 142 //rename the data directory 143 try{ 144 if (srcFs.exists(srcPath)) { 145 //若是根路徑還未建立,須要從新進行建立,就比如計算引擎先調用了alterTable,又調用了alterPartition,這時partition的根路徑或許還未建立 146 Path destParentPath = destPath.getParent(); 147 if (!wh.mkdirs(destParentPath, true)) { 148 throw new IOException("Unable to create path " + destParentPath); 149 }
          //進行原路徑與目標路徑的rename
150 wh.renameDir(srcPath, destPath, true); 151 LOG.info("rename done!"); 152 } 153 } catch (IOException e) { 154 boolean revertMetaDataTransaction = false; 155 try { 156 msdb.openTransaction(); 157 msdb.alterPartition(dbname, name, new_part.getValues(), oldPart); 158 revertMetaDataTransaction = msdb.commitTransaction(); 159 } catch (Exception e1) { 160 LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1); 161 if (!revertMetaDataTransaction) { 162 msdb.rollbackTransaction(); 163 } 164 } 165 throw new InvalidOperationException("Unable to access old location " 166 + srcPath + " for partition " + tbl.getDbName() + "." 167 + tbl.getTableName() + " " + part_vals); 168 } 169 } 170 } 171 return oldPart; 172 }

  暫時到這裏吧~後續我們慢慢玩哈~

相關文章
相關標籤/搜索