Hive 2.1
Hive executes SQL in two ways: from the hive command-line client, or through beeline connected to a Hive thrift server (HiveServer2).
Below we look at how SQL is executed in each of these scenarios.
Command to start the hive client:

$HIVE_HOME/bin/hive

which is equivalent to

$HIVE_HOME/bin/hive --service cli

This invokes

$HIVE_HOME/bin/ext/cli.sh

The actual startup class is org.apache.hadoop.hive.cli.CliDriver:
org.apache.hadoop.hive.cli.CliDriver
public static void main(String[] args) throws Exception {
  int ret = new CliDriver().run(args);
  System.exit(ret);
}

public int run(String[] args) throws Exception {
  ...
  // execute cli driver work
  try {
    return executeDriver(ss, conf, oproc);
  } finally {
    ss.resetThreadName();
    ss.close();
  }
  ...

private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
    throws Exception {
  ...
  if (ss.execString != null) {
    int cmdProcessStatus = cli.processLine(ss.execString);
    return cmdProcessStatus;
  }
  ...
  try {
    if (ss.fileName != null) {
      return cli.processFile(ss.fileName);
    }
  } catch (FileNotFoundException e) {
    System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
    return 3;
  }
  ...
  while ((line = reader.readLine(curPrompt + "> ")) != null) {
    if (!prefix.equals("")) {
      prefix += '\n';
    }
    if (line.trim().startsWith("--")) {
      continue;
    }
    if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
      line = prefix + line;
      ret = cli.processLine(line, true);
  ...

public int processFile(String fileName) throws IOException {
  ...
  rc = processReader(bufferReader);
  ...

public int processReader(BufferedReader r) throws IOException {
  String line;
  StringBuilder qsb = new StringBuilder();
  while ((line = r.readLine()) != null) {
    // Skipping through comments
    if (! line.startsWith("--")) {
      qsb.append(line + "\n");
    }
  }
  return (processLine(qsb.toString()));
}

public int processLine(String line, boolean allowInterrupting) {
  ...
  ret = processCmd(command);
  ...

public int processCmd(String cmd) {
  ...
  CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
  ret = processLocalCmd(cmd, proc, ss);
  ...

int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
  int tryCount = 0;
  boolean needRetry;
  int ret = 0;
  do {
    try {
      needRetry = false;
      if (proc != null) {
        if (proc instanceof Driver) {
          Driver qp = (Driver) proc;
          PrintStream out = ss.out;
          long start = System.currentTimeMillis();
          if (ss.getIsVerbose()) {
            out.println(cmd);
          }
          qp.setTryCount(tryCount);
          ret = qp.run(cmd).getResponseCode();
          ...
          while (qp.getResults(res)) {
            for (String r : res) {
              out.println(r);
            }
  ...
CliDriver.main calls run, and run calls executeDriver; executeDriver distinguishes three cases: an inline SQL string (ss.execString, set by -e), a SQL file (ss.fileName, set by -f), and the interactive prompt loop.
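For illustration, the corresponding invocations look like this; -e and -f are the standard hive CLI flags for an inline statement and a script file, and the statement and file path below are placeholders:

$HIVE_HOME/bin/hive -e "select count(*) from t"
$HIVE_HOME/bin/hive -f /path/to/query.sql
$HIVE_HOME/bin/hive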
All three cases eventually call processLine; processLine calls processCmd, which calls processLocalCmd. processLocalCmd first calls Driver.run to execute the SQL and then calls Driver.getResults to print the results. These are also the two most important interfaces of Driver; the Driver implementation is examined later.
beeline needs to connect to a Hive thrift server, so first look at how the thrift server is started.
Command to start the Hive thrift server:
$HIVE_HOME/bin/hiveserver2
which is equivalent to
$HIVE_HOME/bin/hive --service hiveserver2
This invokes
$HIVE_HOME/bin/ext/hiveserver2.sh
The actual startup class is org.apache.hive.service.server.HiveServer2. The startup call chain is:
HiveServer2.main
  startHiveServer2
    init
      addService: CLIService, ThriftBinaryCLIService
    start
      Service.start
        CLIService.start
        ThriftBinaryCLIService.start
          TThreadPoolServer.serve
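HiveServer2 follows the composite-service pattern: init registers the child services, and start walks them in order, which is how start eventually reaches ThriftBinaryCLIService.start and TThreadPoolServer.serve. A minimal sketch of the pattern (hypothetical names, not the real Hive classes):

import java.util.ArrayList;
import java.util.List;

interface SimpleService {
  void init();
  void start();
}

// the parent fans init() and start() out to its children, in registration order
class SimpleCompositeService implements SimpleService {
  private final List<SimpleService> services = new ArrayList<>();

  void addService(SimpleService s) {   // like HiveServer2 adding CLIService etc.
    services.add(s);
  }

  @Override public void init() { for (SimpleService s : services) s.init(); }
  @Override public void start() { for (SimpleService s : services) s.start(); }
}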
The key classes involved (interface or base class -> concrete implementation):

TServer -> TThreadPoolServer
TProcessorFactory -> SQLPlainProcessorFactory
TProcessor -> TSetIpAddressProcessor
ThriftCLIService -> ThriftBinaryCLIService
CLIService
HiveSession
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) {
  super(cliService, ThriftBinaryCLIService.class.getSimpleName());
  this.oomHook = oomHook;
}
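The constructor above only wraps the CLIService; the thrift server itself is built and served in ThriftBinaryCLIService.run. In generic libthrift terms, that setup looks roughly like the following sketch; the processor argument is a placeholder, whereas the real code plugs in Hive's SQLPlainProcessorFactory/TSetIpAddressProcessor and takes worker-thread settings from the configuration:

import org.apache.thrift.TProcessor;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;

public class ThriftServerSketch {
  // `processor` stands in for the service's TProcessor
  static void serve(TProcessor processor) throws Exception {
    TServerSocket transport = new TServerSocket(10000);  // HiveServer2's default thrift port
    TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport)
        .processor(processor)
        .minWorkerThreads(5)
        .maxWorkerThreads(500);
    new TThreadPoolServer(args).serve();                 // blocks; one worker thread per connection
  }
}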
ThriftBinaryCLIService is thus a core class: it starts the actual thrift server and wraps a CLIService, to which every request is ultimately forwarded. Below is the CLIService code:
org.apache.hive.service.cli.CLIService
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
    Map<String, String> confOverlay) throws HiveSQLException {
  OperationHandle opHandle = sessionManager.getSession(sessionHandle)
      .executeStatement(statement, confOverlay);
  LOG.debug(sessionHandle + ": executeStatement()");
  return opHandle;
}

@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
    long maxRows, FetchType fetchType) throws HiveSQLException {
  RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
      .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType);
  LOG.debug(opHandle + ": fetchResults()");
  return rowSet;
}
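These two RPCs are exactly what a beeline or JDBC client drives over the thrift connection. A minimal JDBC sketch (assumes a HiveServer2 listening on localhost:10000; the URL, credentials, and table name t are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Hive2JdbcSketch {
  public static void main(String[] args) throws Exception {
    // hive-jdbc driver; a jdbc:hive2 URL goes through HiveServer2's thrift endpoint
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:10000/default", "user", "");
         Statement stmt = conn.createStatement();
         // executeQuery maps to CLIService.executeStatement on the server side
         ResultSet rs = stmt.executeQuery("select * from t")) {
      // rs.next() pulls row batches via CLIService.fetchResults
      while (rs.next()) {
        System.out.println(rs.getString(1));
      }
    }
  }
}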
Both executeStatement and fetchResults are forwarded to the HiveSession. Next, the HiveSession implementation class:
org.apache.hive.service.cli.session.HiveSessionImpl
@Override
public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
    throws HiveSQLException {
  return executeStatementInternal(statement, confOverlay, false, 0);
}

private OperationHandle executeStatementInternal(String statement,
    Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
    throws HiveSQLException {
  acquire(true, true);
  ExecuteStatementOperation operation = null;
  OperationHandle opHandle = null;
  try {
    operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
        confOverlay, runAsync, queryTimeout);
    opHandle = operation.getHandle();
    operation.run();
    ...

@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
    long maxRows, FetchType fetchType) throws HiveSQLException {
  acquire(true, false);
  try {
    if (fetchType == FetchType.QUERY_OUTPUT) {
      return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
    }
    return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf);
  } finally {
    release(true, false);
  }
}
As you can see, executeStatement creates an ExecuteStatementOperation through the OperationManager and calls its run method, while fetchResults (for query output) goes to OperationManager.getOperationNextRowSet:
org.apache.hive.service.cli.operation.OperationManager
public RowSet getOperationNextRowSet(OperationHandle opHandle,
    FetchOrientation orientation, long maxRows) throws HiveSQLException {
  return getOperation(opHandle).getNextRowSet(orientation, maxRows);
}
getOperationNextRowSet simply delegates to the Operation. Next, look at Operation's run and getNextRowSet in detail:
org.apache.hive.service.cli.operation.Operation
public void run() throws HiveSQLException {
  beforeRun();
  try {
    Metrics metrics = MetricsFactory.getInstance();
    if (metrics != null) {
      try {
        metrics.incrementCounter(MetricsConstant.OPEN_OPERATIONS);
      } catch (Exception e) {
        LOG.warn("Error Reporting open operation to Metrics system", e);
      }
    }
    runInternal();
  } finally {
    afterRun();
  }
}

public RowSet getNextRowSet() throws HiveSQLException {
  return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
}
Operation is an abstract class: run wraps the abstract runInternal, and the no-argument getNextRowSet delegates to an abstract overload. Below we will see these two abstract methods implemented in subclasses, ultimately relying on Driver's run and getResults; first, a stripped-down sketch of the template-method shape involved.
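This is the classic template-method pattern: the base class fixes the lifecycle, and subclasses fill in the actual work. A minimal, hypothetical illustration (the names are not the real Hive classes):

abstract class SketchOperation {
  // fixed lifecycle, like Operation.run: bookkeeping around the abstract step
  public final void run() {
    beforeRun();
    try {
      runInternal();   // the step each subclass implements
    } finally {
      afterRun();
    }
  }

  protected void beforeRun() {}
  protected void afterRun() {}
  protected abstract void runInternal();
}

class SketchSqlOperation extends SketchOperation {
  @Override
  protected void runInternal() {
    // a real subclass (e.g. SQLOperation) hands the statement to Driver.run here
    System.out.println("executing statement");
  }
}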
1) First, runInternal as implemented in the subclass HiveCommandOperation:
org.apache.hive.service.cli.operation.HiveCommandOperation
@Override
public void runInternal() throws HiveSQLException {
  setState(OperationState.RUNNING);
  try {
    String command = getStatement().trim();
    String[] tokens = statement.split("\\s");
    String commandArgs = command.substring(tokens[0].length()).trim();
    CommandProcessorResponse response = commandProcessor.run(commandArgs);
    ...
Here CommandProcessor.run is called. For hive-specific commands (set, dfs, add, ...) the processor is one of the dedicated CommandProcessor implementations; for ordinary SQL, the operation subclass is SQLOperation, whose runInternal hands the statement to Driver.run (Driver is the CommandProcessor implementation for SQL);
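The dispatch happens in CommandProcessorFactory. A rough sketch of its behavior (assumes hive-exec on the classpath; per Hive 2.1, and may differ across versions):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;

public class ProcessorDispatchSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // "set" is a hive command: yields a dedicated processor (SetProcessor)
    CommandProcessor set = CommandProcessorFactory.get(new String[] {"set"}, conf);
    // anything that is not a hive command falls through to a Driver
    CommandProcessor sql = CommandProcessorFactory.get(new String[] {"select"}, conf);
    System.out.println(set.getClass().getSimpleName());  // SetProcessor
    System.out.println(sql instanceof Driver);           // true
  }
}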
2) Next, getNextRowSet as implemented in the subclass SQLOperation:
org.apache.hive.service.cli.operation.SQLOperation
public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
    throws HiveSQLException {
  ...
  driver.setMaxRows((int) maxRows);
  if (driver.getResults(convey)) {
    return decode(convey, rowSet);
  }
  ...
Here Driver.getResults is called.

The code analysis above shows that whether SQL is executed from the hive command line or through beeline against the thrift server, everything ultimately relies on Driver. Driver's two core interfaces are run and getResults:
org.apache.hadoop.hive.ql.Driver
@Override
public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
  return run(command, false);
}

public CommandProcessorResponse run(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
  ...

private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  ...
  ret = compileInternal(command, true);
  ...
  ret = execute(true);
  ...

private int compileInternal(String command, boolean deferClose) {
  ...
  ret = compile(command, true, deferClose);
  ...

public int compile(String command, boolean resetTaskIds, boolean deferClose) {
  ...
  plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
      queryState.getHiveOperation(), schema);
  ...

public int execute(boolean deferClose) throws CommandNeedRetryException {
  ...
  // Add root Tasks to runnable
  for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
    // This should never happen, if it does, it's a bug with the potential to produce
    // incorrect results.
    assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
    driverCxt.addToRunnable(tsk);
  }
  ...
  // Loop while you either have tasks running, or tasks queued up
  while (driverCxt.isRunning()) {
    // Launch upto maxthreads tasks
    Task<? extends Serializable> task;
    while ((task = driverCxt.getRunnable(maxthreads)) != null) {
      TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
      if (!runner.isRunning()) {
        break;
      }
    }
    // poll the Tasks to see which one completed
    TaskRunner tskRun = driverCxt.pollFinished();
    if (tskRun == null) {
      continue;
    }
    hookContext.addCompleteTask(tskRun);
    queryDisplay.setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());
    Task<? extends Serializable> tsk = tskRun.getTask();
    TaskResult result = tskRun.getTaskResult();
    ...
    if (tsk.getChildTasks() != null) {
      for (Task<? extends Serializable> child : tsk.getChildTasks()) {
        if (DriverContext.isLaunchable(child)) {
          driverCxt.addToRunnable(child);
        }
      }
    }
  }

public boolean getResults(List res) throws IOException, CommandNeedRetryException {
  if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) {
    throw new IOException("FAILED: query has been cancelled, closed, or destroyed.");
  }
  if (isFetchingTable()) {
    /**
     * If resultset serialization to thrift object is enabled, and if the destination table is
     * indeed written using ThriftJDBCBinarySerDe, read one row from the output sequence file,
     * since it is a blob of row batches.
     */
    if (fetchTask.getWork().isUsingThriftJDBCBinarySerDe()) {
      maxRows = 1;
    }
    fetchTask.setMaxRows(maxRows);
    return fetchTask.fetch(res);
  }
  ...
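So run compiles the statement into a QueryPlan and then drives the plan's task DAG to completion, while getResults streams rows back through the FetchTask. To make the pair concrete, here is a minimal embedded use of Driver (a sketch: assumes hive-exec on the classpath, a reachable metastore, and a started SessionState; real deployments go through CliDriver or HiveServer2 instead, and the table name t is a placeholder):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class DriverSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(new SessionState(conf));   // Driver requires a started SessionState
    Driver driver = new Driver(conf);
    int ret = driver.run("select * from t").getResponseCode();  // compile + execute
    if (ret != 0) {
      System.exit(ret);
    }
    List<String> res = new ArrayList<String>();
    while (driver.getResults(res)) {              // fetch batches until exhausted
      for (String row : res) {
        System.out.println(row);
      }
      res.clear();
    }
    driver.close();
  }
}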
The Hive SQL parsing process is covered in detail at: http://www.javashuo.com/article/p-oeaeytzt-ea.html