SparkThriftServer Source Code Analysis

Version

spark 2.2.0

Starting Point

  • Spark Thrift Server reuses the HiveServer2 source code and plugs its own overriding methods into it.
  • The whole walkthrough therefore interleaves Hive source code with Spark source code.
  • The flow starts from Beeline, which belongs to the Hive code base. Let's start there:

Client: Beeline

  • jar: hive-beeline-1.2.1.spark2.jar
  • Spark JDBC uses Beeline as its client to send requests and interact with the Spark service
  • Beeline's entry point:
//Location: src\java\org\apache\hive\beeline\BeeLine.java
  public static void main(String[] args) throws IOException {
    mainWithInputRedirection(args, null);
}
  public static void mainWithInputRedirection(String[] args, InputStream inputStream)
    throws IOException
  {
    BeeLine beeLine = new BeeLine();
    int status = beeLine.begin(args, inputStream);
    if (!Boolean.getBoolean("beeline.system.exit")) {
      System.exit(status);
    }
  }
  • beeLine.begin is then called: it parses the incoming arguments and calls the execute method
public int begin(String[] args, InputStream inputStream)
    throws IOException
  {
    try
    {
      getOpts().load();
    }
    catch (Exception e) {}
    try
    {
      int code = initArgs(args);
      int i;
      if (code != 0) {
        return code;
      }
      if (getOpts().getScriptFile() != null) {
        return executeFile(getOpts().getScriptFile());
      }
      try
      {
        info(getApplicationTitle());
      }
      catch (Exception e) {}
      ConsoleReader reader = getConsoleReader(inputStream);
      return execute(reader, false);
    }
    finally
    {
      close();
    }
  }
  

String line = getOpts().isSilent() ? reader.readLine(null, Character.valueOf('\000')) : reader.readLine(getPrompt());
if ((!dispatch(line)) && (exitOnError)) {
  return 2;
}
  • execute reads the input stream and calls the dispatch method
  • dispatch: handles invalid characters and help requests,
    • if the line starts with !, a CommandHandler is created to handle it;
    • otherwise the sql function of Commands handles the SQL command: getConnection of org.apache.hive.beeline.DatabaseConnection creates the connection
      this.commands.sql(line, getOpts().getEntireLineAsCommand());
  • Executing the SQL: Commands.execute calls the standard JDBC interface, stmnt.execute(sql); for the detailed flow see the execute method of HiveStatement in the hive-jdbc section below
  • Handling the result set: Commands.execute processes the result set, as follows:
if (hasResults)
{
  do
  {
    ResultSet rs = stmnt.getResultSet();
    try
    {
      int count = this.beeLine.print(rs);
      long end = System.currentTimeMillis();

      this.beeLine.info(this.beeLine.loc("rows-selected", count) + " " + this.beeLine.locElapsedTime(end - start));
    }
    finally
    {
      if (logThread != null)
      {
        logThread.join(10000L);
        showRemainingLogsIfAny(stmnt);
        logThread = null;
      }
      rs.close();
    }
  } while (BeeLine.getMoreResults(stmnt));
}
else
{
  int count = stmnt.getUpdateCount();
  long end = System.currentTimeMillis();
  this.beeLine.info(this.beeLine.loc("rows-affected", count) + " " + this.beeLine.locElapsedTime(end - start));
}

Server Side

  • Spark JDBC is built on the thrift RPC framework and reuses a large amount of the HiveServer code
  • hive-jdbc implements the standard JDBC interfaces by wrapping the RPC client requests and the result-set handling (see the JDBC sketch below)
  • SparkThrift implements the actual computation flow
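
Since hive-jdbc exposes the standard JDBC interfaces, a client can talk to the Spark Thrift Server with plain JDBC. A minimal sketch (host, port, user and query are placeholders; it assumes hive-jdbc 1.2.1.spark2 is on the classpath and the server uses the default binary transport on port 10000):

import java.sql.DriverManager

object ThriftServerJdbcClient extends App {
  // Explicitly load the Hive JDBC driver (harmless if JDBC 4 auto-loading already finds it).
  Class.forName("org.apache.hive.jdbc.HiveDriver")

  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
  try {
    val stmt = conn.createStatement()              // a HiveStatement under the hood
    val rs = stmt.executeQuery("SELECT 1 AS one")  // ExecuteStatement + GetOperationStatus + FetchResults RPCs
    while (rs.next()) println(rs.getInt("one"))
    rs.close()
    stmt.close()
  } finally {
    conn.close()
  }
}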

Hive-jdbc

TCLIService.Iface Client Requests

  • Cancel a request
TCancelOperationResp cancelResp = this.client.CancelOperation(cancelReq);
  • Close a request
TCloseOperationReq closeReq = new TCloseOperationReq(this.stmtHandle);
    TCloseOperationResp closeResp = this.client.CloseOperation(closeReq);
  • Execute a query
TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
  • Check the execution status
statusResp = this.client.GetOperationStatus(statusReq);
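
These Iface calls can also be issued directly against the thrift port, without going through JDBC. A rough sketch of the call sequence (class names follow the hive 1.2 thrift bindings used above; it assumes libthrift is on the classpath and the server accepts NOSASL/binary connections, otherwise the SASL transport wrapping that hive-jdbc normally performs is required):

import org.apache.hive.service.cli.thrift._
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket

object RawThriftClient extends App {
  val transport = new TSocket("localhost", 10000)   // placeholder host/port
  transport.open()
  val client = new TCLIService.Client(new TBinaryProtocol(transport))

  val sessHandle = client.OpenSession(new TOpenSessionReq()).getSessionHandle
  val opHandle = client.ExecuteStatement(new TExecuteStatementReq(sessHandle, "SELECT 1")).getOperationHandle

  // The same polling loop HiveStatement.execute uses: wait for a terminal operation state.
  var state = client.GetOperationStatus(new TGetOperationStatusReq(opHandle)).getOperationState
  while (state == TOperationState.INITIALIZED_STATE ||
         state == TOperationState.PENDING_STATE ||
         state == TOperationState.RUNNING_STATE) {
    Thread.sleep(100)
    state = client.GetOperationStatus(new TGetOperationStatusReq(opHandle)).getOperationState
  }

  client.CloseOperation(new TCloseOperationReq(opHandle))
  client.CloseSession(new TCloseSessionReq(sessHandle))
  transport.close()
}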

Flow

  • jar: hive-jdbc-1.2.1.spark2.jar
  • hive-jdbc is implemented on top of thrift (Facebook's open-source RPC framework); client (TCLIService.Iface client) is the RPC client that remotely calls the TCLIService service inside HiveServer
  • Walking through HiveStatement's execute method:
    • Execute the query: TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
    • Check operationComplete: while the operation has not finished, keep calling client.GetOperationStatus to fetch the server-side execution state
    • Parse the operation state; once it reports a finished state (cases 1/2 in the decompiled switch below), execution is complete. Then check whether a result set exists and, if so, parse it
    • Parse the result set
    this.resultSet = new HiveQueryResultSet
        .Builder(this)
        .setClient(this.client)
        .setSessionHandle(this.sessHandle)
        .setStmtHandle(this.stmtHandle)
        .setMaxRows(this.maxRows)
        .setFetchSize(this.fetchSize)
        .setScrollable(this.isScrollableResultset)
        .setTransportLock(this.transportLock)
        .build();
  • The execute code
public Boolean execute(String sql) throws SQLException {
  checkConnection("execute");
  closeClientOperation();
  initFlags();
  TExecuteStatementReq execReq = new TExecuteStatementReq(this.sessHandle, sql);
  execReq.setRunAsync(true);
  execReq.setConfOverlay(this.sessConf);
  this.transportLock.lock();
  try {
    TExecuteStatementResp execResp = this.client.ExecuteStatement(execReq);
    Utils.verifySuccessWithInfo(execResp.getStatus());
    this.stmtHandle = execResp.getOperationHandle();
    this.isExecuteStatementFailed = false;
  } catch (SQLException eS) {
    this.isExecuteStatementFailed = true;
    throw eS;
  } catch (Exception ex) {
    this.isExecuteStatementFailed = true;
    throw new SQLException(ex.toString(), "08S01", ex);
  } finally {
    this.transportLock.unlock();
  }
  TGetOperationStatusReq statusReq = new TGetOperationStatusReq(this.stmtHandle);
  Boolean operationComplete = false;
  while (!operationComplete) {
    try {
      this.transportLock.lock();
      TGetOperationStatusResp statusResp;
      try {
        statusResp = this.client.GetOperationStatus(statusReq);
      } finally {
        this.transportLock.unlock();
      }
      Utils.verifySuccessWithInfo(statusResp.getStatus());
      if (statusResp.isSetOperationState()) {
        switch (1.$SwitchMap$org$apache$hive$service$cli$thrift$TOperationState[statusResp.getOperationState().ordinal()]) {
        case 1:
        case 2:
          operationComplete = true;
          break;
        case 3:
          throw new SQLException("Query was cancelled", "01000");
        case 4:
          throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), statusResp.getErrorCode());
        case 5:
          throw new SQLException("Unknown query", "HY000");
        }
      }
    } catch (SQLException e) {
      this.isLogBeingGenerated = false;
      throw e;
    } catch (Exception e) {
      this.isLogBeingGenerated = false;
      throw new SQLException(e.toString(), "08S01", e);
    }
  }
  this.isLogBeingGenerated = false;
  if (!this.stmtHandle.isHasResultSet()) {
    return false;
  }
  this.resultSet = new HiveQueryResultSet.Builder(this)
      .setClient(this.client)
      .setSessionHandle(this.sessHandle)
      .setStmtHandle(this.stmtHandle)
      .setMaxRows(this.maxRows)
      .setFetchSize(this.fetchSize)
      .setScrollable(this.isScrollableResultset)
      .setTransportLock(this.transportLock)
      .build();
  return true;
}

SparkThrift

  • The SparkThrift service starts two services: SparkSQLCLIService and ThriftHttpCLIService (or ThriftBinaryCLIService)
  • ThriftHttpCLIService: the channel through which the RPC calls arrive
  • SparkSQLCLIService: the service that actually serves the requests submitted by clients

Main Entry: HiveThriftServer2

  • Calls SparkSQLEnv.init() to create the SparkSession, SparkConf, etc.
  • init
    • Creates a SparkSQLCLIService, sets cliService via reflection, adds it to the parent class's serviceList via addService, and then calls initCompositeService
    public void init(HiveConf hiveConf)
      {
        SparkSQLCLIService sparkSqlCliService = new SparkSQLCLIService(this, this.sqlContext);
        ReflectionUtils$.MODULE$.setSuperField(this, "cliService", sparkSqlCliService);
        addService(sparkSqlCliService);

        ThriftCLIService thriftCliService = isHTTPTransportMode(hiveConf) ?
          new ThriftHttpCLIService(sparkSqlCliService) :
          new ThriftBinaryCLIService(sparkSqlCliService);

        ReflectionUtils$.MODULE$.setSuperField(this, "thriftCLIService", thriftCliService);
        addService(thriftCliService);
        initCompositeService(hiveConf);
      }
    • initCompositeService: this function wraps ReflectedCompositeService's initCompositeService
    private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
      def initCompositeService(hiveConf: HiveConf) {
        // Emulating `CompositeService.init(hiveConf)`
        val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
        serviceList.asScala.foreach(_.init(hiveConf))
    
        // Emulating `AbstractService.init(hiveConf)`
        invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
        setAncestorField(this, 3, "hiveConf", hiveConf)
        invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
        getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
      }
    }
    • Via reflection it grabs the serviceList member of the ancestor class and calls init once on every member of that list
    • Since ThriftHttpCLIService and SparkSQLCLIService have already been put into this list, their init methods are called here.
  • A HiveThriftServer2Listener is also added to the SparkContext. It, too, extends SparkListener and records the time, state and other information of every SQL statement (a minimal custom-listener sketch follows this list).
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
    SparkSQLEnv.sparkContext.addSparkListener(listener)
    
    The callbacks implemented by HiveThriftServer2Listener include:
    onJobStart
    onSessionCreated
    onSessionClosed
    onStatementStart
    onStatementParsed
    onStatementError
    onStatementFinish
    The information gathered is recorded mainly in sessionList and executionList
  • After the thrift server starts, it registers a tab on the Spark UI: "JDBC/ODBC Server".
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
      } else {
        None
      }
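
HiveThriftServer2Listener itself is internal to the thrift server, but the same mechanism is open to any application: extend SparkListener and register it with addSparkListener. A minimal sketch (class and field names here are made up for illustration) that records per-job timings, analogous to how the thrift server listener fills sessionList/executionList per statement:

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

// Illustrative listener: records the start time and duration of each job id.
class TimingListener extends SparkListener {
  val startTimes = mutable.Map[Int, Long]()
  val durations  = mutable.Map[Int, Long]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    startTimes(jobStart.jobId) = jobStart.time

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    startTimes.get(jobEnd.jobId).foreach(start => durations(jobEnd.jobId) = jobEnd.time - start)
}

object ListenerDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("listener-demo").getOrCreate()
  val listener = new TimingListener
  spark.sparkContext.addSparkListener(listener)   // the same registration call HiveThriftServer2 uses

  spark.range(0, 1000).count()                    // triggers a job
  Thread.sleep(1000)                              // listener events are delivered asynchronously
  println(listener.durations)
  spark.stop()
}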

ThriftHttpCLIService/ThriftBinaryCLIService

  • Wraps SparkSQLCLIService
  • Exposes the HTTP or TCP service to the outside world

ThriftHttpCLIService

  • Spark does not implement this class itself but reuses Hive's source code as-is, so let's look directly at the start method of Hive's ThriftHttpCLIService. Since ThriftHttpCLIService does not implement start, we follow it up into its parent class:
//Location: hive/hive-1.1.0-cdh5.7.0/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
  @Override
  public synchronized void start() {
    super.start();
    if (!isStarted && !isEmbedded) {
      new Thread(this).start();
      isStarted = true;
    }
  }
  • This method is straightforward. It first calls the parent class's start method (the parent here is again AbstractService), which moves the service state from INITED to STARTED. It then starts a thread wrapping this object, and that thread calls the run method of ThriftHttpCLIService
@Override
  public void run() {
    try {
      ...
      // HTTP Server
      httpServer = new org.eclipse.jetty.server.Server(threadPool);

      TProcessor processor = new TCLIService.Processor<Iface>(this);
      TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
      
      // Configure the servlet; the main processing logic lives in processor
      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
          serviceUGI, httpUGI);
      context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);

      httpServer.join();
    } catch (Throwable t) {
      LOG.fatal(
          "Error starting HiveServer2: could not start "
              + ThriftHttpCLIService.class.getSimpleName(), t);
      System.exit(-1);
    }
  }
  • This method starts an HTTP server via jetty and then configures a ThriftHttpServlet to handle user requests.
  • Note the processor object in this code: it carries the main processing logic of this jetty service. Let's follow it:
protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }
  • getProcessMap here registers the functions that handle the various kinds of requests:
private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("OpenSession", new OpenSession());
      processMap.put("CloseSession", new CloseSession());
      processMap.put("GetInfo", new GetInfo());
      processMap.put("ExecuteStatement", new ExecuteStatement());
      processMap.put("GetTypeInfo", new GetTypeInfo());
      processMap.put("GetCatalogs", new GetCatalogs());
      processMap.put("GetSchemas", new GetSchemas());
      processMap.put("GetTables", new GetTables());
      processMap.put("GetTableTypes", new GetTableTypes());
      processMap.put("GetColumns", new GetColumns());
      processMap.put("GetFunctions", new GetFunctions());
      processMap.put("GetOperationStatus", new GetOperationStatus());
      processMap.put("CancelOperation", new CancelOperation());
      processMap.put("CloseOperation", new CloseOperation());
      processMap.put("GetResultSetMetadata", new GetResultSetMetadata());
      processMap.put("FetchResults", new FetchResults());
      processMap.put("GetDelegationToken", new GetDelegationToken());
      processMap.put("CancelDelegationToken", new CancelDelegationToken());
      processMap.put("RenewDelegationToken", new RenewDelegationToken());
      return processMap;
    }
  • Looking at the ExecuteStatement() entry, its implementation lives in CLIService
@Override
  public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
      Map<String, String> confOverlay)
          throws HiveSQLException {
    OperationHandle opHandle = sessionManager.getSession(sessionHandle)
        .executeStatement(statement, confOverlay);
    LOG.debug(sessionHandle + ": executeStatement()");
    return opHandle;
  }

  @Override
  public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
      Map<String, String> confOverlay) throws HiveSQLException {
    OperationHandle opHandle = sessionManager.getSession(sessionHandle)
        .executeStatementAsync(statement, confOverlay);
    LOG.debug(sessionHandle + ": executeStatementAsync()");
    return opHandle;
  }
  • From this we can see that changing the execution layer requires modifying sessionManager and the OperationHandle

Summary

For Spark to reuse the Hive JDBC service, the following things have to be done (a simplified sketch of the pattern follows the list):

  • Override the Operation(Handle), handing the execution work over to Spark SQL
  • Override the sessionManager, making sure the OperationHandle obtained is the Spark operation's handle
  • Override SparkSQLCLIService, making sure Spark-related configuration reaches the execution classes, and that the rewritten sparkSqlSessionManager is added to the serviceList
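
These three points boil down to one pattern: keep Hive's session and operation bookkeeping, but swap the factory that creates the statement operation. A simplified toy model of that shape (these are not the real Hive/Spark classes, just an illustration):

// Toy model of the override pattern: the manager's factory method decides who executes.
trait Operation { def run(): Unit }

class HiveExecuteStatementOperationToy(statement: String) extends Operation {
  override def run(): Unit = println(s"Hive executes: $statement")
}

class SparkExecuteStatementOperationToy(statement: String) extends Operation {
  // In the real SparkExecuteStatementOperation this is sqlContext.sql(statement)
  override def run(): Unit = println(s"Spark SQL executes: $statement")
}

class OperationManagerToy {
  def newExecuteStatementOperation(statement: String): Operation =
    new HiveExecuteStatementOperationToy(statement)
}

// Spark overrides only the factory, so handles, sessions and logging are all reused.
class SparkSQLOperationManagerToy extends OperationManagerToy {
  override def newExecuteStatementOperation(statement: String): Operation =
    new SparkExecuteStatementOperationToy(statement)
}

object OverrideDemo extends App {
  val manager: OperationManagerToy = new SparkSQLOperationManagerToy
  manager.newExecuteStatementOperation("SELECT count(*) FROM t").run()   // "Spark SQL executes: ..."
}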

SparkSQLCLIService

  • Extends CLIService
  • The init code
override def init(hiveConf: HiveConf) {
    setSuperField(this, "hiveConf", hiveConf)

    val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)
    setSuperField(this, "sessionManager", sparkSqlSessionManager)
    addService(sparkSqlSessionManager)
    var sparkServiceUGI: UserGroupInformation = null

    if (UserGroupInformation.isSecurityEnabled) {
      try {
        HiveAuthFactory.loginFromKeytab(hiveConf)
        sparkServiceUGI = Utils.getUGI()
        setSuperField(this, "serviceUGI", sparkServiceUGI)
      } catch {
        case e @ (_: IOException | _: LoginException) =>
          throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
      }
    }

    initCompositeService(hiveConf)
  }
  • Here a SparkSQLSessionManager instance is created, injected into the parent class, and added as a service. Then, through initCompositeService, the init method of that SparkSQLSessionManager instance gets called (a toy illustration of the reflective field injection is shown below)
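
setSuperField and getAncestorField are thin wrappers over java.lang.reflect; the trick is simply writing a private field declared on an ancestor class. A self-contained toy illustration (not Spark's ReflectionUtils; class and field names are made up):

// Toy version of the reflective field injection used in SparkSQLCLIService.init above.
class ParentService {
  private var sessionManager: String = "hive default manager"
  override def toString: String = sessionManager
}
class ChildService extends ParentService

object SetSuperFieldDemo extends App {
  def setSuperField(obj: AnyRef, fieldName: String, value: AnyRef): Unit = {
    val field = obj.getClass.getSuperclass.getDeclaredField(fieldName)
    field.setAccessible(true)          // bypass the private modifier
    field.set(obj, value)
  }

  val service = new ChildService
  setSuperField(service, "sessionManager", "spark session manager")
  println(service)                     // prints "spark session manager"
}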

SparkSQLSessionManager

  • When the SessionManager is initialized, it registers a SparkSQLOperationManager, which is used to:
    • manage the mapping from sessions to their hiveContext, so a session's hc can be looked up;
    • replace Hive's OperationManager in managing the mapping from handles to operations.
  • The thriftserver can be configured as single-session, i.e. all beeline connections share one hiveContext, or to start a new session per connection so that each session gets its own HiveContext with independent UDFs/UDAFs, temporary tables, session state and so on. The default is a new session per connection (see the startup sketch after this list).
  • When a new session is created, the session-to-hiveContext mapping is added to the OperationManager's sessionToContexts map
    sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
  • init
    • Creates a SparkSQLOperationManager object and then calls its init method via initCompositeService. Since this object does not override init, we have to follow it up to its parent class OperationManager:
    private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()
    
      override def init(hiveConf: HiveConf) {
        setSuperField(this, "hiveConf", hiveConf)
    
        // Create operation log root directory, if operation logging is enabled
        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
          invoke(classOf[SessionManager], this, "initOperationLogRootDir")
        }
    
        val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
        setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
        getAncestorField[Log](this, 3, "LOG").info(
          s"HiveServer2: Async execution pool size $backgroundPoolSize")
    
        setSuperField(this, "operationManager", sparkSqlOperationManager)
        addService(sparkSqlOperationManager)
    
        initCompositeService(hiveConf)
      }
    • OperationManager.init
    public synchronized void init(HiveConf hiveConf)
      {
        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
          initOperationLogCapture(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL));
        } else {
          this.LOG.debug("Operation level logging is turned off");
        }
        super.init(hiveConf);
      }
  • execute
    • CLIService looks up the Session by handle and executes the statement
    public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
      Map<String, String> confOverlay)
          throws HiveSQLException {
        OperationHandle opHandle = sessionManager.getSession(sessionHandle)
            .executeStatement(statement, confOverlay);
        LOG.debug(sessionHandle + ": executeStatement()");
        return opHandle;
      }
    • Every statement execution allocates a new operation, which is added to the OperationManager for bookkeeping. newExecuteStatementOperation is overridden by SparkSQLOperationManager.newExecuteStatementOperation, so the operation actually created is a SparkExecuteStatementOperation.
    @Override
    public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
        throws HiveSQLException {
      return executeStatementInternal(statement, confOverlay, false);
    }

    private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
        boolean runAsync) throws HiveSQLException {
      acquire(true);

      OperationManager operationManager = getOperationManager();
      ExecuteStatementOperation operation = operationManager
          .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
      OperationHandle opHandle = operation.getHandle();
      try {
        // Call the operation's run method
        operation.run();
        opHandleSet.add(opHandle);
        return opHandle;
      } catch (HiveSQLException e) {
        // Referring to SQLOperation.java, there is no chance that a HiveSQLException is thrown and the async
        // background operation is submitted to the thread pool successfully at the same time, so clean up
        // opHandle directly when a HiveSQLException is caught
        operationManager.closeOperation(opHandle);
        throw e;
      } finally {
        release(true);
      }
    }

    // Implementation of run in Operation
    public void run() throws HiveSQLException {
      beforeRun();
      try {
        runInternal();
      } finally {
        afterRun();
      }
    }
  • The whole process is essentially the same as Hive's native flow. In the native flow, HiveServer2 creates a CLIService service, which corresponds to SparkSQLCLIService in Spark; CLIService then creates a SessionManager service, corresponding to SparkSQLSessionManager; and SessionManager in turn creates an OperationManager service, corresponding to SparkSQLOperationManager.
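
For reference, this whole stack can be exercised without a separate deployment: the thrift server can be started inside an existing SparkSession via HiveThriftServer2.startWithContext, and the single-session behaviour mentioned above is controlled by spark.sql.hive.thriftServer.singleSession. A sketch, assuming Spark 2.2 with the hive-thriftserver module on the classpath:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object EmbeddedThriftServer extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("embedded-thrift-server")
    // true  -> all JDBC/beeline connections share one session (temp views, UDFs, state)
    // false -> each connection gets its own session (the default described above)
    .config("spark.sql.hive.thriftServer.singleSession", "false")
    .enableHiveSupport()
    .getOrCreate()

  // Starts SparkSQLCLIService plus the Thrift(Binary|Http)CLIService for this session's SQLContext.
  HiveThriftServer2.startWithContext(spark.sqlContext)
}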

SparkExecuteStatementOperation

  • As mentioned above, an Operation runs in three steps: beforeRun, runInternal, afterRun. beforeRun and afterRun only deal with logging, so let's look straight at runInternal.
  • runInternal is asynchronous by default, i.e. it runs in the background: SparkExecuteStatementOperation creates a Runnable and submits it to backgroundOperationPool, so execute runs on a new thread.
  • The heart of execute is sqlContext.sql(statement)
private def execute(): Unit = {
    statementId = UUID.randomUUID().toString
    logInfo(s"Running query '$statement' with $statementId")
    setState(OperationState.RUNNING)

  result = sqlContext.sql(statement)
  logDebug(result.queryExecution.toString())
  result.queryExecution.logical match {
    case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
      sessionToActivePool.put(parentSession.getSessionHandle, value)
      logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
    case _ =>
  }
  HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
  iter = {
    if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
      resultList = None
      result.toLocalIterator.asScala
    } else {
      resultList = Some(result.collect())
      resultList.get.iterator
    }
  }
  dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray

    setState(OperationState.FINISHED)
    HiveThriftServer2.listener.onStatementFinish(statementId)
  }
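
The branch on THRIFTSERVER_INCREMENTAL_COLLECT above decides whether the whole result is materialized on the driver in one go (collect) or streamed one partition at a time (toLocalIterator). A standalone sketch of the difference, assuming a local SparkSession:

import org.apache.spark.sql.SparkSession

object CollectVsIterate extends App {
  val spark = SparkSession.builder().master("local[*]").appName("collect-vs-iterate").getOrCreate()

  val df = spark.range(0, 100000).toDF("id")

  // Non-incremental: every row is pulled into driver memory at once.
  val all = df.collect()
  println(s"collect(): ${all.length} rows held in driver memory")

  // Incremental: rows are fetched partition by partition, bounding driver memory.
  val it = df.toLocalIterator()
  var n = 0L
  while (it.hasNext) { it.next(); n += 1 }
  println(s"toLocalIterator(): streamed $n rows")

  spark.stop()
}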

Member Variables

// Result DataFrame
private var result: DataFrame = _
// Result list
private var resultList: Option[Array[SparkRow]] = _
// Result iterator
private var iter: Iterator[SparkRow] = _
// Data types of the result columns
private var dataTypes: Array[DataType] = _
// Statement ID, e.g. "61146141-2a0a-41ec-bce4-dd691a0fa63c"
private var statementId: String = _


// Result set schema, used when the getResultSetMetadata interface is called
  private lazy val resultSchema: TableSchema = {
    if (result == null || result.schema.isEmpty) {
      new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
    } else {
      logInfo(s"Result Schema: ${result.schema}")
      SparkExecuteStatementOperation.getTableSchema(result.schema)
    }
  }

execute

  • Run the query: sqlContext.sql(statement)
  • Save the result set: private var result: DataFrame
  • Save the result-set iterator: private var iter: Iterator[SparkRow] = _
  • Result column data types: dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray

getNextRowSet

  • def getNextRowSet(order: FetchOrientation, maxRowsL: Long)
  • order: whether to fetch from the beginning; maxRowsL: the maximum number of rows to fetch;
  • Build the rowset according to the result schema:
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
  • Parse the result set
resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
  • addRow calls RowBasedSet's addRow
  • ColumnValue.toTColumnValue is called to convert each object to the corresponding thrift data type
public static TColumnValue toTColumnValue(Type type, Object value) {
    switch (type) {
    case BOOLEAN_TYPE:
      return booleanValue((Boolean)value);
    case TINYINT_TYPE:
      return byteValue((Byte)value);
    case SMALLINT_TYPE:
      return shortValue((Short)value);
    case INT_TYPE:
      return intValue((Integer)value);
    case BIGINT_TYPE:
      return longValue((Long)value);
    case FLOAT_TYPE:
      return floatValue((Float)value);
    case DOUBLE_TYPE:
      return doubleValue((Double)value);
    case STRING_TYPE:
      return stringValue((String)value);
    case CHAR_TYPE:
      return stringValue((HiveChar)value);
    case VARCHAR_TYPE:
      return stringValue((HiveVarchar)value);
    case DATE_TYPE:
      return dateValue((Date)value);
    case TIMESTAMP_TYPE:
      return timestampValue((Timestamp)value);
    case INTERVAL_YEAR_MONTH_TYPE:
      return stringValue((HiveIntervalYearMonth) value);
    case INTERVAL_DAY_TIME_TYPE:
      return stringValue((HiveIntervalDayTime) value);
    case DECIMAL_TYPE:
      return stringValue(((HiveDecimal)value));
    case BINARY_TYPE:
      return stringValue((String)value);
    case ARRAY_TYPE:
    case MAP_TYPE:
    case STRUCT_TYPE:
    case UNION_TYPE:
    case USER_DEFINED_TYPE:
      return stringValue((String)value);
    default:
      return null;
    }
  }

Overall Startup Call Logic

  • CLIService (parent class: CompositeService)
//Location: hive/hive-1.1.0-cdh5.7.0/service/src/java/org/apache/hive/service/cli/CLIService.java
  @Override
  public synchronized void start() {
    super.start();
  }
  • CompositeService calls the start method of every service in its serviceList, and then calls the parent class's start
//Location: hive/hive-1.1.0-cdh5.7.0/service/src/java/org/apache/hive/service/CompositeService.java
  @Override
  public synchronized void start() {
    int i = 0;
    try {
      for (int n = serviceList.size(); i < n; i++) {
        Service service = serviceList.get(i);
        service.start();
      }
      super.start();
    } catch (Throwable e) {
      LOG.error("Error starting services " + getName(), e);
      // Note that the state of the failed service is still INITED and not
      // STARTED. Even though the last service is not started completely, still
      // call stop() on all services including failed service to make sure cleanup
      // happens.
      stop(i);
      throw new ServiceException("Failed to Start " + getName(), e);
    }
  }
  • SparkSQLCLIService's parent class is CLIService; its serviceList contains SparkSQLSessionManager
  • SparkSQLSessionManager's parent class is SessionManager, whose parent is CompositeService; its serviceList contains SparkSQLOperationManager
  • Flow diagram
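
A toy model (not the Hive classes) of how CompositeService.start cascades through the service tree listed above, starting every child before the parent itself:

import scala.collection.mutable.ListBuffer

class Service(val name: String) {
  def start(): Unit = println(s"$name started")
}

class CompositeService(name: String) extends Service(name) {
  val serviceList = ListBuffer.empty[Service]
  def addService(s: Service): Unit = serviceList += s
  override def start(): Unit = {
    serviceList.foreach(_.start())   // start children first, as CompositeService.start does
    super.start()
  }
}

object StartupOrder extends App {
  val operationManager = new Service("SparkSQLOperationManager")
  val sessionManager   = new CompositeService("SparkSQLSessionManager")
  val cliService       = new CompositeService("SparkSQLCLIService")

  sessionManager.addService(operationManager)
  cliService.addService(sessionManager)

  cliService.start()
  // prints: SparkSQLOperationManager started, SparkSQLSessionManager started, SparkSQLCLIService started
}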

References

  • http://yoelee.com/2017/09/11/SparkThriftServer%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/
  • https://ieevee.com/tech/2016/06/01/spark-sql-1.html