hive執行流程(2)-CommandProcessor相關類


  在 上一篇的CliDriver 類中介紹了CliDriver 類會引用到CommandProcessor相關類,主要是根據命令來判斷具體實現類,好比經過本地的hive cli啓動時,運行hive的命令(非list/source/shell命令等)時在processCmd方法中有以下實現:
java

try {
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);// 根據命令判斷具體的CommandProcessor 實現類
        ret = processLocalCmd(cmd, proc, ss);
      } catch (SQLException e) {
        console.printError( "Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }

具體的決定什麼樣的命令對應什麼樣的具體實現類由 CommandProcessorFactory 規定:若是是set,reset,dfs,add delete,compile等命令,返回對應的CommandProcessor實現類。其他有效命令好比select,insert 都是返回Driver類。sql

CommandProcessor相關類在org.apache.hadoop.hive.ql.processors包中,類的具體的uml圖以下:
shell

wKioL1RHw_LzbuQHAATW62gKz60796.jpg

簡單看下幾個類的實現:
apache

1.HiveCommand類,是一個迭代類,定義了非sql的一些語句ide

public enum HiveCommand {
  SET(),
  RESET(),
  DFS(),
  ADD(),
  DELETE(),
  COMPILE();
  private static final Set<String> COMMANDS = new HashSet<String>();
  static {
    for (HiveCommand command : HiveCommand. values()) {
      COMMANDS.add(command.name());
    }
  }
  public static HiveCommand find(String[] command) {
    if (null == command){
      return null;
    }
    String cmd = command[0];
    if (cmd != null) {
      cmd = cmd.trim().toUpperCase();
      if (command. length > 1 && "role".equalsIgnoreCase(command[1])) {
        // special handling for set role r1 statement
        return null ;
      } else if (COMMANDS .contains(cmd)) {
        return HiveCommand. valueOf(cmd);
      }
    }
    return null;
  }
}

2.CommandProcessorFactory 類,主要用於獲取具體的命令實現類oop

主要定義了get和getForHiveCommand方法post

方法調用get----->getForHiveCommand,其中getForHiveCommand會調HiveCommand 類,HiveCommand 類是一個枚舉類型,定義了一些命令。對象

getForHiveCommand方法中:
  public static CommandProcessor getForHiveCommand(String[] cmd, HiveConf conf)
      throws SQLException {
    HiveCommand hiveCommand = HiveCommand.find(cmd);  // sql語句返回值爲null
    if (hiveCommand == null || isBlank(cmd[0])) {
      return null;
    }
    if (conf == null) {
      conf = new HiveConf();
    }
    Set<String> availableCommands = new HashSet<String>();
    for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST).split( ",")) {
      availableCommands.add(availableCommand.toLowerCase().trim());
    }
    if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
      throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000");
    }
    switch (hiveCommand) {  // 每種語句對應的具體的processor類
      case SET:
        return new SetProcessor();
      case RESET:
        return new ResetProcessor();
      case DFS:
        SessionState ss = SessionState.get();
        return new DfsProcessor(ss.getConf());
      case ADD:
        return new AddResourceProcessor();
      case DELETE:
        return new DeleteResourceProcessor();
      case COMPILE:
        return new CompileProcessor();
      default:
        throw new AssertionError("Unknown HiveCommand " + hiveCommand);
    }
  }
get方法:
  public static CommandProcessor get(String[] cmd, HiveConf conf)
      throws SQLException {
    CommandProcessor result = getForHiveCommand(cmd, conf);
    if (result != null) {
      return result;  // 若是result不爲空,即命令在HiveCommand 的迭代器中定義的話,直接返回對應的結果
    }
    if (isBlank(cmd[0])) {
      return null;
    } else {  // 爲空的話返回Driver類的實例
      if (conf == null) {
        return new Driver();    
      }
      Driver drv = mapDrivers.get(conf);
      if (drv == null) {
        drv = new Driver();
        mapDrivers.put(conf, drv);
      }
      drv.init();
      return drv;
    }
  }

3.CommandProcessorResponse類封裝了processor的返回信息,好比返回碼,錯誤信息等。blog

4.CommandProcessor 類是一個接口,具體的實現類由下面幾個:token

AddResourceProcessor/CompileProcessor/DeleteResourceProcessor/DfsProcessor/ResetProcessor/SetProcessor/Driver

主要實現方法在各個實現類的run方法中,run方法返回一個CommandProcessorResponse的對象。

下面簡單的說下經常使用的幾個實現類:

a.AddResourceProcessor類是處理add xxx命令的。

主要有兩個步驟:

1)判斷命令的合法性(長度,類型是否在FILE,JAR,ARCHIVE 3種以內)

2)調用SessionState的add_resource方法(

SessionState.add_resource方法---->調用SessionState.downloadResource--->
調用FileSystem的copyToLocalFile方法,把文件下載到本地

)

 public CommandProcessorResponse run(String command) {
    SessionState ss = SessionState.get();
    command = new VariableSubstitution().substitute(ss.getConf(),command);
    String[] tokens = command.split( "\\s+");
    SessionState.ResourceType t;
    if (tokens. length < 2
        || (t = SessionState.find_resource_type(tokens[0])) == null) {
      console.printError( "Usage: add ["
          + StringUtils.join(SessionState .ResourceType.values(), "|" )
          + "] <value> [<value>]*" );
      return new CommandProcessorResponse(1);
    }
    for ( int i = 1; i < tokens.length ; i++) {
      String resourceFile = ss.add_resource(t, tokens[i]);
      if(resourceFile == null){
        String errMsg = tokens[i]+ " does not exist.";
        return new CommandProcessorResponse(1,errMsg,null);
      }
    }
    return new CommandProcessorResponse(0);
  }

b.相反的DeleteResourceProcessor是用來處理delete  xxx命令的。

最終調用了SessionState的delete_resource方法,把resource從HashMap中去掉。

SessionState的delete_resource方法
  public boolean delete_resource(ResourceType t, String value) {
    if (resource_map.get(t) == null) {
      return false;
    }
    if (t. hook != null) {
      if (!t. hook.postHook( resource_map.get(t), value)) {
        return false;
      }
    }
    return ( resource_map.get(t).remove(value));
  }

c.DfsProcessor類用來處理dfs 命令,即已「!dfs」開頭的命令,最終調用了FsShell的run方法

d.SetProcessor類用來處理set xxx等命令,能夠用來設置參數,變量等。

設置參數時

1)以system: 開頭的調用了 System.getProperties().setProperty方法。

好比

hive> set system:user.name=xxxx;
hive> set system:user.name;    
system:user.name=xxxx

2)以hiveconf:開頭:

調用了HiveConf的verifyAndSet方法

3)以hivevar:開頭:

ss.getHiveVariables().put方法

Driver的實現比較複雜,放在下篇講解。

相關文章
相關標籤/搜索