Spark 2.2.0
// Location: src\java\org\apache\hive\beeline\BeeLine.java
public static void main(String[] args) throws IOException {
  mainWithInputRedirection(args, null);
}

public static void mainWithInputRedirection(String[] args, InputStream inputStream)
    throws IOException {
  BeeLine beeLine = new BeeLine();
  int status = beeLine.begin(args, inputStream);

  if (!Boolean.getBoolean("beeline.system.exit")) {
    System.exit(status);
  }
}
public int begin(String[] args, InputStream inputStream) throws IOException {
  try {
    getOpts().load();
  } catch (Exception e) {
  }

  try {
    int code = initArgs(args);
    if (code != 0) {
      return code;
    }

    if (getOpts().getScriptFile() != null) {
      return executeFile(getOpts().getScriptFile());
    }

    try {
      info(getApplicationTitle());
    } catch (Exception e) {
    }

    ConsoleReader reader = getConsoleReader(inputStream);
    return execute(reader, false);
  } finally {
    close();
  }
}

// Inside the execute() loop: read one line from the console and dispatch it.
String line = getOpts().isSilent()
    ? reader.readLine(null, Character.valueOf('\000'))
    : reader.readLine(getPrompt());
if ((!dispatch(line)) && (exitOnError)) {
  return 2;
}
this.commands.sql(line, getOpts().getEntireLineAsCommand());
if (hasResults) {
  do {
    ResultSet rs = stmnt.getResultSet();
    try {
      int count = this.beeLine.print(rs);
      long end = System.currentTimeMillis();
      this.beeLine.info(this.beeLine.loc("rows-selected", count) + " "
          + this.beeLine.locElapsedTime(end - start));
    } finally {
      if (logThread != null) {
        logThread.join(10000L);
        showRemainingLogsIfAny(stmnt);
        logThread = null;
      }
      rs.close();
    }
  } while (BeeLine.getMoreResults(stmnt));
} else {
  int count = stmnt.getUpdateCount();
  long end = System.currentTimeMillis();
  this.beeLine.info(this.beeLine.loc("rows-affected", count) + " "
      + this.beeLine.locElapsedTime(end - start));
}
TCancelOperationResp cancelResp = this.client.CancelOperation(cancelReq);
TCloseOperationReq closeReq = new TCloseOperationReq(this.stmtHandle); TCloseOperationResp closeResp = this.client.CloseOperation(closeReq);
TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
statusResp = this.client.GetOperationStatus(statusReq);
TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
this.resultSet = new HiveQueryResultSet.Builder(this)
    .setClient(this.client)
    .setSessionHandle(this.sessHandle)
    .setStmtHandle(this.stmtHandle)
    .setMaxRows(this.maxRows)
    .setFetchSize(this.fetchSize)
    .setScrollable(this.isScrollableResultset)
    .setTransportLock(this.transportLock)
    .build();
public boolean execute(String sql) throws SQLException {
  checkConnection("execute");
  closeClientOperation();
  initFlags();

  TExecuteStatementReq execReq = new TExecuteStatementReq(this.sessHandle, sql);
  execReq.setRunAsync(true);
  execReq.setConfOverlay(this.sessConf);

  this.transportLock.lock();
  try {
    TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
    Utils.verifySuccessWithInfo(execResp.getStatus());
    this.stmtHandle = execResp.getOperationHandle();
    this.isExecuteStatementFailed = false;
  } catch (SQLException eS) {
    this.isExecuteStatementFailed = true;
    throw eS;
  } catch (Exception ex) {
    this.isExecuteStatementFailed = true;
    throw new SQLException(ex.toString(), "08S01", ex);
  } finally {
    this.transportLock.unlock();
  }

  // Poll the server until the operation reaches a terminal state.
  TGetOperationStatusReq statusReq = new TGetOperationStatusReq(this.stmtHandle);
  boolean operationComplete = false;
  while (!operationComplete) {
    try {
      this.transportLock.lock();
      TGetOperationStatusResp statusResp;
      try {
        statusResp = this.client.GetOperationStatus(statusReq);
      } finally {
        this.transportLock.unlock();
      }
      Utils.verifySuccessWithInfo(statusResp.getStatus());
      if (statusResp.isSetOperationState()) {
        switch (statusResp.getOperationState()) {
          case CLOSED_STATE:
          case FINISHED_STATE:
            operationComplete = true;
            break;
          case CANCELED_STATE:
            throw new SQLException("Query was cancelled", "01000");
          case ERROR_STATE:
            throw new SQLException(statusResp.getErrorMessage(),
                statusResp.getSqlState(), statusResp.getErrorCode());
          case UKNOWN_STATE:
            throw new SQLException("Unknown query", "HY000");
        }
      }
    } catch (SQLException e) {
      this.isLogBeingGenerated = false;
      throw e;
    } catch (Exception e) {
      this.isLogBeingGenerated = false;
      throw new SQLException(e.toString(), "08S01", e);
    }
  }
  this.isLogBeingGenerated = false;

  if (!this.stmtHandle.isHasResultSet()) {
    return false;
  }

  this.resultSet = new HiveQueryResultSet.Builder(this)
      .setClient(this.client)
      .setSessionHandle(this.sessHandle)
      .setStmtHandle(this.stmtHandle)
      .setMaxRows(this.maxRows)
      .setFetchSize(this.fetchSize)
      .setScrollable(this.isScrollableResultset)
      .setTransportLock(this.transportLock)
      .build();
  return true;
}
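For orientation, the whole client-side path above is exactly what an ordinary JDBC program drives. The following is a minimal sketch, assuming the Thrift server is reachable on localhost at the default binary-mode port 10000 and the hive-jdbc driver is on the classpath; the object name, JDBC URL, database and user below are placeholders, not values from this article:

import java.sql.DriverManager

object ThriftServerClientExample {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver (assumption: hive-jdbc is on the classpath).
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Assumption: a Thrift server listening on the default port 10000.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()
      // execute() runs through HiveStatement.execute shown above:
      // ExecuteStatement -> GetOperationStatus polling -> HiveQueryResultSet.
      if (stmt.execute("SELECT 1")) {
        val rs = stmt.getResultSet
        while (rs.next()) {
          println(rs.getObject(1))
        }
        rs.close()
      }
      stmt.close()
    } finally {
      conn.close()
    }
  }
}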
override def init(hiveConf: HiveConf) {
  val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
  setSuperField(this, "cliService", sparkSqlCliService)
  addService(sparkSqlCliService)

  val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
    new ThriftHttpCLIService(sparkSqlCliService)
  } else {
    new ThriftBinaryCLIService(sparkSqlCliService)
  }
  setSuperField(this, "thriftCLIService", thriftCliService)
  addService(thriftCliService)

  initCompositeService(hiveConf)
}
private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
  def initCompositeService(hiveConf: HiveConf) {
    // Emulating `CompositeService.init(hiveConf)`
    val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
    serviceList.asScala.foreach(_.init(hiveConf))

    // Emulating `AbstractService.init(hiveConf)`
    invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
    setAncestorField(this, 3, "hiveConf", hiveConf)
    invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
    getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
  }
}
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
SparkSQLEnv.sparkContext.addSparkListener(listener)

HiveThriftServer2Listener implements the following callbacks: onJobStart, onSessionCreated, onSessionClosed, onStatementStart, onStatementParsed, onStatementError, and onStatementFinish. The information they capture is recorded mainly in sessionList and executionList, as sketched below.
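To make that bookkeeping concrete, here is a hypothetical sketch of the pattern rather than the actual HiveThriftServer2Listener: standard SparkListener callbacks plus extra hooks that the Thrift server calls directly, all recorded into in-memory maps that a UI tab can later render. Class and member names such as RecordingListener, SessionRecord and ExecutionRecord are illustrative only:

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Illustrative only: records session and statement events in memory,
// mirroring the sessionList / executionList idea described above.
class RecordingListener extends SparkListener {
  case class SessionRecord(sessionId: String, user: String, startTs: Long)
  case class ExecutionRecord(statementId: String, statement: String, var state: String)

  val sessionList = mutable.LinkedHashMap.empty[String, SessionRecord]
  val executionList = mutable.LinkedHashMap.empty[String, ExecutionRecord]

  // Standard SparkListener callback: here one would correlate Spark jobs with
  // the statement that triggered them (correlation details omitted in this sketch).
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = ()

  // Extra hooks the Thrift server would invoke directly.
  def onSessionCreated(sessionId: String, user: String): Unit =
    sessionList(sessionId) = SessionRecord(sessionId, user, System.currentTimeMillis())

  def onSessionClosed(sessionId: String): Unit =
    sessionList.remove(sessionId) // in this sketch, simply drop the record

  def onStatementStart(statementId: String, statement: String): Unit =
    executionList(statementId) = ExecutionRecord(statementId, statement, "STARTED")

  def onStatementFinish(statementId: String): Unit =
    executionList.get(statementId).foreach(_.state = "FINISHED")
}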
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
  Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
} else {
  None
}
// Location: hive/hive-1.1.0-cdh5.7.0/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@Override
public synchronized void start() {
  super.start();
  if (!isStarted && !isEmbedded) {
    new Thread(this).start();
    isStarted = true;
  }
}
@Override
public void run() {
  try {
    ...
    // HTTP Server
    httpServer = new org.eclipse.jetty.server.Server(threadPool);

    TProcessor processor = new TCLIService.Processor<Iface>(this);
    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();

    // Register the servlet; the main request-handling logic lives in the processor.
    TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory,
        authType, serviceUGI, httpUGI);
    context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);

    httpServer.join();
  } catch (Throwable t) {
    LOG.fatal("Error starting HiveServer2: could not start "
        + ThriftHttpCLIService.class.getSimpleName(), t);
    System.exit(-1);
  }
}
protected Processor(I iface,
    Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
  super(iface, getProcessMap(processMap));
}
private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(
    Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
  processMap.put("OpenSession", new OpenSession());
  processMap.put("CloseSession", new CloseSession());
  processMap.put("GetInfo", new GetInfo());
  processMap.put("ExecuteStatement", new ExecuteStatement());
  processMap.put("GetTypeInfo", new GetTypeInfo());
  processMap.put("GetCatalogs", new GetCatalogs());
  processMap.put("GetSchemas", new GetSchemas());
  processMap.put("GetTables", new GetTables());
  processMap.put("GetTableTypes", new GetTableTypes());
  processMap.put("GetColumns", new GetColumns());
  processMap.put("GetFunctions", new GetFunctions());
  processMap.put("GetOperationStatus", new GetOperationStatus());
  processMap.put("CancelOperation", new CancelOperation());
  processMap.put("CloseOperation", new CloseOperation());
  processMap.put("GetResultSetMetadata", new GetResultSetMetadata());
  processMap.put("FetchResults", new FetchResults());
  processMap.put("GetDelegationToken", new GetDelegationToken());
  processMap.put("CancelDelegationToken", new CancelDelegationToken());
  processMap.put("RenewDelegationToken", new RenewDelegationToken());
  return processMap;
}
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
    Map<String, String> confOverlay) throws HiveSQLException {
  OperationHandle opHandle = sessionManager.getSession(sessionHandle)
      .executeStatement(statement, confOverlay);
  LOG.debug(sessionHandle + ": executeStatement()");
  return opHandle;
}

@Override
public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
    Map<String, String> confOverlay) throws HiveSQLException {
  OperationHandle opHandle = sessionManager.getSession(sessionHandle)
      .executeStatementAsync(statement, confOverlay);
  LOG.debug(sessionHandle + ": executeStatementAsync()");
  return opHandle;
}
To piggyback on Hive's JDBC/Thrift service, Spark needs to do the following: replace HiveServer2's CLIService, SessionManager and OperationManager with Spark-specific implementations, injecting them into the parent classes' fields via reflection.
override def init(hiveConf: HiveConf) {
  setSuperField(this, "hiveConf", hiveConf)

  val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)
  setSuperField(this, "sessionManager", sparkSqlSessionManager)
  addService(sparkSqlSessionManager)

  var sparkServiceUGI: UserGroupInformation = null
  if (UserGroupInformation.isSecurityEnabled) {
    try {
      HiveAuthFactory.loginFromKeytab(hiveConf)
      sparkServiceUGI = Utils.getUGI()
      setSuperField(this, "serviceUGI", sparkServiceUGI)
    } catch {
      case e @ (_: IOException | _: LoginException) =>
        throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
    }
  }

  initCompositeService(hiveConf)
}
sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()

override def init(hiveConf: HiveConf) {
  setSuperField(this, "hiveConf", hiveConf)

  // Create operation log root directory, if operation logging is enabled
  if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
    invoke(classOf[SessionManager], this, "initOperationLogRootDir")
  }

  val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
  setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
  getAncestorField[Log](this, 3, "LOG").info(
    s"HiveServer2: Async execution pool size $backgroundPoolSize")

  setSuperField(this, "operationManager", sparkSqlOperationManager)
  addService(sparkSqlOperationManager)
  initCompositeService(hiveConf)
}
public synchronized void init(HiveConf hiveConf) {
  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
    initOperationLogCapture(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL));
  } else {
    this.LOG.debug("Operation level logging is turned off");
  }
  super.init(hiveConf);
}
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
    Map<String, String> confOverlay) throws HiveSQLException {
  OperationHandle opHandle = sessionManager.getSession(sessionHandle)
      .executeStatement(statement, confOverlay);
  LOG.debug(sessionHandle + ": executeStatement()");
  return opHandle;
}
@Override
public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
    throws HiveSQLException {
  return executeStatementInternal(statement, confOverlay, false);
}

private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
    boolean runAsync) throws HiveSQLException {
  acquire(true);

  OperationManager operationManager = getOperationManager();
  ExecuteStatementOperation operation = operationManager
      .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
  OperationHandle opHandle = operation.getHandle();
  try {
    // Invoke the operation's run() method.
    operation.run();
    opHandleSet.add(opHandle);
    return opHandle;
  } catch (HiveSQLException e) {
    // Referring to SQLOperation.java: there is no chance that a HiveSQLException is thrown
    // and the async background operation is submitted to the thread pool successfully at the
    // same time, so clean up opHandle directly when a HiveSQLException is caught.
    operationManager.closeOperation(opHandle);
    throw e;
  } finally {
    release(true);
  }
}

// Implementation of run() in Operation:
public void run() throws HiveSQLException {
  beforeRun();
  try {
    runInternal();
  } finally {
    afterRun();
  }
}
private def execute(): Unit = {
  statementId = UUID.randomUUID().toString
  logInfo(s"Running query '$statement' with $statementId")
  setState(OperationState.RUNNING)

  result = sqlContext.sql(statement)
  logDebug(result.queryExecution.toString())
  result.queryExecution.logical match {
    case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
      sessionToActivePool.put(parentSession.getSessionHandle, value)
      logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
    case _ =>
  }
  HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())

  iter = {
    if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
      resultList = None
      result.toLocalIterator.asScala
    } else {
      resultList = Some(result.collect())
      resultList.get.iterator
    }
  }
  dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray

  setState(OperationState.FINISHED)
  HiveThriftServer2.listener.onStatementFinish(statementId)
}
// Result set as a DataFrame
private var result: DataFrame = _

// Result set buffered as a list
private var resultList: Option[Array[SparkRow]] = _

// Result set iterator
private var iter: Iterator[SparkRow] = _

// Column data types
private var dataTypes: Array[DataType] = _

// Statement ID, e.g. "61146141-2a0a-41ec-bce4-dd691a0fa63c"
private var statementId: String = _

// Result set schema, used when the getResultSetMetadata interface is called
private lazy val resultSchema: TableSchema = {
  if (result == null || result.schema.isEmpty) {
    new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
  } else {
    logInfo(s"Result Schema: ${result.schema}")
    SparkExecuteStatementOperation.getTableSchema(result.schema)
  }
}
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
public static TColumnValue toTColumnValue(Type type, Object value) {
  switch (type) {
    case BOOLEAN_TYPE:
      return booleanValue((Boolean) value);
    case TINYINT_TYPE:
      return byteValue((Byte) value);
    case SMALLINT_TYPE:
      return shortValue((Short) value);
    case INT_TYPE:
      return intValue((Integer) value);
    case BIGINT_TYPE:
      return longValue((Long) value);
    case FLOAT_TYPE:
      return floatValue((Float) value);
    case DOUBLE_TYPE:
      return doubleValue((Double) value);
    case STRING_TYPE:
      return stringValue((String) value);
    case CHAR_TYPE:
      return stringValue((HiveChar) value);
    case VARCHAR_TYPE:
      return stringValue((HiveVarchar) value);
    case DATE_TYPE:
      return dateValue((Date) value);
    case TIMESTAMP_TYPE:
      return timestampValue((Timestamp) value);
    case INTERVAL_YEAR_MONTH_TYPE:
      return stringValue((HiveIntervalYearMonth) value);
    case INTERVAL_DAY_TIME_TYPE:
      return stringValue((HiveIntervalDayTime) value);
    case DECIMAL_TYPE:
      return stringValue((HiveDecimal) value);
    case BINARY_TYPE:
      return stringValue((String) value);
    case ARRAY_TYPE:
    case MAP_TYPE:
    case STRUCT_TYPE:
    case UNION_TYPE:
    case USER_DEFINED_TYPE:
      return stringValue((String) value);
    default:
      return null;
  }
}
// Location: hive/hive-1.1.0-cdh5.7.0/service/src/java/org/apache/hive/service/cli/CLIService.java
@Override
public synchronized void start() {
  super.start();
}
// Location: hive/hive-1.1.0-cdh5.7.0/service/src/java/org/apache/hive/service/CompositeService.java
@Override
public synchronized void start() {
  int i = 0;
  try {
    for (int n = serviceList.size(); i < n; i++) {
      Service service = serviceList.get(i);
      service.start();
    }
    super.start();
  } catch (Throwable e) {
    LOG.error("Error starting services " + getName(), e);
    // Note that the state of the failed service is still INITED and not
    // STARTED. Even though the last service is not started completely, still
    // call stop() on all services including failed service to make sure cleanup
    // happens.
    stop(i);
    throw new ServiceException("Failed to Start " + getName(), e);
  }
}