在 上一篇的CliDriver 類中介紹了CliDriver 類會引用到CommandProcessor相關類,主要是根據命令來判斷具體實現類,好比經過本地的hive cli啓動時,運行hive的命令(非list/source/shell命令等)時在processCmd方法中有以下實現:
java
try { CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);// 根據命令判斷具體的CommandProcessor 實現類 ret = processLocalCmd(cmd, proc, ss); } catch (SQLException e) { console.printError( "Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(), org.apache.hadoop.util.StringUtils.stringifyException(e)); ret = 1; }
具體的決定什麼樣的命令對應什麼樣的具體實現類由 CommandProcessorFactory 規定:若是是set,reset,dfs,add delete,compile等命令,返回對應的CommandProcessor實現類。其他有效命令好比select,insert 都是返回Driver類。sql
CommandProcessor相關類在org.apache.hadoop.hive.ql.processors包中,類的具體的uml圖以下:
shell
簡單看下幾個類的實現:
apache
1.HiveCommand類,是一個迭代類,定義了非sql的一些語句ide
public enum HiveCommand { SET(), RESET(), DFS(), ADD(), DELETE(), COMPILE(); private static final Set<String> COMMANDS = new HashSet<String>(); static { for (HiveCommand command : HiveCommand. values()) { COMMANDS.add(command.name()); } } public static HiveCommand find(String[] command) { if (null == command){ return null; } String cmd = command[0]; if (cmd != null) { cmd = cmd.trim().toUpperCase(); if (command. length > 1 && "role".equalsIgnoreCase(command[1])) { // special handling for set role r1 statement return null ; } else if (COMMANDS .contains(cmd)) { return HiveCommand. valueOf(cmd); } } return null; } }
2.CommandProcessorFactory 類,主要用於獲取具體的命令實現類oop
主要定義了get和getForHiveCommand方法post
方法調用get----->getForHiveCommand,其中getForHiveCommand會調HiveCommand 類,HiveCommand 類是一個枚舉類型,定義了一些命令。對象
getForHiveCommand方法中: public static CommandProcessor getForHiveCommand(String[] cmd, HiveConf conf) throws SQLException { HiveCommand hiveCommand = HiveCommand.find(cmd); // sql語句返回值爲null if (hiveCommand == null || isBlank(cmd[0])) { return null; } if (conf == null) { conf = new HiveConf(); } Set<String> availableCommands = new HashSet<String>(); for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST).split( ",")) { availableCommands.add(availableCommand.toLowerCase().trim()); } if (!availableCommands.contains(cmd[0].trim().toLowerCase())) { throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000"); } switch (hiveCommand) { // 每種語句對應的具體的processor類 case SET: return new SetProcessor(); case RESET: return new ResetProcessor(); case DFS: SessionState ss = SessionState.get(); return new DfsProcessor(ss.getConf()); case ADD: return new AddResourceProcessor(); case DELETE: return new DeleteResourceProcessor(); case COMPILE: return new CompileProcessor(); default: throw new AssertionError("Unknown HiveCommand " + hiveCommand); } } get方法: public static CommandProcessor get(String[] cmd, HiveConf conf) throws SQLException { CommandProcessor result = getForHiveCommand(cmd, conf); if (result != null) { return result; // 若是result不爲空,即命令在HiveCommand 的迭代器中定義的話,直接返回對應的結果 } if (isBlank(cmd[0])) { return null; } else { // 爲空的話返回Driver類的實例 if (conf == null) { return new Driver(); } Driver drv = mapDrivers.get(conf); if (drv == null) { drv = new Driver(); mapDrivers.put(conf, drv); } drv.init(); return drv; } }
3.CommandProcessorResponse類封裝了processor的返回信息,好比返回碼,錯誤信息等。blog
4.CommandProcessor 類是一個接口,具體的實現類由下面幾個:token
AddResourceProcessor/CompileProcessor/DeleteResourceProcessor/DfsProcessor/ResetProcessor/SetProcessor/Driver
主要實現方法在各個實現類的run方法中,run方法返回一個CommandProcessorResponse的對象。
下面簡單的說下經常使用的幾個實現類:
a.AddResourceProcessor類是處理add xxx命令的。
主要有兩個步驟:
1)判斷命令的合法性(長度,類型是否在FILE,JAR,ARCHIVE 3種以內)
2)調用SessionState的add_resource方法(
SessionState.add_resource方法---->調用SessionState.downloadResource---> 調用FileSystem的copyToLocalFile方法,把文件下載到本地
)
public CommandProcessorResponse run(String command) { SessionState ss = SessionState.get(); command = new VariableSubstitution().substitute(ss.getConf(),command); String[] tokens = command.split( "\\s+"); SessionState.ResourceType t; if (tokens. length < 2 || (t = SessionState.find_resource_type(tokens[0])) == null) { console.printError( "Usage: add [" + StringUtils.join(SessionState .ResourceType.values(), "|" ) + "] <value> [<value>]*" ); return new CommandProcessorResponse(1); } for ( int i = 1; i < tokens.length ; i++) { String resourceFile = ss.add_resource(t, tokens[i]); if(resourceFile == null){ String errMsg = tokens[i]+ " does not exist."; return new CommandProcessorResponse(1,errMsg,null); } } return new CommandProcessorResponse(0); }
b.相反的DeleteResourceProcessor是用來處理delete xxx命令的。
最終調用了SessionState的delete_resource方法,把resource從HashMap中去掉。
SessionState的delete_resource方法 public boolean delete_resource(ResourceType t, String value) { if (resource_map.get(t) == null) { return false; } if (t. hook != null) { if (!t. hook.postHook( resource_map.get(t), value)) { return false; } } return ( resource_map.get(t).remove(value)); }
c.DfsProcessor類用來處理dfs 命令,即已「!dfs」開頭的命令,最終調用了FsShell的run方法
d.SetProcessor類用來處理set xxx等命令,能夠用來設置參數,變量等。
設置參數時
1)以system: 開頭的調用了 System.getProperties().setProperty方法。
好比
hive> set system:user.name=xxxx; hive> set system:user.name; system:user.name=xxxx
2)以hiveconf:開頭:
調用了HiveConf的verifyAndSet方法
3)以hivevar:開頭:
ss.getHiveVariables().put方法
Driver的實現比較複雜,放在下篇講解。