【原創】大數據基礎之Spark(3)Spark Thrift實現原理及代碼實現

spark 2.1.1html

一 啓動命令

啓動spark thrift命令sql

$SPARK_HOME/sbin/start-thriftserver.shapache

而後會執行session

org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2app

二 啓動過程及代碼分析

hive thrift代碼詳見:http://www.javashuo.com/article/p-opeaomay-eo.htmlide

HiveThriftServer2是spark thrift核心類,繼承自Hive的HiveServer2oop

org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 extends org.apache.hive.service.server.HiveServer2this

 

啓動過程:編碼

HiveThriftServer2.mainurl

         SparkSQLEnv.init (sparkConf sparkSession sparkContext sqlContext)

         HiveThriftServer2.init

                  addService(ThriftBinaryCLIService)

         HiveThriftServer2.start

                  ThriftBinaryCLIService.run

                          TServer.serve

 

類結構:【接口或父類->子類】

TServer->TThreadPoolServer

         TProcessorFactory->SQLPlainProcessorFactory

                  TProcessor->TSetIpAddressProcessor

                          ThriftCLIService->ThriftBinaryCLIService

                                   CLIService->SparkSQLCLIService (核心子類)

 

服務初始化過程:

CLIService.init

         SparkSQLCLIService.init

                  addService(SparkSQLSessionManager)

                  initCompositeService

                          SparkSQLSessionManager.init

                                   addService(SparkSQLOperationManager)

                                   initCompositeService

                                            SparkSQLOperationManager.init

三 DDL執行過程

ddl執行過程須要和hive metastore交互

從執行計劃開始:

spark-sql> explain create table test_table(id string);
== Physical Plan ==
ExecutedCommand
+- CreateTableCommand CatalogTable(
Table: `test_table`
Created: Wed Dec 19 18:04:15 CST 2018
Last Access: Thu Jan 01 07:59:59 CST 1970
Type: MANAGED
Schema: [StructField(id,StringType,true)]
Provider: hive
Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
Time taken: 0.28 seconds, Fetched 1 row(s)

從執行計劃裏能夠找到具體的Command,這裏是CreateTableCommand 

 

org.apache.spark.sql.execution.command.tables

case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sessionState.catalog.createTable(table, ifNotExists)
    Seq.empty[Row]
  }
}

這裏能夠看到是直接將請求分發給sparkSession.sessionState.catalog

 

org.apache.spark.sql.internal.SessionState

  /**
   * Internal catalog for managing table and database states.
   */
  lazy val catalog = new SessionCatalog(
    sparkSession.sharedState.externalCatalog,
    sparkSession.sharedState.globalTempViewManager,
    functionResourceLoader,
    functionRegistry,
    conf,
    newHadoopConf())

取的是sparkSession.sharedState.externalCatalog

 

org.apache.spark.sql.internal.SharedState

  /**
   * A catalog that interacts with external systems.
   */
  val externalCatalog: ExternalCatalog =
    SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
      SharedState.externalCatalogClassName(sparkContext.conf),
      sparkContext.conf,
      sparkContext.hadoopConfiguration)
...
  private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"

  private def externalCatalogClassName(conf: SparkConf): String = {
    conf.get(CATALOG_IMPLEMENTATION) match {
      case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
      case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
    }
  }

這裏能夠看到是經過externalCatalogClassName反射實例化的,代碼裏硬編碼使用的是org.apache.spark.sql.hive.HiveExternalCatalog

 

org.apache.spark.sql.hive.HiveExternalCatalog

  /**
   * A Hive client used to interact with the metastore.
   */
  val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }

  private def withClient[T](body: => T): T = synchronized {
    try {
      body
    } catch {
      case NonFatal(exception) if isClientException(exception) =>
        val e = exception match {
          // Since we are using shim, the exceptions thrown by the underlying method of
          // Method.invoke() are wrapped by InvocationTargetException
          case i: InvocationTargetException => i.getCause
          case o => o
        }
        throw new AnalysisException(
          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
    }
  }

  override def createDatabase(
      dbDefinition: CatalogDatabase,
      ignoreIfExists: Boolean): Unit = withClient {
    client.createDatabase(dbDefinition, ignoreIfExists)
  }

這個類裏執行任何ddl方法都會執行withClient,而withClient有synchronized,執行過程是直接把請求分發給client,下面看client是什麼

 

org.apache.spark.sql.hive.client.IsolatedClientLoader

  /** The isolated client interface to Hive. */
  private[hive] def createClient(): HiveClient = {
    if (!isolationOn) {
      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
    }
    // Pre-reflective instantiation setup.
    logDebug("Initializing the logger to avoid disaster...")
    val origLoader = Thread.currentThread().getContextClassLoader
    Thread.currentThread.setContextClassLoader(classLoader)

    try {
      classLoader
        .loadClass(classOf[HiveClientImpl].getName)
        .getConstructors.head
        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
        .asInstanceOf[HiveClient]
    } catch {

可見client直接用的是org.apache.spark.sql.hive.client.HiveClientImpl

 

org.apache.spark.sql.hive.client.HiveClientImpl

  def withHiveState[A](f: => A): A = retryLocked {
    val original = Thread.currentThread().getContextClassLoader
    // Set the thread local metastore client to the client associated with this HiveClientImpl.
    Hive.set(client)
    // The classloader in clientLoader could be changed after addJar, always use the latest
    // classloader
    state.getConf.setClassLoader(clientLoader.classLoader)
    // setCurrentSessionState will use the classLoader associated
    // with the HiveConf in `state` to override the context class loader of the current
    // thread.
    shim.setCurrentSessionState(state)
    val ret = try f finally {
      Thread.currentThread().setContextClassLoader(original)
      HiveCatalogMetrics.incrementHiveClientCalls(1)
    }
    ret
  }
  private def retryLocked[A](f: => A): A = clientLoader.synchronized {
...

  override def createDatabase(
      database: CatalogDatabase,
      ignoreIfExists: Boolean): Unit = withHiveState {
    client.createDatabase(
      new HiveDatabase(
        database.name,
        database.description,
        database.locationUri,
        Option(database.properties).map(_.asJava).orNull),
        ignoreIfExists)
  }

這個類執行任何ddl方法都會執行withHiveState,withHiveState會執行retryLocked,retryLocked上有synchronized;並且這裏也是直接將請求分發給client,這裏的client是hive的類org.apache.hadoop.hive.ql.metadata.Hive

 

四 DML執行過程

dml執行過程最後會執行到spark.sql

sql執行過程:

CLIService.executeStatement (返回OperationHandle)

         SessionManager.getSession

         SessionManager.openSession

                  SparkSQLSessionManager.openSession

                          SparkSQLOperationManager.sessionToContexts.set (openSession時:session和sqlContext創建映射)

         HiveSession.executeStatement

                  HiveSessionImpl.executeStatementInternal

                          OperationManager.newExecuteStatementOperation

                                   SparkSQLOperationManager.newExecuteStatementOperation

                                            SparkSQLOperationManager.sessionToContexts.get (經過session取到sqlContext)

                          ExecuteStatementOperation.run

                                   SparkExecuteStatementOperation.run

                                            SparkExecuteStatementOperation.execute

                                                     SQLContext.sql (熟悉的spark sql)

可見從SparkSQLCLIService初始化開始,逐個將各個類的實現類改成spark的子類好比:

org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager extends org.apache.hive.service.cli.session.SessionManager
org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager extends org.apache.hive.service.cli.operation.OperationManager
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation extends org.apache.hive.service.cli.operation.ExecuteStatementOperation

從而實現底層實現的替換;

 

hive的HiveServer2爲何這麼容易的被擴展,詳見spark代碼的sql/hive-thriftserver,這裏應該是將hive1.2代碼作了不少修改,之後升級就不那麼容易了;至於spark爲何要花這麼大力氣擴展HiveServer2而不是從新實現,多是爲了保持接口一致,這樣有利於原來使用hive thrift的用戶平滑的遷移到spark thrift,由於惟一的改動就是切換url,實際上,相同sql下的spark thrift和hive thrift表現仍是有不少不一樣的。

相關文章
相關標籤/搜索