Spark 2.1.1
Command to start the Spark Thrift server:
$SPARK_HOME/sbin/start-thriftserver.sh
which then runs:
org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
For the Hive Thrift code, see: http://www.javashuo.com/article/p-opeaomay-eo.html
HiveThriftServer2 is the core class of Spark Thrift; it extends Hive's HiveServer2:
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 extends org.apache.hive.service.server.HiveServer2
Startup sequence (a condensed sketch follows this call chain):
HiveThriftServer2.main
SparkSQLEnv.init (creates sparkConf, sparkSession, sparkContext, sqlContext)
HiveThriftServer2.init
addService(ThriftBinaryCLIService)
HiveThriftServer2.start
ThriftBinaryCLIService.run
TServer.serve
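Put together, the chain above amounts to roughly the following. This is a condensed sketch, not the verbatim Spark source (the real main() also installs a shutdown hook, a listener, and a UI tab, and builds the HiveConf differently); HiveThriftServer2 and SparkSQLEnv are private[hive], hence the package declaration.

package org.apache.spark.sql.hive.thriftserver

import org.apache.hadoop.hive.conf.HiveConf

// Condensed sketch of the startup flow, following the call chain above.
object ThriftServerStartupSketch {
  def main(args: Array[String]): Unit = {
    SparkSQLEnv.init()                                          // creates sparkConf, sparkContext, sqlContext
    val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)  // extends Hive's HiveServer2
    server.init(new HiveConf())                                 // init adds ThriftBinaryCLIService and friends
    server.start()                                              // ends up in ThriftBinaryCLIService.run -> TServer.serve
  }
}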
Class hierarchy: [interface or parent class -> subclass]
TServer->TThreadPoolServer
TProcessorFactory->SQLPlainProcessorFactory
TProcessor->TSetIpAddressProcessor
ThriftCLIService->ThriftBinaryCLIService
CLIService->SparkSQLCLIService (the core subclass)
Service initialization sequence (a generic sketch of this composite-service pattern follows the chain):
CLIService.init
SparkSQLCLIService.init
addService(SparkSQLSessionManager)
initCompositeService
SparkSQLSessionManager.init
addService(SparkSQLOperationManager)
initCompositeService
SparkSQLOperationManager.init
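The addService/initCompositeService calls above follow Hive's composite-service pattern: a parent service holds a list of child services and cascades init/start to them (Spark's subclasses use reflection, via initCompositeService, to reach the private child list of their Hive parents). Below is a minimal, generic sketch of the idea; the class names are illustrative, not the real Hive/Spark types.

import scala.collection.mutable.ArrayBuffer

// Generic sketch of the composite-service pattern used by CLIService/SessionManager/OperationManager.
class SimpleService(val name: String) {
  def init(): Unit = println(s"init $name")
  def start(): Unit = println(s"start $name")
}

class SimpleCompositeService(name: String) extends SimpleService(name) {
  private val children = ArrayBuffer.empty[SimpleService]
  protected def addService(s: SimpleService): Unit = children += s
  override def init(): Unit = { children.foreach(_.init()); super.init() }    // cascade to children first
  override def start(): Unit = { children.foreach(_.start()); super.start() }
}

// Mirrors the wiring above: CLIService -> SessionManager -> OperationManager.
class OperationManagerSketch extends SimpleService("SparkSQLOperationManager")
class SessionManagerSketch extends SimpleCompositeService("SparkSQLSessionManager") {
  addService(new OperationManagerSketch)
}
class CLIServiceSketch extends SimpleCompositeService("SparkSQLCLIService") {
  addService(new SessionManagerSketch)
}

object ServiceInitDemo extends App {
  val cli = new CLIServiceSketch
  cli.init()   // prints the init order of the whole service tree
  cli.start()
}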
DDL execution needs to interact with the Hive metastore.
Start from the execution plan:
spark-sql> explain create table test_table(id string);
== Physical Plan ==
ExecutedCommand
+- CreateTableCommand CatalogTable(
Table: `test_table`
Created: Wed Dec 19 18:04:15 CST 2018
Last Access: Thu Jan 01 07:59:59 CST 1970
Type: MANAGED
Schema: [StructField(id,StringType,true)]
Provider: hive
Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
Time taken: 0.28 seconds, Fetched 1 row(s)
From the execution plan we can find the concrete Command, in this case CreateTableCommand:
org.apache.spark.sql.execution.command.tables
case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sessionState.catalog.createTable(table, ifNotExists)
    Seq.empty[Row]
  }
}
Here we can see the request is delegated directly to sparkSession.sessionState.catalog.
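A quick way to see this end to end: create the table and check that it landed in the catalog. This is a hedged example for a Hive-enabled spark-shell (where spark is the predefined SparkSession), not part of the source walk-through.

// In a Hive-enabled spark-shell: the CREATE TABLE goes through CreateTableCommand,
// which delegates to sparkSession.sessionState.catalog.createTable as shown above.
spark.sql("create table test_table(id string)")
spark.sql("show tables").show()   // test_table now appears in the (Hive) catalog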
org.apache.spark.sql.internal.SessionState
/**
 * Internal catalog for managing table and database states.
 */
lazy val catalog = new SessionCatalog(
  sparkSession.sharedState.externalCatalog,
  sparkSession.sharedState.globalTempViewManager,
  functionResourceLoader,
  functionRegistry,
  conf,
  newHadoopConf())
What it uses is sparkSession.sharedState.externalCatalog.
org.apache.spark.sql.internal.SharedState
/**
 * A catalog that interacts with external systems.
 */
val externalCatalog: ExternalCatalog =
  SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
    SharedState.externalCatalogClassName(sparkContext.conf),
    sparkContext.conf,
    sparkContext.hadoopConfiguration)

...

private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"

private def externalCatalogClassName(conf: SparkConf): String = {
  conf.get(CATALOG_IMPLEMENTATION) match {
    case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
    case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
  }
}
Here we can see the catalog is instantiated via reflection through externalCatalogClassName, and the class name hard-coded in the code is org.apache.spark.sql.hive.HiveExternalCatalog.
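For reference, the "hive" branch above is selected whenever spark.sql.catalogImplementation is hive; in user code that is what enableHiveSupport() sets. A minimal sketch, assuming spark-hive is on the classpath:

import org.apache.spark.sql.SparkSession

object CatalogImplementationDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("catalog-impl-demo")
    .enableHiveSupport()   // sets spark.sql.catalogImplementation=hive
    .getOrCreate()

  // Prints "hive" here; without enableHiveSupport() the default is "in-memory" (InMemoryCatalog).
  println(spark.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory"))
  spark.stop()
}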
org.apache.spark.sql.hive.HiveExternalCatalog
/**
 * A Hive client used to interact with the metastore.
 */
val client: HiveClient = {
  HiveUtils.newClientForMetadata(conf, hadoopConf)
}

private def withClient[T](body: => T): T = synchronized {
  try {
    body
  } catch {
    case NonFatal(exception) if isClientException(exception) =>
      val e = exception match {
        // Since we are using shim, the exceptions thrown by the underlying method of
        // Method.invoke() are wrapped by InvocationTargetException
        case i: InvocationTargetException => i.getCause
        case o => o
      }
      throw new AnalysisException(
        e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
  }
}

override def createDatabase(
    dbDefinition: CatalogDatabase,
    ignoreIfExists: Boolean): Unit = withClient {
  client.createDatabase(dbDefinition, ignoreIfExists)
}
Every DDL method in this class runs through withClient, and withClient is synchronized; the execution itself simply forwards the request to client. Next, let's see what client is.
org.apache.spark.sql.hive.client.IsolatedClientLoader
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = {
  if (!isolationOn) {
    return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
  }
  // Pre-reflective instantiation setup.
  logDebug("Initializing the logger to avoid disaster...")
  val origLoader = Thread.currentThread().getContextClassLoader
  Thread.currentThread.setContextClassLoader(classLoader)

  try {
    classLoader
      .loadClass(classOf[HiveClientImpl].getName)
      .getConstructors.head
      .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
      .asInstanceOf[HiveClient]
  } catch {
So the client used is simply org.apache.spark.sql.hive.client.HiveClientImpl.
org.apache.spark.sql.hive.client.HiveClientImpl
def withHiveState[A](f: => A): A = retryLocked {
  val original = Thread.currentThread().getContextClassLoader
  // Set the thread local metastore client to the client associated with this HiveClientImpl.
  Hive.set(client)
  // The classloader in clientLoader could be changed after addJar, always use the latest
  // classloader
  state.getConf.setClassLoader(clientLoader.classLoader)
  // setCurrentSessionState will use the classLoader associated
  // with the HiveConf in `state` to override the context class loader of the current
  // thread.
  shim.setCurrentSessionState(state)
  val ret = try f finally {
    Thread.currentThread().setContextClassLoader(original)
    HiveCatalogMetrics.incrementHiveClientCalls(1)
  }
  ret
}

private def retryLocked[A](f: => A): A = clientLoader.synchronized {
...

override def createDatabase(
    database: CatalogDatabase,
    ignoreIfExists: Boolean): Unit = withHiveState {
  client.createDatabase(
    new HiveDatabase(
      database.name,
      database.description,
      database.locationUri,
      Option(database.properties).map(_.asJava).orNull),
    ignoreIfExists)
}
Every DDL method in this class runs through withHiveState, which calls retryLocked, and retryLocked is synchronized (on clientLoader); again the request is forwarded directly to client, which here is Hive's own class org.apache.hadoop.hive.ql.metadata.Hive.
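Both layers follow the same wrapper pattern: every metastore call is funnelled through one synchronized helper (withClient in HiveExternalCatalog, withHiveState/retryLocked in HiveClientImpl), so DDL against the metastore is effectively serialized per catalog instance. A minimal generic sketch of that pattern (illustrative names, not the Spark source):

// Generic sketch of the withClient / withHiveState idea: one lock in front of the real client.
class MetastoreFacadeSketch {
  private def withClient[T](body: => T): T = synchronized {
    // every caller takes this lock, so concurrent DDL requests queue up here
    body
  }

  def createDatabase(name: String, ignoreIfExists: Boolean): Unit = withClient {
    // the real classes forward to the underlying Hive client at this point
    println(s"createDatabase($name, ignoreIfExists=$ignoreIfExists)")
  }
}

object MetastoreFacadeDemo extends App {
  new MetastoreFacadeSketch().createDatabase("demo_db", ignoreIfExists = true)
}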
DML execution ultimately ends up in spark.sql.
SQL execution sequence:
CLIService.executeStatement (returns an OperationHandle)
SessionManager.getSession
SessionManager.openSession
SparkSQLSessionManager.openSession
SparkSQLOperationManager.sessionToContexts.set (at openSession: maps the session to its sqlContext; see the sketch after this chain)
HiveSession.executeStatement
HiveSessionImpl.executeStatementInternal
OperationManager.newExecuteStatementOperation
SparkSQLOperationManager.newExecuteStatementOperation
SparkSQLOperationManager.sessionToContexts.get (looks up the sqlContext for the session)
ExecuteStatementOperation.run
SparkExecuteStatementOperation.run
SparkExecuteStatementOperation.execute
SQLContext.sql (the familiar Spark SQL entry point)
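The interesting bookkeeping in this chain is the sessionToContexts map: openSession records which sqlContext belongs to which session, and newExecuteStatementOperation looks it up so each statement runs against the right context. A minimal sketch of that idea (illustrative names, not the Spark source):

import java.util.concurrent.ConcurrentHashMap

// Stand-in for SQLContext, just to make the sketch self-contained.
class SqlContextStub(val owner: String) {
  def sql(statement: String): String = s"[$owner] executed: $statement"
}

class OperationManagerMappingSketch {
  // mirrors the role of SparkSQLOperationManager.sessionToContexts
  private val sessionToContexts = new ConcurrentHashMap[String, SqlContextStub]()

  def openSession(sessionHandle: String, ctx: SqlContextStub): Unit =
    sessionToContexts.put(sessionHandle, ctx)

  def executeStatement(sessionHandle: String, statement: String): String =
    sessionToContexts.get(sessionHandle).sql(statement)
}

object SessionMappingDemo extends App {
  val mgr = new OperationManagerMappingSketch
  mgr.openSession("session-1", new SqlContextStub("alice"))
  println(mgr.executeStatement("session-1", "select 1"))
}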
As we can see, starting from the initialization of SparkSQLCLIService, the implementation classes are replaced one by one with Spark subclasses, for example:
org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager extends org.apache.hive.service.cli.session.SessionManager
org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager extends org.apache.hive.service.cli.operation.OperationManager
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation extends org.apache.hive.service.cli.operation.ExecuteStatementOperation
thereby swapping out the underlying implementation.
Why is Hive's HiveServer2 so easy to extend? See sql/hive-thriftserver in the Spark codebase: Spark appears to have taken the Hive 1.2 code and modified it substantially, which will make future upgrades harder. As for why Spark went to such lengths to extend HiveServer2 instead of re-implementing it, the likely reason is to keep the interface identical, so that users already on Hive Thrift can migrate smoothly to Spark Thrift: the only change needed is switching the URL. In practice, though, Spark Thrift and Hive Thrift still behave quite differently for the same SQL.
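To make the "switch the URL" point concrete: a HiveServer2-protocol JDBC client works against either server unchanged. A hedged example, assuming the hive-jdbc driver is on the classpath and the default port 10000 (host names and credentials are placeholders):

import java.sql.DriverManager

object ThriftClientDemo extends App {
  // The old Hive Thrift endpoint would be e.g. "jdbc:hive2://hive-server-host:10000/default";
  // pointing the same client at Spark Thrift only means changing the URL.
  val url = "jdbc:hive2://spark-thrift-host:10000/default"
  val conn = DriverManager.getConnection(url, "user", "")
  try {
    val rs = conn.createStatement().executeQuery("select 1")
    while (rs.next()) {
      println(rs.getInt(1))
    }
  } finally {
    conn.close()
  }
}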