Linkis JDBC模塊設計介紹

目錄
javascript

1、 背景介紹
2、 使用介紹
(1)引入依賴模塊
(2)創建測試類
3、 JDBC模塊設計方案
(1)驅動類UJESSQLDriver
(2)JDBC鏈接器UJESSQLConnection
3)執行對象UJESSQLStatement/UJESSQLPreStatement
(4)結果集UJESSQLResultSet
(5)錯誤碼方案
4、 實現方案總結
5、 參考文獻

相關文章分享: Linkis JDBC是如何適配Tableau的?
 

01html

前端

背景介紹java


Linkis做爲大數據平臺中間件,鏈接了底層的計算存儲和上層的開發應用,統一了任務的調度和執行,在JDBC模塊開發出來以前,向Linkis提交SQL執行任務到Spark、Hive執行支持websocket和restful的方式。爲了多樣化與Linkis的交互方式,便捷用戶開發流程,輕量化客戶端的任務提交過程,JDBC的支持無疑是很是值得考慮的。

JDBC(Java Data Base Connectivity, java數據庫鏈接)是一種用於執行SQL語句的Java API,能夠爲多種關係數據庫提供統一訪問,它由一組用Java語言編寫的類和接口組成。JDBC提供了一種基準,據此能夠構建更高級的工具和接口,使數據庫開發人員可以編寫數據庫應用程序。JDBC提供的主要功能是:一、同一個數據庫創建鏈接;二、向數據庫發送SQL語句;三、處理數據庫返回的結果。

在Linkis中,JDBC模塊屬於Linkis-UJES下面的一個子模塊,UJES即Unified Job Execution Service(統一做業執行服務),UJES是Linkis最第一版本的雛形,提供了基礎的任務提交和結果集查詢的對外服務,查詢時JDBC向Linkis-UJES客戶端提交SQL執行,返回獲得結果集,用戶在只須要像使用mysql的JDBC那樣操做,大大下降了學習成本,而實現這些功能僅僅須要用戶引入一個Linkis JDBC的JAR包便可。

須要區分的是,Linkis還提供JDBC引擎,例如在DataSphereStudio中能夠建立JDBC腳本經過JDBC引擎提交任務到Linkis,但該引擎僅模擬實現了JDBC的部分方法,並無規範性的實現sun提供的JDBC 4.0的完整接口方案,沒法向外提供規範的SDK服務。而本文所指的Linkis JDBC模塊是實現了一套接口的完整方案。

 

02mysql

web

使用介紹sql


(1)引入依賴模塊

第一種方式在pom裏面依賴JDBC模塊:
  
  
   
   
            
   
   
    
    
     
     
              
     
     

<dependency> <groupId>com.webank.wedatasphere.linkis</groupId> <artifactId>linkis-ujes-jdbc</artifactId> <version>0.9.1</version> </dependency>
  
  
   
   
            
   
   
注意若是引入不到該jar包,須要在ujes/jdbc目錄裏面執行mvn install -Dmaven.test.skip=true進行本地安裝

第二種方式經過打包和編譯

Step1:在Linkis項目中進入到ujes/jdbc目錄而後在終端輸入指令進行打包mvn assembly:assembly -Dmaven.test.skip=true 該打包指令會跳過單元測試的運行和測試代碼的編譯,並將JDBC模塊須要的依賴一併打包進Jar包之中。
Step2:打包完成後在JDBC的target目錄下會生成兩個Jar包,Jar包名稱中包含dependencies字樣的那個就是咱們須要的Jar包

(2)創建測試類

創建Java的測試類UJESClientImplTestJ,具體接口含義能夠見註釋:
   

public static void main(String[] args) throws SQLException, ClassNotFoundException { //1. 加載驅動類: Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver"); //2. 得到鏈接:jdbc:linkis://gatewayIP:gatewayPort 賬號和密碼對應前端的賬號密碼 Connection connection = DriverManager.getConnection("jdbc:linkis://127.0.0.1:9001","username","password"); //3. 建立statement 和執行查詢 Statement st= connection.createStatement(); ResultSet rs=st.executeQuery("show tables"); //4.處理數據庫的返回結果(使用ResultSet類) while (rs.next()) { ResultSetMetaData metaData = rs.getMetaData(); for (int i = 1; i <= metaData.getColumnCount(); i++) { System.out.print(metaData.getColumnName(i) + ":" +metaData.getColumnTypeName(i)+": "+ rs.getObject(i) + " "); } System.out.println(); } //關閉資源 rs.close(); st.close(); connection.close();}
   


   
2-1 Linkis JDBC 任務執行結果


03typescript

數據庫

模塊設計方案微信


Linkis JDBC模塊設計的初衷是爲了方便用戶經過JDBC的方式,便捷的提交SQL任務到Linkis中執行,是客戶端輕量化追求的一種體現,該模塊的類大多以UJESSQL開頭,表示JDBC模塊屬於linkis的ujes(Unified Job Execution Service,統一任務執行服務)模塊的一部分。

Linkis的JDBC模塊包含了五個關鍵的實現類:
  • UJESSQLDriver

  • UJESSQLConnection

  • UJESSQLStatement

  • UJESSQLPreStatement

  • UJESSQLResultSet

以及許多額外的輔助類,例如數據庫元數據UJESSQLDatabaseMetaData,任務執行返回的結果集元數據UJESSQLResultMetaData等。

當UJESSQLDriver經過反射機制註冊到DriverManager後,經過DM能夠拿到UJESSQLConnection,接着即可以正常進行SQL任務提交和獲取結果集,下面是結果集獲取時的方法調用時序圖:


下面將逐一介紹JDBC關鍵類在Linkis中的實現方案。

(1)驅動類UJESSQLDriver
在 JDBC的層次上,sun主要定義了一個接口Driver和兩個類:DirverManager和DriverInfo,每一個JDBC驅動程序必須實現 Driver接口,例如MySql的Connector/J驅動中,叫作com.mysql.jdbc.Driver,在Linkis的JDBC中的驅動實現類爲UJESSQLDriver。使用時經過Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver")的方法顯示地讓JVM嘗試加載類,並相應的調用靜態代碼塊完成驅動類的註冊。UJESSQLDriver的主要代碼以下:
static { try { DriverManager.registerDriver(new UJESSQLDriver()); } catch (SQLException e) { Logger logger = LoggerFactory.getLogger(UJESSQLDriver.class); logger.info("Load driver failed",e); }}
經過調用DriverManager的註冊方法registerDriver將該驅動類註冊到DriverManager中,當用戶調用DriverManager的getConnection時,DriverManager會檢索全部已經註冊的驅動類,並根據驅動類的類名和請求URL中的類名進行對比,尋找出對應的驅動類。

(2)JDBC鏈接器UJESSQLConnection

Linkis JDBC中鏈接器爲UJESSQLConnection,實現了java.sql.connection接口。註冊驅動以後,能夠經過傳入指定的數據庫鏈接路徑,用戶名和密碼即可獲取數據庫鏈接對象。
conn = (UJESSQLConnection) DriverManager  .getConnection("jdbc:linkis://hostname:port","username","password")
DriverManager 的getConnection方法將傳入的參數進行處理和轉換,調用Driver的connect方法,再將參數傳入UJESSQLConnection的構造器中初始化,返回給用戶。下面是UJESSQLDriver實現的的connect方法:
override def connect(url: String, info: Properties): Connection = if(acceptsURL(url)) { val props = if(info != null) info else newProperties props.putAll(parseURL(url)) val ujesClient =UJESClientFactory.getUJESClient(props) new UJESSQLConnection(ujesClient, props)} else throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)
private def parseURL(url: String): Properties= { val props = new Properties //add an entry to get url props.setProperty("URL", url) url match { case URL_REGEX(host, port, db, params)=> if(StringUtils.isNotBlank(host))props.setProperty(HOST, host) if(StringUtils.isNotBlank(port))props.setProperty(PORT, port.substring(1)) if(StringUtils.isNotBlank(db)&& db.length > 1) props.setProperty(DB_NAME, db.substring(1)) if(StringUtils.isNotBlank(params)&& params.length > 1) { val _params = params.substring(1) val kvs =_params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).filter { case Array(USER, value) => props.setProperty(USER, value) false case Array(PASSWORD, value) => props.setProperty(PASSWORD,value) false case Array(key, _) => if(StringUtils.isBlank(key)) { throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } else true case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } props.setProperty(PARAMS,kvs.map(_.mkString(KV_SPLIT)).mkString(PARAM_SPLIT)) } case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url) } props}

在構造好鏈接參數以後,會調用UJESClientFactory.getUJESClient(prop)方法建立一個新的ujesclient的linkis訪問客戶端,用於提交和查詢linkis任務。
  
def getUJESClient(props: Properties):UJESClient = { val host = props.getProperty(HOST) val port = props.getProperty(PORT) val serverUrl = if(StringUtils.isNotBlank(port))s"http://$host:$port" else "http://" + host if(ujesClients.containsKey(serverUrl))ujesClients.get(serverUrl) else serverUrl.intern synchronized { if(ujesClients.containsKey(serverUrl))return ujesClients.get(serverUrl) val ujesClient =createUJESClient(serverUrl, props) ujesClients.put(serverUrl, ujesClient) ujesClient }}
將建立好的ujesclient和數據庫參數props傳入UJESSQLConnection的構造器,最終獲得一個完整的UJESSQLConnection對象。

(3)執行對象UJESSQLStatement/UJESSQLPreStatement

執行對象在整個JDBC鏈接和使用的生命週期中,屬於請求保存者和執行者的身份,每一個JDBC鏈接器在生成以後,能夠經過調用鏈接器Connection的createStatement方法獲取執行對象,相似地,也能夠經過Connection的prepareStatement方法獲取預執行對象。
     
//獲取執行對象UJESSQLStatementstatement = (UJESSQLStatementCon) conn.createStatement;//獲取預執行對象UJESSQLPrepareStatementpreStatement = (UJESSQLPrepareStatement) conn.prePareStatement;
UJESSQLStatement中最爲重要的方法execute做爲提交SQL任務執行的入口,任務提交的執行主流程以下:
Step1 調用hook修改sql。
Step2 生成用於提交linkis任務的action。
Step3 利用ujes客戶端提交job到linkis執行。
Step4 檢測job的狀態翻轉。
Step5 獲取結果集ResultSet。

UJESSQLStatement中提交任務執行的Execute方法的代碼:
override defexecute(sql: String): Boolean = throwWhenClosed { var parsedSQL = sql //預執行hook,轉換不支持的sql語句 JDBCDriverPreExecutionHook.getPreExecutionHooks.foreach{ preExecution => parsedSQL = preExecution.callPreExecutionHook(parsedSQL) } //獲取linkis的job執行器,建立用於執行的action任務 val action =JobExecuteAction.builder().setEngineType(EngineType.SPARK).addExecuteCode(parsedSQL) .setCreator(ujesSQLConnection.creator).setUser(ujesSQLConnection.user) if(ujesSQLConnection.variableMap.nonEmpty)action.setVariableMap(JavaConversions.mapAsJavaMap(ujesSQLConnection.variableMap)) //提交SQL任務到ujes客戶端執行 jobExecuteResult =ujesSQLConnection.ujesClient.execute(action.build()) queryEnd = false //job狀態檢測 var status =ujesSQLConnection.ujesClient.status(jobExecuteResult) val atMost = if(queryTimeout > 0) Duration(queryTimeout,TimeUnit.MILLISECONDS) else Duration.Inf if(!status.isCompleted)Utils.tryThrow{ Utils.waitUntil(() =>{ status =ujesSQLConnection.ujesClient.status(jobExecuteResult) status.isCompleted ||closed }, atMost, 100, 10000) } { case t: TimeoutException=> if(queryTimeout >0) clearQuery() newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t) case t => t } if(!closed) { var jobInfo =ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult) if(status.isFailed)throw new ErrorException(jobInfo.getRequestPersistTask.getErrCode,jobInfo.getRequestPersistTask.getErrDesc) val jobInfoStatus =jobInfo.getJobStatus if(!jobInfoStatus.equals("Succeed")) Utils.tryThrow{ Utils.waitUntil(()=> { jobInfo = ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult) val state =jobInfo.getJobStatus match{ case"Failed" | "Cancelled" | "Timeout" |"Succeed" => true case _ => false } state || closed }, atMost, 100, 10000) } { case t:TimeoutException => if(queryTimeout >0) clearQuery() newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t) case t => t } //獲取結果集 val resultSetList =jobInfo.getResultSetList(ujesSQLConnection.ujesClient) queryEnd = true if(resultSetList !=null) { resultSet = newUJESSQLResultSet(resultSetList, this, maxRows, fetchSize) true } else false } else throw newUJESSQLException(UJESSQLErrorCode.STATEMENT_CLOSED, "Statement isclosed.")}
一樣的UJESPrepareStatement中的excute方法繼承自Statement,原理一致。

UJESPrepareStatement與UJESStatement的不一樣之處在於,statement每次執行sql語句,相關數據庫都要執行sql語句的編譯,preparedstatement是預編譯的, 且支持批處理。

(4)結果集UJESSQLResultSet
當用戶提交完SQL任務到Linkis後,會檢測用戶的job是否已經完成,完成時調用getResultSet的方法獲取結果集UJESSQLResultSet。

在UJESSQLResultSet初次加載的時候,java虛擬機會調用初始化的init()方法,該方法會執行三個初始化的步驟,用於構建結果集:
Step1:經過resultSetResultInit方法設置獲取結果集相關的參數,如當前用戶和結果集路徑,而後經過ujesClient拿到結果集。
Step2:經過metaDataInit方法獲取結果集的元數據。
Step3:經過resultSetInit方法獲取結果集的內容。
private def init(): Unit = { resultSetResultInit() metaDataInit() resultSetInit()} private def resultSetResultInit(): Unit = { if (path == null) path =getResultSetPath(resultSetList) val user =connection.getProps.getProperty("user") if(StringUtils.isNotBlank(path)){ val resultAction =ResultSetAction.builder.setUser(user).setPath(path).build() resultSetResult =connection.ujesClient.resultSet(resultAction) }} private def metaDataInit(): Unit = { if ( null ==resultSetResult ){ return } metaData =resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]] for(cursor <- 1 tometaData.size()){ val col =metaData.get(cursor - 1) resultSetMetaData.setColumnNameProperties(cursor,col.get("columnName")) resultSetMetaData.setDataTypeProperties(cursor,col.get("dataType")) resultSetMetaData.setCommentPropreties(cursor,col.get("comment")) }} private def resultSetInit(): Unit = { if ( null ==resultSetResult ){ return } resultSetRow =resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]]}

UJESSQLResultSet中的next()方法將currentRowCursor做爲移動遊標。每次從結果集中讀取數據後都會相應地更新遊標的位置。若是next方法返回true,則能夠調用getXXX()方法獲取相關字段數據,反之則說明當前遊標並未指向一條有效記錄,讀取過程直接結束。
override def next(): Boolean = { if(metaData == null)init() currentRowCursor += 1 if(null == resultSetRow ||currentRowCursor > resultSetRow.size()-1) false else{ updateCurrentRow(currentRowCursor) true }}

當next() 方法返回true時,會相應地調用getXXX()方法讀取數據。以getString()方法爲例:
override def getString(columnIndex: Int): String = { val any = getColumnValue(columnIndex) if(wasNull()) { throw newUJESSQLException(UJESSQLErrorCode.RESULTSET_ROWERROR, "Type is null") }else{ any.asInstanceOf[String] }}

該方法調用getColumnValue讀取數據,並將其轉化爲String類型的值返回。

(5)錯誤碼方案

爲了便於用戶利用Linkis JDBC提交SQL執行,減小錯誤排查時間,咱們在Linkis JDBC中對常見的錯誤生成了錯誤碼,儘可能覆蓋整個JDBC提交和執行過程。錯誤碼編號範圍初步設定在80000~80100,常見的錯誤類型包括:參數類型錯誤、方法暫不支持、操做邏輯出錯以及返回類型錯誤等。
public enum UJESSQLErrorCode {  BAD_URL(80000,"badurl"), NOSUPPORT_DRIVER(80001,"this method not supported in driver"),  NOSUPPORT_CONNECTION(80002,"this method not supported in connection"), NOSUPPORT_STATEMENT(80003,"this method not supported instatement"), CONNECTION_CLOSED(80004,"Connection is closed!"), STATEMENT_CLOSED(80005,"statement is closed!"),  SCHEMA_EMPTY(80006,"schemais empty!"), SCHEMA_FAILED(80007,"Get schema failed!"), QUERY_TIMEOUT(80008,"query has been timeout!"), FILETYPE_ERROR(80009,"file type error"), METADATATYPE_ERROR(80010,"metadata type error"),  NOSUPPORT_METADATA(80011"thismethod not supported in DatabaseMetaData"), NOPERMITION(80012,"This user has no permission to read thisfile!"),  PARAMS_NOT_FOUND(80013,"Parameter not found"), ERRORINFO_FROM_JOBINFO(80014,"get errorinfo from jobInfo"),  RESULTSET_ROWERROR(80015,"rowmessage error"), NOSUPPORT_RESULTSET(80016,"this method not supported inresultSet"), RESULTSET_NULL(80017,"resultset is null,try to run next() firstlyto init ResultSet and MetaData"), PREPARESTATEMENT_TYPEERROR(80018,"parameter type error"), METADATA_EMPTY(80019,"data is empty") ; private String msg; private int code;
UJESSQLErrorCode(intcode,String msg) { this.code = code; this.msg = msg; }
public String getMsg() { return msg; }
public int getCode() { return code; }}


04

實現方案總結


    Linkis JDBC模塊設計的初衷是爲了用戶可以方便的經過JDBC的方式提交SQL任務到LInkis執行,在實現的過程當中,咱們參考了文章[1]進行初步的框架設計,實現過程當中對於任務的提交和封裝參考了Linkis ujes中與job相關的文檔,閱讀了一些JDBC相關的文章[2][3]。設計的過程當中仍有一部分非必要的接口沒有實現,這是參考Kylin、Hive等項目中JDBC模塊設計綜合考量後的結果,在不影響使用效果的前提降低低開發成本。

    Linkis自己做爲大數據產品的鏈接器,具備強大的集成和可拓展性,JDBC模塊也是Linkis的向外兼容的一個具體實現,期待服務於社區一歲多的Linkis可以茁壯成長,在你們的共同栽培下枝繁葉茂。

     

    05

    參考文獻


    [1] create-your-own-type-3-jdbc-driver
    https://www.javaworld.com/article/2074249/create-your-own-type-3-jdbc-driver--part-1.html
    [2] Java JDBC的優雅設計
    https://blog.csdn.net/yisizhu/article/details/104025220
    [3] Class.forName加載JDBC驅動程序時,底層都作了些什麼???
    https://www.cnblogs.com/liuxianan/archive/2012/08/04/2623258.html

    WeDataSphere,BIG DATA MADE EASY.

    用心作一個有溫度的開源社區

    ~歡迎關注~


    掃碼關注咱們

    微信號公衆號 : WeDataSphere

    GitHub:WeDataSphere

    若是喜歡咱們的產品或文章,請給咱們的GitHub點上你寶貴的star和fork哦~~


    本文分享自微信公衆號 - WeDataSphere(gh_273e85fce73b)。
    若有侵權,請聯繫 support@oschina.cn 刪除。
    本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

    相關文章
    相關標籤/搜索