版權聲明:本文爲xpleaf(香飄葉子)博主原創文章,遵循CC 4.0 BY-SA 版權協議,轉載請附上原文出處連接和本聲明。java
本文較爲系統、全面而且由淺入深地介紹了網易Spark Kyuubi出現的背景、核心架構設計與關鍵源碼實現,是學習、應用和對Kyuubi進行二次開發不可多得的技術乾貨,但因爲做者認知水平有限,文中不免會出現描述不許確的措辭,還請多多包容和指出。node
Kyuubi是網易數帆旗下易數大數據團隊開源的一個高性能的通用JDBC和SQL執行引擎,創建在Apache Spark之上,Kyuubi的出現,較好的彌補了Spark ThriftServer在多租戶、資源隔離和高可用等方面的不足,是一個真正能夠知足大多數生產環境場景的開源項目。git
經過分析Spark ThriftServer的設計與不足,本文會逐漸帶你深刻理解Kyuubi的核心設計與實現,同時會選取多個關鍵場景來剖析其源碼,經過本文的閱讀,但願能讓讀者對網易Kyuubi的總體架構設計有一個較爲清晰的理解,並可以用在本身的生產環境中解決更多實際應用問題。github
本文主要主要選取Kyuubi 1.1.0版原本對其設計與實現進行分析,後續的版本迭代社區加入了數據湖等概念和實現,本文不會對這方面的內容進行探討。sql
在最初使用Spark時,只有理解了Spark RDD模型和其提供的各類算子時,才能比較好地使用Spark進行數據處理和分析,顯然因爲向上層暴露了過多底層實現細節,Spark有必定的高使用門檻,在易用性上對許多初入門用戶來講並不太友好。apache
SparkSQL的出現則較好地解決了這一問題,經過使用SparkSQL提供的簡易API,用戶只須要有基本的編程基礎而且會使用SQL,就能夠藉助Spark強大的快速分佈式計算能力來處理和分析他們的大規模數據集。編程
而Spark ThriftServer的出現使Spark的易用性又向前邁進了一步,經過提供標準的JDBC接口和命令行終端的方式,平臺開發者能夠基於其提供的服務來快速構建它們的數據分析應用,普通用戶甚至不須要有編程基礎便可藉助其強大的能力來進行交互式數據分析。緩存
顧名思義,本質上,Spark ThriftServer是一個基於Apache Thrift框架構建而且封裝了SparkContext的RPC服務端,或者從Spark的層面來說,咱們也能夠說,Spark ThriftServer是一個提供了各類RPC服務的Spark Driver。但無論從哪一個角度去看Spark ThriftServer,有一點能夠確定的是,它是一個Server,是須要對外提供服務的,所以其是常駐的進程,並不會像通常咱們構建的Spark Application在完成數據處理的工做邏輯後就退出。其總體架構圖以下所示:安全
Apache Thrift是業界流行的RPC框架,經過其提供的接口描述語言(IDL),能夠快速構建用於數據通訊的而且語言無關的RPC客戶端和服務端,在帶來高性能的同時,大大下降了開發人員構建RPC服務的成本,所以在大數據生態其有較多的應用場景,好比咱們熟知的hiveserver2便是基於Apache Thrift來構建其RPC服務。網絡
當用戶經過JDBC或beeline方式執行一條SQL語句時,TThreadPoolServer
會接收到該SQL,經過一系列的Session和Operation的管理,最終會使用在啓動Spark ThriftServer時已經構建好的SparkContext來執行該SQL,並獲取最後的結果集。
從上面的基本分析中咱們能夠看到,在不考慮Spark ThrfitServer的底層RPC通訊框架和業務細節時,其總體實現思路是比較清晰和簡單的。
固然實際上要構建一個對外提供SQL能力的RPC服務時,是有許多細節須要考慮的,而且工做量也會很是巨大,Spark ThriftServer在實現時實際上也沒有本身重複造輪子,它複用了hiveserver2的許多組件和邏輯,並根據自身的業務需求來對其進行特定改造;一樣的,後面當咱們去看Kyuubi時,也會發現它複用了hiveserver2和Spark ThriftServer的一些組件和邏輯,並在此基礎上創新性地設計本身的一套架構。
這裏列舉的代碼是基於Spark 2.1的源碼,新版本在結構上可能有全部區別,但不影響咱們對其本質實現原理的理解。
前面提到的TThreadPoolServer
是Apache Thrift提供的用於構建RPC Server的一個工做線程池類,在Spark ThriftServer的Service體系結構中,ThriftBinaryService
正是使用TThreadPoolServer
來構建RPC服務端並對外提供一系列RPC服務接口:
Spark ThriftServer Service體系
ThriftBinaryService
基於TThreadPoolServer
構建RPC服務端
// org.apache.hive.service.cli.thrift.ThriftBinaryCLIService#run public class ThriftBinaryCLIService extends ThriftCLIService { @Override public void run() { // ...省略其它細節... // TCP Server server = new TThreadPoolServer(sargs); server.setServerEventHandler(serverEventHandler); server.serve(); // ...省略其它細節... } }
ThriftBinaryService
提供的RPC服務接口
// org.apache.hive.service.cli.thrift.TCLIService.Iface TOpenSessionResp OpenSession(TOpenSessionReq req); TCloseSessionResp CloseSession(TCloseSessionReq req); TGetInfoResp GetInfo(TGetInfoReq req); TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req); TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req); TGetCatalogsResp GetCatalogs(TGetCatalogsReq req); TGetSchemasResp GetSchemas(TGetSchemasReq req); TGetTablesResp GetTables(TGetTablesReq req); TGetTableTypesResp GetTableTypes(TGetTableTypesReq req); TGetColumnsResp GetColumns(TGetColumnsReq req); TGetFunctionsResp GetFunctions(TGetFunctionsReq req); TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req); TCancelOperationResp CancelOperation(TCancelOperationReq req); TCloseOperationResp CloseOperation(TCloseOperationReq req); TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req); TFetchResultsResp FetchResults(TFetchResultsReq req); TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req); TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req); TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req);
能夠看到,其提供的至關一部分接口都是提供SQL服務時所必要的能力。
固然,無論是使用標準的JDBC接口仍是經過beeline的方式來訪問Spark ThriftServer,必然都是經過Spark基於Apache Thrift構建的RPC客戶端來訪問這些RPC服務接口的,所以咱們去看Spark ThriftServer提供的RPC客戶端,其提供的方法接口與RPC服務端提供的是對應的,能夠參考org.apache.hive.service.cli.thrift.TCLIService.Client。
若是比較難以理解,建議能夠先研究一下RPC框架的本質,而後再簡單使用一下Apache Thrift來構建RPC服務端和客戶端,這樣就會有一個比較清晰的理解,這裏不對其底層框架和原理作更多深刻的分析。我的以爲,要理解Spark ThriftServer,或是後面要介紹的Kyubbi,本質上是理解其通訊框架,也就是其是怎麼使用Apache Thrift來進行通訊的,由於其它的細節都是業務實現。
Spark ThriftServer在帶來各類便利性的同時,其不足也是顯而易見的。
首先,Spark ThriftServer難以知足生產環境下多租戶與資源隔離的場景需求。因爲一個Spark ThriftServer全局只有一個SparkContext,也即只有一個Spark Application,其在啓動時就肯定了全局惟一的用戶名,所以在Spark ThriftServer的維護人員看來,全部經過Spark ThriftServer下發的SQL都是來自同一用戶(也就是啓動時肯定的全局惟一的用戶名),儘管其背後其實是由使用Spark ThriftServer服務的不一樣用戶下發的,但全部背後的這些用戶都共享使用了Spark ThriftServer的資源、權限和數據,所以咱們難以單獨對某個用戶作資源和權限上的控制,操做審計和其它安全策略。
在Spark ThriftServer執行的一條SQL實際上會被轉換爲一個job執行,若是用戶A下發的SQL的job執行時間較長,必然也會阻塞後續用戶B下發的SQL的執行。
其次,單個Spark ThriftServer也容易帶來單點故障問題。從Spark ThriftServer接受的客戶端請求和其與Executor的通訊來考慮,Spark ThriftServer自己的可靠性也難以知足生產環境下的需求。
所以,在將Spark ThriftServer應用於生產環境當中,上面說起的問題和侷限性都會不可避免,那業界有沒有比較好的解決方案呢?網易開源的Spark Kyuubi就給出了比較好的答案。
Kyuubi的總體架構設計以下:
Kyuubi從總體上能夠分爲用戶層、服務發現層、Kyuubi Server層、Kyuubi Engine層,其總體概述以下:
用戶層
指經過不一樣方式使用Kyuubi的用戶,好比經過JDBC或beeline方式使用Kyuubi的用戶。
服務發現層
服務發現層依賴於Zookeeper實現,其又分爲Kyuubi Server層的服務發現和Kyuubi Engine層的服務發現。
Kyuubi Server層
由多個不一樣的KyuubiServer實例組成,每一個KyuubiServer實例本質上爲基於Apache Thrift實現的RPC服務端,其接收來自用戶的請求,但並不會真正執行該請求的相關SQL操做,只會做爲代理轉發該請求到Kyuubi Engine層用戶所屬的SparkSQLEngine實例上。
Kyuubi Engine層
由多個不一樣的SparkSQLEngine實例組成,每一個SparkSQLEngine實例本質上爲基於Apache Thrift實現的而且持有一個SparkSession實例的RPC服務端,其接收來自KyuubiServer實例的請求,並經過SparkSession實例來執行。在Kyuubi的USER共享層級上,每一個SparkSQLEngine實例都是用戶級別的,即不一樣的用戶其會持有不一樣的SparkSQLEngine實例,以實現用戶級別的資源隔離和控制。
下面將會對每一層以及它們的協做與交互展開較爲詳細的分析。
用戶層就是指實際須要使用Kyuubi服務的用戶,它們經過不過的用戶名進行標識,以JDBC或beeline方式進行鏈接。
好比咱們能夠在beeline中指定以不一樣用戶名進行登陸:
使用xpleaf用戶名進行登陸
./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
使用yyh用戶名進行登陸
./beeline -u 'jdbc:hive2://10.2.10.1:10010' -n yyh
使用leaf用戶名進行登陸
./beeline -u 'jdbc:hive2://10.2.10.2:10009' -n leaf
固然,這裏的用戶名或登陸標識並非能夠隨意指定或使用的,它應該根據實際使用場景由運維繫統管理人員進行分配,而且其背後應當有一整套完整的認證、受權和審計機制,以確保總體系統的安全。
服務發現層主要是指Zookeepr服務以及Kyuubi Server層的KyuubiServer實例和Kyuubi Engine層的SparkSQLEngine在上面註冊的命名空間(即node節點),以提供負載均衡和高可用等特性,所以它分爲Kyuubi Server層的服務發現和Kyuubi Engine層的服務發現。
Kyuubi Server層的服務發現
Kyuubi Server層的服務發現是須要用戶感知的。
KyuubiServer實例在啓動以後都會向Zookeeper的/kyuubi
節點下面建立關於本身實例信息的節點,主要是包含KyuubiServer實例監聽的host和port這兩個關鍵信息,這樣用戶在鏈接KyuubiServer時,只須要到Zookeeper的/kyuubi
節點下面獲取對應的服務信息便可,當有多個KyuubiServer實例時,選取哪個實例進行登陸,這個是由用戶自行決定的,Kyuubi自己並不會進行干預。
在實際應用時也能夠封裝接口實現隨機返回實例給用戶,以免直接暴露Kyuubi的底層實現給用戶。
另外,KyuubiServer實例是對全部用戶共享,並不會存在特定KyuubiServer實例只對特定用戶服務的問題。
固然在實際應用時你也能夠這麼作,好比你能夠不對用戶暴露服務發現,也就是不對用戶暴露Zookeeper,對於不一樣用戶,直接告訴他們相應的KyuubiServer實例鏈接信息便可。不過這樣一來,Kyuubi Server層的高可用就難以保證了。
好比有多個在不一樣節點上啓動的KyuubiServer實例,其在Zookeeper上面註冊的信息以下:
/kyuubi/instance1_10.2.10.1:10009 /kyuubi/instance2_10.2.10.1:10010 /kyuubi/instance3_10.2.10.2:10009
Kyuubi Engine層的服務發現
Kyuubi Engine層的服務發現是不須要用戶感知的,其屬於Kyuubi內部不一樣組件之間的一種通訊協做方式。
SparkSQLEngine實例在啓動以後都會向Zookeeper的/kyuubi_USER
節點下面建立關於本身實例信息的節點,主要是包含該實例監聽的host和port以及其所屬user的相關信息,也就是說SparkSQLEngine實例並非全部用戶共享的,它是由用戶獨享的。
好比Kyuubi系統中有多個不一樣用戶使用了Kyuubi服務,啓動了多個SparkSQLEngine實例,其在Zookeeper上面註冊的信息以下:
/kyuubi_USER/xpleaf/instance1_10.2.20.1:52643 /kyuubi_USER/yyh/instance2_10.2.10.1:52346 /kyuubi_USER/leaf/instance3_10.2.10.2:51762
Kyuubi Server層由多個不一樣的KyuubiServer實例組成,每一個KyuubiServer實例本質上爲基於Apache Thrift實現的RPC服務端,其接收來自用戶的請求,但並不會真正執行該請求的相關SQL操做,只會做爲代理轉發該請求到Kyuubi Engine層用戶所屬的SparkSQLEngine實例上。
整個Kyuubi系統中須要存在多少個KyuubiServer實例是由Kyuubi系統管理員決定的,根據實際使用Kyuubi服務的用戶數和併發數,能夠部署一個或多個KyuubiServer實例,以知足SLA要求。固然後續發現KyuubiServer實例不夠時,能夠橫向動態擴容,只須要在Kyuubi中系統配置好host和port,啓動新的KyuubiServer實例便可。
Kyuubi Engine層由多個不一樣的SparkSQLEngine實例組成,每一個SparkSQLEngine實例本質上爲基於Apache Thrift實現的而且持有一個SparkSession實例的RPC服務端,其接收來自KyuubiServer實例的請求,並經過SparkSession實例來執行。在Kyuubi的USER共享層級上,每一個SparkSQLEngine實例都是用戶級別的,即不一樣的用戶其會持有不一樣的SparkSQLEngine實例,以實現用戶級別的資源隔離和控制。
SparkSQLEngine實例是針對不一樣的用戶按需啓動的。在Kyuubi總體系統啓動以後,若是沒有用戶訪問Kyuubi服務,實際上在整個系統中只有一個或多個KyuubiServer實例,當有用戶經過JDBC或beeline的方式鏈接KyuubiServer實例時,其會在Zookeeper上去查找是否存在用戶所屬的SparkSQLEngine實例,若是沒有,則經過spark-submit
提交一個Spark應用,而這個Spark應用自己就是SparkSQLEngine,啓動後,基於其內部構建的SparkSession實例,便可爲特定用戶執行相關SQL操做。
經過前面對各層的介紹,結合KyubbiServer架構圖,以用戶xpleaf
訪問Kyuubi服務爲例來描述整個流程。
1.Kyuubi系統管理員在大數據集羣中啓動了3個KyuubiServer實例和1個Zookeeper集羣,其中3個KyuubiServer實例的鏈接信息分別爲10.2.10.1:10009
、10.2.10.1:10010
和10.2.10.2:1009
;
2.用戶xpleaf經過beeline終端的方式鏈接了其中一個KyuubiServer實例;
./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
在這裏咱們假設用戶xpleaf事先已經經過管理員告知的方式知道了該KyuubiServer實例的鏈接信息。
3.KyuubiServer_instance1接收到xpleaf的鏈接請求,會爲該用戶建立session會話,同時會去Zookeeper上檢查是否已經存在xpleaf所屬的SparkSQLEngine實例;
4.KyuubiServer_instance1在Zookeeper上沒有找到xpleaf所屬的SparkSQLEngine實例信息,其經過spark-submit的方式啓動了一個SparkSQLEngine實例;
5.屬於xpleaf用戶的新的SparkSQLEngine_instance1實例在10.2.10.1
節點上進行啓動,而且監聽的52463
端口,啓動後,其向Zookeeper註冊本身的鏈接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463
;
6.KyuubiServer_instance1在檢測到SparkSQLEngine_instance1啓動成功後,會向其發送建立session會話的鏈接請求;
7.SparkSQLEngine_instance1收到KyuubiServer_instance1建立session會話的鏈接請求,則建立一個新的session會話;
8.用戶啓動beeleine完成併成功建立會話,接着用戶執行SQL查詢;
0: jdbc:hive2://10.2.10.1:10009> select * from teacher;
9.KyuubiServer_instance1接收到xpleaf的執行SQL查詢的請求,會先檢查是否存在xpleaf所屬的SparkSQLEngine實例;
10.KyuubiServer_instance1找到xpleaf所屬的SparkSQLEngine_instance1實例,接着會爲此次執行SQL的操做建立一個Operation;
11.KyuubiServer_instance1根據鏈接信息建立了一個RPC Client,而且構建SQL執行的RPC請求,發到對應的SparkSQLEngine_instance1實例上;
12.SparkSQLEngine_instance1接收到該請求後,會建立一個該SQL操做的Operation,而且使用其內部的SparkSession實例來進行執行,最後將執行結果返回給KyuubiServer_instance1;
13.KyuubiServer_instance1接收到SparkSQLEngine_instance1的執行結果,返回給用戶,這樣一次SQL查詢操做就完成了。
0: jdbc:hive2://localhost:10009> select * from teacher; +-----------+------------+--------------+ | database | tableName | isTemporary | +-----------+------------+--------------+ | default | teacher | false | +-----------+------------+--------------+ 1 row selected (0.19 seconds)
透過總體協做流程咱們能夠看到:
Kyuubi在總體Server端和Client端以及其實現功能的設計上,是十分清晰的。
經過前面對Kyuubi各層以及總體協做流程的描述,相信對Kyuubi的核心架構設計會有一個比較清晰的理解,這樣再去分析Kyuubi的源碼時就會簡單不少。
首先咱們會來介紹Kyuubi總體的Service體系與組合關係,以對Kyuubi總體核心代碼有一個概覽性的理解,接着會選取多個關鍵場景來對Kyuubi的源碼進行分析,而且給出每一個場景的代碼執行流程圖。
確實沒有辦法在較爲簡短的篇幅裏爲你們介紹Kyuubi源碼的方方面面,但我我的認爲無論對於哪一個大數據組件,在理解了其底層通訊框架的基礎上,再選取關於該組件的幾個或多個關鍵場景來分析其源碼,基本上對其總體設計就會有概覽性的理解,這樣後面對於該組件可能出現的Bug進行排查與修復,或是對該組件進行深度定製以知足業務的實際需求,我相信問題都不大——這也就達到了咱們的目的,就是去解決實際問題。
固然,在這個過程中你也能夠欣賞到漂亮的代碼,這自己也是一種享受。
RPC
RPC(Remote Procedure Call)遠程過程調用,若是按照百度百科的解釋會很是羞澀難懂(上面提供的圖應該仍是《TCP/IP詳解卷1:協議》上面的一個圖),但實際上咱們就能夠簡單地把它理解爲,一個進程調用另一個進程的服務便可,無論是經過Socket、內存共享或是網絡的方式,只要其調用的服務的具體實現不是在調用方的進程內完成的就能夠,目前咱們見得比較多的是經過網絡通訊調用服務的方式。
在Java語言層面上比較廣泛的RPC實現方式是,反射+網絡通訊+動態代理的方式來實現RPC,而網絡通訊因爲須要考慮各類性能指標,主要用的Netty或者原生的NIO比較多,Socket通常比較少用,好比能夠看一下阿里Doubbo的實現。
若是想加深這方面的理解,能夠參考個人一個開源RPC框架,其實就是很是mini版的Doubbo實現:https://github.com/xpleaf/minidubbo,建議有時間能夠看下,實際上這會很是有用,由於幾乎全部的大數據組件都會用到相關的RPC框架,無論是開源三方的仍是其本身實現的(好比Hadoop的就是使用本身實現的一套RPC框架)。
Apache Thrift
Apache Thrift是業界流行的RPC框架,經過其提供的接口描述語言(IDL),能夠快速構建用於數據通訊的而且語言無關的RPC客戶端和服務端,在帶來高性能的同時,大大下降了開發人員構建RPC服務的成本,所以在大數據生態其有較多的應用場景,好比咱們熟知的hiveserver2便是基於Apache Thrift來構建其RPC服務。
在看Kyuubi的源碼時,咱們能夠把較多精力放在某幾種較重要的類和其體系上,這樣有助於咱們抓住重點,理解Kyuubi最核心的部分。僅考慮Kyuubi總體的架構設計和實現,比較重要的是Service、Session和Operation等相關的類和體系。
Service體系
Service,顧名思義就是服務,在Kyuubi中,各類不一樣核心功能的提供都是經過其Service體系下各個實現類來進行提供的。咱們前面提到的服務發現層、Kyuubi Server層和Kyuubi Engine層,在代碼實現上絕大部分核心功能都是由Kyuubi源碼項目的Server類體系來完成的,能夠這麼說,理解了Service體系涉及類的相關功能,就基本上從源碼級別上理解了整個Kyuubi的體系架構設計和實現。
固然這些Service的實現類並不必定使用Service結尾,好比SessionManager、OperationManager等,但基本上從名字咱們就能對其功能窺探一二。
其完整的繼承關係以下:
基於Kyuubi提供的核心功能,咱們能夠大體按Kyuubi Server層和Kyuubi Engine層來將整個體系中的Service類進行一個劃分:
openSession
、executeStatement
、fetchResults
等;openSession
、executeStatement
、fetchResults
等;這裏咱們只對具體實現類進行歸類,由於中間抽象類只是提取多個子類的公共方法,不影響咱們對其體系功能的說明和講解;而以Noop開頭的其實是Kyuubi的測試實現類,所以咱們也不展開說明;KinitAuxiliaryService
是Kyuubi中用於認證的類,這裏咱們不對其認證功能實現進行說明。
經過對Service體系各個具體實現類的介紹,再回顧前面對Kyuubi總體架構和協做流程的介紹,其抽象的功能在源碼實現類上面就有了一個相對比較清晰的體現,而且基本上也是能夠一一對應上的。
Service組合關係
爲了理解Kyuubi在源碼層面上是如何進行總體協做的,除了前面介紹的Service體系外,咱們還有必要理清其各個Service之間的組合關係。
在整個Service體系中,CompositeService
這個中間抽象類在設計上是須要額外關注的,它表示的是在它之下的實現類都至少有一個成員爲其它Service服務類對象,好比對於KyuubiServer
,它的成員則包含有KyuubiBackdService
、KyuubiServiceDiscovery
等多個Service實現類,SparkSQLEngine
也是如此。
咱們將一些關鍵的Service類及其組合關係梳理以下,這對後面咱們分析關鍵場景的代碼執行流程時會提供很清晰的思路參考:
Session與SessionHandle
當咱們使用經過JDBC或beeline的方式鏈接Kyuubi時,實際上在Kyuubi內部就爲咱們建立了一個Session,用以標識本次會話的全部相關信息,後續的全部操做都是基於此次會話來完成的,咱們能夠在一次會話下執行多個操做(好比屢次執行某次SQL,咱們只須要創建一次會話鏈接便可)。
Session在Kyuubi中又分爲Kyuubi Server層的Session和Kyuubi Engine層的Session。Kyuubi Server層的Session實現類爲KyuubiSessionImpl
,用來標識來自用戶層的會話鏈接信息;Kyuubi Engine層的Session實現類爲SparkSessionImpl
,用來標識來自Kyuubi Server層的會話鏈接信息。兩個Session實現類都有一個共同的抽象父類AbstractSession
,用於Session操做的主要功能邏輯都是在該類實現的。
Session對象的存儲實際上由SessionManager來完成,在SessionManager內部其經過一個Map來存儲Session的詳細信息,其中key爲SessionHandle,value爲Session對象自己。SessionHandle能夠理解爲就是封裝了一個惟一標識一個用戶會話的字符串,這樣用戶在會話創建後進行通訊時只須要攜帶該字符串標識便可,並不須要傳輸完整的會話信息,以免網絡傳輸帶來的開銷。
Operation與OperationHandle
用戶在創建會話後執行的相關語句在Kyuubi內部都會抽象爲一個個的Operation,好比執行一條SQL語句對應的Operation實現類爲Executement
,不過須要注意,Operation又分爲Kyuubi Server層的KyuubiOperation
和Kyuubi Engine層的SparkOperation
。Kyuubi Server層的Operation並不會執行真正的操做,它只是一個代理,它會經過RPC Client請求Kyuubi Engine層來執行該Operation,所以全部Operation的真正執行都是在Kyuubi Engine層來完成的。
因爲Operation都是創建在Session之下的,因此咱們在看前面的組合關係時能夠看到,用於管理Operation的OperationManager爲SessionManager的成員屬性。
Operation對象的存儲實際上由OprationManager來完成,在SessioOprationManagerManager內部其經過一個Map來存儲Session的詳細信息,其中key爲OperationHandle,value爲Operation對象自己。OperationHandle能夠理解爲就是封裝了一個惟一標識一個用戶操做的字符串,這樣用戶基於會話的操做時只須要攜帶該字符串標識便可,並不須要傳輸完整的操做信息,以免網絡傳輸帶來的開銷。
第一次提交Operation時仍是須要完整信息,後續只須要提供OperationHandle便可,實際上SQL語句的執行在Kyuubi內部是異步執行的,用戶端在提交Opeation後便可得到OperationHandle,後續只須要持着該OperationHandle去獲取結果便可,咱們在分析SQL執行的代碼時就能夠看到這一點。
Kyuubi的啓動實際上包含兩部分,分別是KyuubiServer的啓動和SparkSQLEngine的啓動。KyuubiServer實例的啓動發生在系統管理員根據實際業務須要啓動KyuubiServer實例,這個是手動操做完成的;而SparkSQLEngine實例的啓動則是在爲用戶創建會話時爲由KyuubiServer實例經過spark-submit的方式去提交一個Spark應用來完成的。
KyuubiServer啓動流程
當咱們在Kyuubi的bin目錄下去執行./kyuubi run
命令去啓動KyuubiServer時,就會去執行KyuubiServer的main方法:
def main(args: Array[String]): Unit = { info( """ | Welcome to | __ __ __ | /\ \/\ \ /\ \ __ | \ \ \/'/' __ __ __ __ __ __\ \ \____/\_\ | \ \ , < /\ \/\ \/\ \/\ \/\ \/\ \\ \ '__`\/\ \ | \ \ \\`\\ \ \_\ \ \ \_\ \ \ \_\ \\ \ \L\ \ \ \ | \ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\ | \/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/ | /\___/ | \/__/ """.stripMargin) info(s"Version: $KYUUBI_VERSION, Revision: $REVISION, Branch: $BRANCH," + s" Java: $JAVA_COMPILE_VERSION, Scala: $SCALA_COMPILE_VERSION," + s" Spark: $SPARK_COMPILE_VERSION, Hadoop: $HADOOP_COMPILE_VERSION," + s" Hive: $HIVE_COMPILE_VERSION") info(s"Using Scala ${Properties.versionString}, ${Properties.javaVmName}," + s" ${Properties.javaVersion}") SignalRegister.registerLogger(logger) val conf = new KyuubiConf().loadFileDefaults() UserGroupInformation.setConfiguration(KyuubiHadoopUtils.newHadoopConf(conf)) startServer(conf) }
在加載完配置信息後,經過調用startServer(conf)
方法,就開始了KyuubiServer的啓動流程:
def startServer(conf: KyuubiConf): KyuubiServer = { if (!ServiceDiscovery.supportServiceDiscovery(conf)) { zkServer.initialize(conf) zkServer.start() conf.set(HA_ZK_QUORUM, zkServer.getConnectString) conf.set(HA_ZK_ACL_ENABLED, false) } val server = new KyuubiServer() server.initialize(conf) server.start() sys.addShutdownHook(server.stop()) server }
能夠看到,實際上KyuubiServer的啓動包括兩部分:初始化和啓動。
KyuubiServer的初始化和啓動其實是一個遞歸初始化和啓動的過程。咱們前面提到,KyuubiServer爲Service體系下的一個CompositeService
,參考前面給出的組合關係圖,它自己的成員又包含了多個Service對象,它們都保存在保存在serviceList
這個成員當中,所以初始化和啓動KyuubiServer實際上就是初始化和啓動serviceList
中所包含的各個Service對象。而這些Service對象自己又多是CompositeService
,所以KyuubiServer的啓動和初始化實際上就是一個遞歸初始化和啓動的過程。
// 遞歸初始化serviceList下的各個服務 override def initialize(conf: KyuubiConf): Unit = { serviceList.foreach(_.initialize(conf)) super.initialize(conf) } // 遞歸啓動serviceList下的各個服務 override def start(): Unit = { serviceList.zipWithIndex.foreach { case (service, idx) => try { service.start() } catch { case NonFatal(e) => error(s"Error starting service ${service.getName}", e) stop(idx) throw new KyuubiException(s"Failed to Start $getName", e) } } super.start() }
這樣一來,整個KyuubiServer的啓動流程就比較清晰了,這也是咱們在最開始就列出其Service體系和組合關係的緣由,因爲總體的啓動流程和細節所包含的代碼比較多,咱們就沒有必要貼代碼了,這裏我把整個初始化和啓動流程步驟的流程圖梳理了出來,待會再對其中一些須要重點關注的點進行說明,以下:
咱們重點關注一下FontendService
和ServiceDiscoveryService
的初始化和啓動流程。
咱們須要重點關注一下FrontendService
,由於KyuubiServer實例對外提供RPC服務都是由其做爲入口來完成的。
其初始化時主要是獲取和設置了Apache Thrift內置的用於構建RPC服務端的TThreadPoolServer
的相關參數:
override def initialize(conf: KyuubiConf): Unit = synchronized { this.conf = conf try { hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf) val serverHost = conf.get(FRONTEND_BIND_HOST) serverAddr = serverHost.map(InetAddress.getByName).getOrElse(InetAddress.getLocalHost) portNum = conf.get(FRONTEND_BIND_PORT) val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS) val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS) val keepAliveTime = conf.get(FRONTEND_WORKER_KEEPALIVE_TIME) val executor = ExecutorPoolCaptureOom( name + "Handler-Pool", minThreads, maxThreads, keepAliveTime, oomHook) authFactory = new KyuubiAuthenticationFactory(conf) val transFactory = authFactory.getTTransportFactory val tProcFactory = authFactory.getTProcessorFactory(this) val serverSocket = new ServerSocket(portNum, -1, serverAddr) portNum = serverSocket.getLocalPort val tServerSocket = new TServerSocket(serverSocket) val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE) val requestTimeout = conf.get(FRONTEND_LOGIN_TIMEOUT).toInt val beBackoffSlotLength = conf.get(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt val args = new TThreadPoolServer.Args(tServerSocket) .processorFactory(tProcFactory) .transportFactory(transFactory) .protocolFactory(new TBinaryProtocol.Factory) .inputProtocolFactory( new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS) .beBackoffSlotLength(beBackoffSlotLength) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) .executorService(executor) // TCP Server server = Some(new TThreadPoolServer(args)) server.foreach(_.setServerEventHandler(new FeTServerEventHandler)) info(s"Initializing $name on host ${serverAddr.getCanonicalHostName} at port $portNum with" + s" [$minThreads, $maxThreads] worker threads") } catch { case e: Throwable => throw new KyuubiException( s"Failed to initialize frontend service on $serverAddr:$portNum.", e) } super.initialize(conf) }
能夠看到主要是host、port、minThreads、maxThreads、maxMessageSize、requestTimeout等,這些參數都是可配置的,關於其詳細做用能夠參考KyuubiConf
這個類的說明。
其啓動比較簡單,主要是調用TThreadPoolServer
的server()
方法來完成:
override def start(): Unit = synchronized { super.start() if(!isStarted) { serverThread = new NamedThreadFactory(getName, false).newThread(this) serverThread.start() isStarted = true } } override def run(): Unit = try { info(s"Starting and exposing JDBC connection at: jdbc:hive2://$connectionUrl/") server.foreach(_.serve()) } catch { case _: InterruptedException => error(s"$getName is interrupted") case t: Throwable => error(s"Error starting $getName", t) System.exit(-1) }
初始化時主要是建立一個用於後續鏈接ZooKeeper的zkClient:
def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { this.conf = conf _namespace = conf.get(HA_ZK_NAMESPACE) val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) setUpZooKeeperAuth(conf) _zkClient = buildZookeeperClient(conf) zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener { private val isConnected = new AtomicBoolean(false) override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = { info(s"Zookeeper client connection state changed to: $newState") newState match { case CONNECTED | RECONNECTED => isConnected.set(true) case LOST => isConnected.set(false) val delay = maxRetries.toLong * maxSleepTime connectionChecker.schedule(new Runnable { override def run(): Unit = if (!isConnected.get()) { error(s"Zookeeper client connection state changed to: $newState, but failed to" + s" reconnect in ${delay / 1000} seconds. Give up retry. ") stopGracefully() } }, delay, TimeUnit.MILLISECONDS) case _ => } } }) zkClient.start() super.initialize(conf) }
固然這裏還看到其獲取了一個HA_ZK_NAMESPACE
的配置值,其默認值爲kyuubi
:
val HA_ZK_NAMESPACE: ConfigEntry[String] = buildConf("ha.zookeeper.namespace") .doc("The root directory for the service to deploy its instance uri. Additionally, it will" + " creates a -[username] suffixed root directory for each application") .version("1.0.0") .stringConf .createWithDefault("kyuubi")
在ServiceDiscoveryService進行啓動的時候,就會基於該namesapce來構建在Kyuubi Server層進行服務發現所須要的KyuubiServer實例信息:
override def start(): Unit = { val ns = ZKPaths.makePath(null, namespace) try { zkClient .create() .creatingParentsIfNeeded() .withMode(PERSISTENT) .forPath(ns) } catch { case _: NodeExistsException => // do nothing case e: KeeperException => throw new KyuubiException(s"Failed to create namespace '$ns'", e) } val instance = server.connectionUrl val pathPrefix = ZKPaths.makePath( namespace, s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=") try { _serviceNode = new PersistentEphemeralNode( zkClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, instance.getBytes(StandardCharsets.UTF_8)) serviceNode.start() val znodeTimeout = 120 if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) { throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted") } info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance) } catch { case e: Exception => if (serviceNode != null) { serviceNode.close() } throw new KyuubiException( s"Unable to create a znode for this server instance: $instance", e) } super.start() }
在這裏,就會在Zookeeper的/kyuubi
節點下面建立一個包含KyuubiServer實例詳細鏈接信息的節點,假設KyuubiServer實例所配置的host和post分別爲10.2.10.1
和10009
,那麼其所建立的zk節點爲:
[zk: localhost:2181(CONNECTED) 87] ls /kyuubi [serviceUri=10.2.10.1:10009;version=1.1.0;sequence=0000000007]
咱們主要關注一下其啓動過程:
// org.apache.kyuubi.session.SessionManager#start override def start(): Unit = { startTimeoutChecker() super.start() } // org.apache.kyuubi.session.SessionManager#startTimeoutChecker private def startTimeoutChecker(): Unit = { val interval = conf.get(SESSION_CHECK_INTERVAL) val timeout = conf.get(SESSION_TIMEOUT) val checkTask = new Runnable { override def run(): Unit = { val current = System.currentTimeMillis if (!shutdown) { for (session <- handleToSession.values().asScala) { if (session.lastAccessTime + timeout <= current && session.getNoOperationTime > timeout) { try { closeSession(session.handle) } catch { case e: KyuubiSQLException => warn(s"Error closing idle session ${session.handle}", e) } } else { session.closeExpiredOperations } } } } } timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) }
在這裏主要完成的事情:
1.獲取session check interval;
2.獲取session timout;
3.起一個schedule的調度線程;
4.根據interval和timeout對handleToSession的session進行檢查;
5.若是session超時(超過timeout沒有access),則closesession;
那麼對於KyuubiServer的啓動過程咱們就分析到這裏,更多細節部分你們能夠結合個人流程圖來自行閱讀代碼便可,實際上當咱們把Kyuubi的Service體系和組合關係整理下來以後,再去分析它的啓動流程時就會發現簡單不少,這個過程當中無非就是要關注它的一些相關參數獲取和設置是在哪裏完成的,它是怎麼偵聽服務的(真正用於偵聽host和port的server的啓動)。
SparkSQLEngine啓動流程
在KyuubiServer爲用戶創建會話時會去經過服務發現層去Zookeeper查找該用戶是否存在對應的SparkSQLEngine實例,若是沒有則經過spark-submit的啓動一個屬於該用戶的SparkSQLEngine實例。
後面在分析KyuubiServer Session創建過程會提到,實際上KyuubiServer是經過調用外部進程命令的方式來提交一個Spark應用的,爲了方便分析SparkSQLEngine的啓動流程,這裏我先將其大體的命令貼出來:
/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \ --class org.apache.kyuubi.engine.spark.SparkSQLEngine \ --conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \ --conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \ --conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \ --conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \ --conf spark.yarn.tags=KYUUBI \ --conf spark.kyuubi.ha.zookeeper.acl.enabled=false \ --proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar
kyuubi-spark-sql-engine-1.1.0.jar
是Kyuubi發佈版本里面的一個jar包,裏面就包含了SparkSQLEngine
這個類,經過-class
參數咱們能夠知道,實際上就是要運行SparkSQLEngine的main方法,因爲開啓了SparkSQLEngine的啓動流程。
須要說明的是,提交Sparkk App的這些參數在SparkSQLEngine啓動以前都會被設置到SparkSQLEngine的成員變量kyuubiConf
當中,獲取方法比較簡單,經過scala提供的sys.props
就能夠獲取,這些參數在SparkSQLEngine的初始化和啓動中都會起到十分關鍵的做用。
接下來咱們看一下SparkSQLEngine的main方法:
def main(args: Array[String]): Unit = { SignalRegister.registerLogger(logger) var spark: SparkSession = null var engine: SparkSQLEngine = null try { spark = createSpark() engine = startEngine(spark) info(KyuubiSparkUtil.diagnostics(spark)) // blocking main thread countDownLatch.await() } catch { case t: Throwable => error("Error start SparkSQLEngine", t) if (engine != null) { engine.stop() } } finally { if (spark != null) { spark.stop() } } }
首先會經過createSpark()
建立一個SparkSession對象,後續SQL的真正執行都會交由其去執行,其建立方法以下:
def createSpark(): SparkSession = { val sparkConf = new SparkConf() sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true") sparkConf.setIfMissing("spark.master", "local") sparkConf.setIfMissing("spark.ui.port", "0") val appName = s"kyuubi_${user}_spark_${Instant.now}" sparkConf.setIfMissing("spark.app.name", appName) kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0) kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString) // Pass kyuubi config from spark with `spark.kyuubi` val sparkToKyuubiPrefix = "spark.kyuubi." sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) => kyuubiConf.set(s"kyuubi.$k", v) } if (logger.isDebugEnabled) { kyuubiConf.getAll.foreach { case (k, v) => debug(s"KyuubiConf: $k = $v") } } val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() session.sql("SHOW DATABASES") session }
這裏主要是設置了一些在建立SparkSession時須要的參數,包括appName、spark運行方式、spark ui的端口等。另外這裏還特別對frontend.bind.port
參數設置爲0,關於該參數自己的定義以下:
val FRONTEND_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.bind.port") .doc("Port of the machine on which to run the frontend service.") .version("1.0.0") .intConf .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number") .createWithDefault(10009)
能夠看到其默認值爲10009,前面KyuubiServer在構建TThreadPoolServer
時就直接使用了默認值,這也是咱們啓動的KyuubiServer實例偵聽10009端口的緣由,而在這裏,也就是SparkSQLEngine啓動時將其設置爲0是有緣由,咱們將在下面繼續說明。
建立完成SparkSession後才調用startEngine(spark)
方法啓動SparkSQLEngine自己:
def startEngine(spark: SparkSession): SparkSQLEngine = { val engine = new SparkSQLEngine(spark) engine.initialize(kyuubiConf) engine.start() sys.addShutdownHook(engine.stop()) currentEngine = Some(engine) engine }
能夠看到也是先進行初始化,而後再啓動,SparkSQLEngine自己是CompositeService
,因此初始化和啓動過程跟KyuubiServer是如出一轍的(固然其包含的成員會有所差異),都是遞歸對serviceList
中所包含的各個Service對象進行初始化和啓動:
FrontendService在SparkSQLEngine中的啓動流程與在KyuubiServer中的啓動流程是基本同樣的,能夠參考前面的說明,這裏主要說明一些比較細微的差異點。
前面已經設置了frontend.bind.port
參數的值爲0,在FrontendService這個類當中,它會賦值給portNum
這個變量,用以構建TThreadPoolServer
所須要的參數ServerSocket
對象:
// org.apache.kyuubi.service.FrontendService#initialize val serverSocket = new ServerSocket(portNum, -1, serverAddr)
因此實際上,無論是KyuubiServer仍是SparkSQLEngine,其所偵聽的端口是在這裏構建ServerSocket對象的時候肯定下來的,對ServerSocket對象,若是傳入一個爲0的portNum,則表示使用系統隨機分配的端口號,因此這也就是咱們在啓動了SparkSQLEngine以後看到其偵聽的端口號都是隨機端口號的緣由。
與KyuubiServer相似,這裏分析一下其差異點。
前面在經過spark-submit提交應用時傳入了--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf
的參數,實際上在SparkSQLEngine初始化KyuubiConfig對象時會設置到KyuubiConfig.HA_ZK_NAMESPACE
屬性上,所以在ServiceDiscoveryService初始化時獲取的namespace實際上就爲/kyuubi_USER/xpleaf
,而不是默認的kyuubi
,這點是須要注意的:
def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { this.conf = conf _namespace = conf.get(HA_ZK_NAMESPACE) // 省略其它代碼 }
所以在啓動調用start()方法時,其在Zookeeper上構建的znode節點也就不一樣:
override def start(): Unit = { // 省略其它代碼 val instance = server.connectionUrl val pathPrefix = ZKPaths.makePath( namespace, s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=") // 省略其它代碼 }
好比其建立的znode節點爲:
[zk: localhost:2181(CONNECTED) 94] ls /kyuubi_USER/xpleaf [serviceUri=10.2.10.1:52643;version=1.1.0;sequence=0000000004]
SparkSQLSessionManager也是繼承自SessionManager,所以與KyuubiServer的KyuubiSessionManager同樣,其也啓動了一個用於檢查Session是否超時的checker。
此外,還啓動了另一個checker,以下:
// org.apache.kyuubi.engine.spark.SparkSQLEngine#start override def start(): Unit = { super.start() // Start engine self-terminating checker after all services are ready and it can be reached by // all servers in engine spaces. backendService.sessionManager.startTerminatingChecker() } // org.apache.kyuubi.session.SessionManager#startTerminatingChecker private[kyuubi] def startTerminatingChecker(): Unit = if (!isServer) { // initialize `_latestLogoutTime` at start _latestLogoutTime = System.currentTimeMillis() val interval = conf.get(ENGINE_CHECK_INTERVAL) val idleTimeout = conf.get(ENGINE_IDLE_TIMEOUT) val checkTask = new Runnable { override def run(): Unit = { if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout && getOpenSessionCount <= 0) { info(s"Idled for more than $idleTimeout ms, terminating") sys.exit(0) // Note:直接退出整個SparkSQLEngine,也就是App } } } timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) }
實際上這個checker是在SparkSQLEngine遞歸初始化和啓動其serviceList以前就已經啓動,從它的實現當中咱們能夠看到,當超過必定時時而且SparkSQLEngine維護的Session爲0時,整個SparkSQLEngine實例就會退出,這樣作的好處就是,若是一個用戶的SparkSQLEngine實例長期沒有被使用,咱們就能夠將其佔用的資源釋放出來,達到節省資源的目的。
Kyuubi Session的創建實際上包含兩部分,分別是KyuubiServer Session創建和SparkSQLEngine Session創建,這兩個過程不是獨立進行的,KyuubiServer Session的創建伴隨着SparkSQLEngine Session的創建,KyuubiServer Session和SparkSQLEngine Session才完整構成了Kyuubi中可用於執行特定Operation操做的Session。
KyuubiServer Session創建過程
當用戶經過JDBC或beeline的方式鏈接Kyuubi時,實際上就開啓了KyuubiServer Session的一個創建過程,此時KyuubiServer中FrontedService的OpenSession
方法就會被執行:
// org.apache.kyuubi.service.FrontendService#OpenSession override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = { debug(req.toString) info("Client protocol version: " + req.getClient_protocol) val resp = new TOpenSessionResp try { val sessionHandle = getSessionHandle(req, resp) resp.setSessionHandle(sessionHandle.toTSessionHandle) resp.setConfiguration(new java.util.HashMap[String, String]()) resp.setStatus(OK_STATUS) Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle)) } catch { case e: Exception => warn("Error opening session: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true)) } resp }
進而開啓了KyuubiServer Session創建以及後續SparkSQLEngine實例啓動(這部分前面已經單獨介紹)、SparkSQLEngine Session創建的過程:
總體流程並不複雜,在執行FrontendService#OpenSession方法時,最終會調用到KyuubiSessionImpl#open方法,這是整個KyuubiServer Session創建最複雜也是最爲關鍵的一個過程,爲此咱們單獨將其流程整理出來進行說明:
流程中其實已經能夠比較清晰地說明其過程,這裏咱們再詳細展開說下,其主要分爲下面的過程:
第一次創建特定user的session時,在zk的/kyuubi_USER path下是沒有相關user的節點的,好比/kyuubi_USER/xpleaf,所以在代碼執行流程中,其獲取的值會爲None,這就觸發了其調用外部命令來啓動一個SparkSQLEngine實例:
// org.apache.kyuubi.session.KyuubiSessionImpl#open override def open(): Unit = { super.open() val zkClient = startZookeeperClient(sessionConf) logSessionInfo(s"Connected to Zookeeper") try { getServerHost(zkClient, appZkNamespace) match { case Some((host, port)) => openSession(host, port) case None => sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.toString) // tag is a seq type with comma-separated sessionConf.set(SparkProcessBuilder.TAG_KEY, sessionConf.getOption(SparkProcessBuilder.TAG_KEY) .map(_ + ",").getOrElse("") + "KYUUBI") sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace) val builder = new SparkProcessBuilder(appUser, sessionConf) try { logSessionInfo(s"Launching SQL engine:\n$builder") val process = builder.start var sh = getServerHost(zkClient, appZkNamespace) val started = System.currentTimeMillis() var exitValue: Option[Int] = None while (sh.isEmpty) { if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { exitValue = Some(process.exitValue()) if (exitValue.get != 0) { throw builder.getError } } if (started + timeout <= System.currentTimeMillis()) { process.destroyForcibly() throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder", builder.getError) } sh = getServerHost(zkClient, appZkNamespace) } val Some((host, port)) = sh openSession(host, port) } finally { // we must close the process builder whether session open is success or failure since // we have a log capture thread in process builder. builder.close() } } } finally { try { zkClient.close() } catch { case e: IOException => error("Failed to release the zkClient after session established", e) } } }
而調用的外部命令實際上就是咱們在前面講解SparkSQLEngine實例中提到的spark-submit命令:
/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \ --class org.apache.kyuubi.engine.spark.SparkSQLEngine \ --conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \ --conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \ --conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \ --conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \ --conf spark.yarn.tags=KYUUBI \ --conf spark.kyuubi.ha.zookeeper.acl.enabled=false \ --proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar
以後就是SparkSQLEngine實例的啓動過程,其啓動完成以後,就會在Zookeeper上面註冊本身的節點信息。
對於KyuubiSessionImpl#open方法,在不超時的狀況下,循環會一直執行,直到其獲取到用戶的SparkSQLEngine實例信息,循環結束,進入下面跟SparkSQLEngine實例創建會話的過程。
SparkSQLEngine本質上也是一個RPC服務端,爲了與其進行通訊以創建會話,就須要構建RPC客戶端,這裏KyuubiSessionImpl#openSession方法中構建RPC客戶端的方法主要是Apache Thrift的一些模板代碼,以下:
org.apache.kyuubi.session.KyuubiSessionImpl#openSession private def openSession(host: String, port: Int): Unit = { val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous") val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt transport = PlainSASLHelper.getPlainTransport( user, passwd, new TSocket(host, port, loginTimeout)) if (!transport.isOpen) { logSessionInfo(s"Connecting to engine [$host:$port]") transport.open() logSessionInfo(s"Connected to engine [$host:$port]") } client = new TCLIService.Client(new TBinaryProtocol(transport)) val req = new TOpenSessionReq() req.setUsername(user) req.setPassword(passwd) req.setConfiguration(conf.asJava) logSessionInfo(s"Sending TOpenSessionReq to engine [$host:$port]") val resp = client.OpenSession(req) logSessionInfo(s"Received TOpenSessionResp from engine [$host:$port]") ThriftUtils.verifyTStatus(resp.getStatus) remoteSessionHandle = resp.getSessionHandle sessionManager.operationManager.setConnection(handle, client, remoteSessionHandle) }
在發送請求給SparkSQLEngine的時候,又會觸發SparkSQLEngine Session創建的過程(這個接下來講明),在跟其創建完Session以後,KyuubiSessionImpl會將其用於標識用戶端會話的sessionHandle、用於跟SparkSQLEngine進行通訊的RPC客戶端和在SparkSQLEngine實例中進行Session標識的remoteSessionHandle緩存下來,這樣在整個Kyuubi體系中,就構建了一個完整的Session映射關係:userSessionInKyuubiServer-RPCClient-KyuubiServerSessionInSparkSQLEngine,後續的Operation都是創建在這樣一個體系之下。
KyuubiServer在Session創建完成後會給客戶端返回一個SessionHandle,後續客戶端在與KyuubiServer進行通訊時都會攜帶該SessionHandle,以標識其用於會話的窗口。
SparkSQLEngine Session創建過程
在接收到來自KyuubiServer的創建會話的RPC請求以後,SparkSQLEngine中FrontedService的OpenSession
方法就會被執行,其總體流程與KyuubiServer Session的創建過程是相似的,主要不一樣在於SparkSQLSessionManager#openSession方法執行上面,以下:
其對應的關鍵代碼以下:
// org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager#openSession override def openSession( protocol: TProtocolVersion, user: String, password: String, ipAddress: String, conf: Map[String, String]): SessionHandle = { info(s"Opening session for $user@$ipAddress") val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this) val handle = sessionImpl.handle try { val sparkSession = spark.newSession() // 省略非核心代碼 sessionImpl.open() operationManager.setSparkSession(handle, sparkSession) setSession(handle, sessionImpl) info(s"$user's session with $handle is opened, current opening sessions" + s" $getOpenSessionCount") handle } catch { case e: Exception => sessionImpl.close() throw KyuubiSQLException(e) } }
sessionImpl.open()
實際上只是作了日誌記錄的一些操做,因此其實這裏的核心是將建立的Session記錄下來。
SparkSQLEngine在Session創建完成後會給KyuubiServer返回一個SessionHandle,後續KyuubiServer在與SparkSQLEngine進行通訊時都會攜帶該SessionHandle,以標識其用於會話的窗口。
Kyuubi SQL的執行流程實際上包含兩部分,分別是KyuubiServer SQL執行流程和SparkSQLEngine SQL執行流程,其結合起來纔是一個完整的SQL執行流程,KyuubiServer只是一個代理,真正的SQL執行是在SparkSQLEngine中完成。
另外因爲在Kyuubi中,SQL的執行是異步的,也就是能夠先提交一個SQL讓其去執行,後續再經過其返回的operationHandle去獲取結果,因此在KyuubiServer和SparkSQLEngine內部,SQL的執行流程又能夠再細分爲提交Statement和FetchResults兩個過程,在分別分析KyuubiServer SQL執行流程和SparkSQLEngine SQL執行流程時,咱們就是對提交Statment和FetchResults這兩個過程來展開詳細的分析,總體會有些繁多,但並不複雜。
KyuubiServer SQL執行流程
當用戶經過JDBC或beeline的方式執行一條SQL語句時,就開啓了SQL語句在Kyuubi中的執行流程,此時KyuubiServer中FrontedService的ExecuteStatement
方法就會被執行:
override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = { debug(req.toString) val resp = new TExecuteStatementResp try { val sessionHandle = SessionHandle(req.getSessionHandle) val statement = req.getStatement val runAsync = req.isRunAsync // val confOverlay = req.getConfOverlay val queryTimeout = req.getQueryTimeout val operationHandle = if (runAsync) { be.executeStatementAsync(sessionHandle, statement, queryTimeout) } else { be.executeStatement(sessionHandle, statement, queryTimeout) } resp.setOperationHandle(operationHandle.toTOperationHandle) resp.setStatus(OK_STATUS) } catch { case e: Exception => warn("Error executing statement: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e)) } resp }
runAsync
值爲true,所以會經過異步的方式來執行SQL,也就是會執行BackendService的executeStatementAsync方法,開啓了異步執行SQL的流程:
首先會經過KyuubiOperationManager去建立一個表示執行SQL的ExecuteStatement:
// org.apache.kyuubi.operation.KyuubiOperationManager#newExecuteStatementOperation override def newExecuteStatementOperation( session: Session, statement: String, runAsync: Boolean, queryTimeout: Long): Operation = { val client = getThriftClient(session.handle) val remoteSessionHandle = getRemoteTSessionHandle(session.handle) val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync) addOperation(operation) }
client實際上就是咱們前面在KyuubiServer Session創建過程當中創建的用於與SparkSQLEngine通訊的RPC客戶端,ExecuteStatement須要client來發送執行SQL語句的請求給SparkSQLEngine實例,不過須要注意的是,這裏的ExecuteStatement是KyuubiServer體系下的,其類全路徑爲org.apache.kyuubi.operation.ExecuteStatement
,由於後面在分析SparkSQLEngine SQL執行流程時,在SparkSQLEngine體系下也有一個ExecuteStatement,但其類全路徑爲org.apache.kyuubi.engine.spark.operation.ExecuteStatement
。
這裏的整個流程關鍵在於後面執行operation.run()
方法,進而執行runInternal()
方法:
// org.apache.kyuubi.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { executeStatement() val sessionManager = session.sessionManager val asyncOperation = new Runnable { override def run(): Unit = waitStatementComplete() } try { val backgroundOperation = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundOperation) } catch onError("submitting query in background, query rejected") } else { setState(OperationState.RUNNING) executeStatement() setState(OperationState.FINISHED) } }
這裏會經過異步的方式來執行,其先同步執行executeStatement()方法,而後再提交一個異步線程來執行asyncOperation(sessionManager.submitBackgroundOperation(asyncOperation)
實際上就是經過線程池來提交一個線程線程),咱們先看一下其executeStatement()方法:
// org.apache.kyuubi.operation.ExecuteStatement#executeStatement private def executeStatement(): Unit = { try { val req = new TExecuteStatementReq(remoteSessionHandle, statement) req.setRunAsync(shouldRunAsync) val resp = client.ExecuteStatement(req) verifyTStatus(resp.getStatus) _remoteOpHandle = resp.getOperationHandle } catch onError() }
這裏statement實際上就是要執行的SQL語句,因此本質上就是向SparkSQLEngine發送了一個用於執行SQL語句的RPC請求,這樣就會觸發SparkSQLEngine執行提交Statement的一個過程(這個接下來會分析),請求成功後,KyuubiServer會將SparkSQLEngine實例用於記錄該操做的operationHandle記錄下來,就是賦值給成員變量_remoteOpHandle
,_remoteOpHandle
用後續用於查詢statement在SparkSQLEngine實例中的執行狀態和FetchResults。
執行完executeStatement()方法後,咱們再看一下其提交異步線程時所執行的操做,也就是waitStatementComplete()方法:
// org.apache.kyuubi.operation.ExecuteStatement#waitStatementComplete // TODO 主要是更新該Operation的State爲FINISHED,這樣後面取數據時才知道已經執行完成 private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle) private def waitStatementComplete(): Unit = { setState(OperationState.RUNNING) // 由於FetchResults有進行檢查,assertState(OperationState.FINISHED) var statusResp = client.GetOperationStatus(statusReq) var isComplete = false while (!isComplete) { getQueryLog() verifyTStatus(statusResp.getStatus) val remoteState = statusResp.getOperationState info(s"Query[$statementId] in ${remoteState.name()}") isComplete = true remoteState match { case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE => isComplete = false statusResp = client.GetOperationStatus(statusReq) case FINISHED_STATE => setState(OperationState.FINISHED) // 省略其它代碼 setOperationException(ke) } } // see if anymore log could be fetched getQueryLog() }
能夠看到其主要操做是構建用於查詢SparkSQLEngine實例中Operation的執行狀態。
再回過來看一下runInternal()
方法:
// org.apache.kyuubi.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { executeStatement() val sessionManager = session.sessionManager val asyncOperation = new Runnable { override def run(): Unit = waitStatementComplete() } try { val backgroundOperation = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundOperation) } catch onError("submitting query in background, query rejected") } else { // 省略其它代碼 } }
這裏提交一個線程後的返回結果backgroundOperation實際上爲一個FutureTask對象,後續在FetchResults過程當中經過該對象就能夠知道Operation在SparkSQLEngine實例中的執行狀態。
在提交完Statement以後,KyuubiServer會將operationHandle返回給用戶端,用於後續獲取執行結果。
提交完Statement後,用戶層的RPC客戶端就會去獲取結果,此時KyuubiServer中FrontedService的FetchResults
方法就會被執行:
// org.apache.kyuubi.service.FrontendService#FetchResults override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = { debug(req.toString) val resp = new TFetchResultsResp try { val operationHandle = OperationHandle(req.getOperationHandle) val orientation = FetchOrientation.getFetchOrientation(req.getOrientation) // 1 means fetching log val fetchLog = req.getFetchType == 1 val maxRows = req.getMaxRows.toInt val rowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog) resp.setResults(rowSet) resp.setHasMoreRows(false) resp.setStatus(OK_STATUS) } catch { case e: Exception => warn("Error fetching results: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e)) } resp }
在獲取真正執行結果以前,會有屢次獲取操做日誌的請求,也就是req.getFetchType == 1
的狀況,這裏咱們只關注fetchLog
爲false的狀況:
獲取執行結果的過程就比較簡單,主要是調用RPC客戶端的FetchResults方法,這樣就會觸發SparkSQLEngine FetchResults的一個過程(這個接下來會分析),不過在獲取執行結果前會檢查其執行狀態,前面在分析在提交Statement時,異步線程waitStatementComplete()就會請求SparkSQLEngine更新其狀態爲FINISHED,所以這裏能夠正常獲取執行結果。
SparkSQLEngine SQL執行流程
接收到KyuubiServer提交Statement的RPC請求時,此時SparkSQLEngine中FrontedService的ExecuteStatement
方法就會被執行,進而觸發接下來提交Statement的整個流程:
其總體流程與KyuubiServer是十分類似的,主要區別在於:
1.其建立的Statement爲SparkSQLEngine體系下的ExecuteStatement;
2.其異步線程是經過SparkSession來執行SQL語句;
所以咱們來看一下其runInternal()
方法和異步線程執行的executeStatement()
方法:
// org.apache.kyuubi.engine.spark.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { val asyncOperation = new Runnable { override def run(): Unit = { OperationLog.setCurrentOperationLog(operationLog) executeStatement() } } try { val sparkSQLSessionManager = session.sessionManager val backgroundHandle = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) val ke = KyuubiSQLException("Error submitting query in background, query rejected", rejected) setOperationException(ke) throw ke } } else { executeStatement() } } // org.apache.kyuubi.engine.spark.operation.ExecuteStatement#executeStatement private def executeStatement(): Unit = { try { setState(OperationState.RUNNING) info(KyuubiSparkUtil.diagnostics(spark)) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) spark.sparkContext.setJobGroup(statementId, statement) result = spark.sql(statement) debug(result.queryExecution) iter = new ArrayFetchIterator(result.collect()) setState(OperationState.FINISHED) } catch { onError(cancel = true) } finally { spark.sparkContext.clearJobGroup() } }
能夠看到其執行很是簡單,就是直接調用SparkSession的sql()方法來執行SQL語句,最後再將結果保存到迭代器iter,並設置執行狀態爲完成。
在提交完Statement以後,SparkSQLEngine會將operationHandle返回給KyuubiServer,用於後續獲取執行結果。
接收到KyuubiServer獲取結果的RPC請求時,此時SparkSQLEngine中FrontedService的FetchResults
方法就會被執行,進而觸發接下來FetchResults的整個流程:
整個過程比較簡單,就是將iter的結果轉換爲rowSet的對象格式,最後返回給KyuubiServer。