目錄
javascript
![](http://static.javashuo.com/static/loading.gif)
01html
—前端
背景介紹java
![](http://static.javashuo.com/static/loading.gif)
02mysql
—web
使用介紹sql
(1)引入依賴模塊
<dependency> <groupId>com.webank.wedatasphere.linkis</groupId> <artifactId>linkis-ujes-jdbc</artifactId> <version>0.9.1</version> </dependency>
(2)創建測試類
public static void main(String[] args) throws SQLException, ClassNotFoundException { //1. 加載驅動類: Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver"); //2. 得到鏈接:jdbc:linkis://gatewayIP:gatewayPort 賬號和密碼對應前端的賬號密碼 Connection connection = DriverManager.getConnection("jdbc:linkis://127.0.0.1:9001","username","password"); //3. 建立statement 和執行查詢 Statement st= connection.createStatement(); ResultSet rs=st.executeQuery("show tables"); //4.處理數據庫的返回結果(使用ResultSet類) while (rs.next()) { ResultSetMetaData metaData = rs.getMetaData(); for (int i = 1; i <= metaData.getColumnCount(); i++) { System.out.print(metaData.getColumnName(i) + ":" +metaData.getColumnTypeName(i)+": "+ rs.getObject(i) + " "); } System.out.println(); } //關閉資源 rs.close(); st.close(); connection.close();}
圖 2-1 Linkis JDBC 任務執行結果
![]()
03typescript
—數據庫
模塊設計方案微信
UJESSQLDriver
UJESSQLConnection
UJESSQLStatement
UJESSQLPreStatement
UJESSQLResultSet
static { try { DriverManager.registerDriver(new UJESSQLDriver()); } catch (SQLException e) { Logger logger = LoggerFactory.getLogger(UJESSQLDriver.class); logger.info("Load driver failed",e); }}
(2)JDBC鏈接器UJESSQLConnection
conn = (UJESSQLConnection) DriverManager .getConnection("jdbc:linkis://hostname:port","username","password")
override def connect(url: String, info: Properties): Connection = if(acceptsURL(url)) { val props = if(info != null) info else newProperties props.putAll(parseURL(url)) val ujesClient =UJESClientFactory.getUJESClient(props) new UJESSQLConnection(ujesClient, props)} else throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)
private def parseURL(url: String): Properties= { val props = new Properties //add an entry to get url props.setProperty("URL", url) url match { case URL_REGEX(host, port, db, params)=> if(StringUtils.isNotBlank(host))props.setProperty(HOST, host) if(StringUtils.isNotBlank(port))props.setProperty(PORT, port.substring(1)) if(StringUtils.isNotBlank(db)&& db.length > 1) props.setProperty(DB_NAME, db.substring(1)) if(StringUtils.isNotBlank(params)&& params.length > 1) { val _params = params.substring(1) val kvs =_params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).filter { case Array(USER, value) => props.setProperty(USER, value) false case Array(PASSWORD, value) => props.setProperty(PASSWORD,value) false case Array(key, _) => if(StringUtils.isBlank(key)) { throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } else true case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } props.setProperty(PARAMS,kvs.map(_.mkString(KV_SPLIT)).mkString(PARAM_SPLIT)) } case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url) } props}
def getUJESClient(props: Properties):UJESClient = { val host = props.getProperty(HOST) val port = props.getProperty(PORT) val serverUrl = if(StringUtils.isNotBlank(port))s"http://$host:$port" else "http://" + host if(ujesClients.containsKey(serverUrl))ujesClients.get(serverUrl) else serverUrl.intern synchronized { if(ujesClients.containsKey(serverUrl))return ujesClients.get(serverUrl) val ujesClient =createUJESClient(serverUrl, props) ujesClients.put(serverUrl, ujesClient) ujesClient }}
(3)執行對象UJESSQLStatement/UJESSQLPreStatement
//獲取執行對象UJESSQLStatementstatement = (UJESSQLStatementCon) conn.createStatement;//獲取預執行對象UJESSQLPrepareStatementpreStatement = (UJESSQLPrepareStatement) conn.prePareStatement;
override defexecute(sql: String): Boolean = throwWhenClosed { var parsedSQL = sql //預執行hook,轉換不支持的sql語句 JDBCDriverPreExecutionHook.getPreExecutionHooks.foreach{ preExecution => parsedSQL = preExecution.callPreExecutionHook(parsedSQL) } //獲取linkis的job執行器,建立用於執行的action任務 val action =JobExecuteAction.builder().setEngineType(EngineType.SPARK).addExecuteCode(parsedSQL) .setCreator(ujesSQLConnection.creator).setUser(ujesSQLConnection.user) if(ujesSQLConnection.variableMap.nonEmpty)action.setVariableMap(JavaConversions.mapAsJavaMap(ujesSQLConnection.variableMap)) //提交SQL任務到ujes客戶端執行 jobExecuteResult =ujesSQLConnection.ujesClient.execute(action.build()) queryEnd = false //job狀態檢測 var status =ujesSQLConnection.ujesClient.status(jobExecuteResult) val atMost = if(queryTimeout > 0) Duration(queryTimeout,TimeUnit.MILLISECONDS) else Duration.Inf if(!status.isCompleted)Utils.tryThrow{ Utils.waitUntil(() =>{ status =ujesSQLConnection.ujesClient.status(jobExecuteResult) status.isCompleted ||closed }, atMost, 100, 10000) } { case t: TimeoutException=> if(queryTimeout >0) clearQuery() newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t) case t => t } if(!closed) { var jobInfo =ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult) if(status.isFailed)throw new ErrorException(jobInfo.getRequestPersistTask.getErrCode,jobInfo.getRequestPersistTask.getErrDesc) val jobInfoStatus =jobInfo.getJobStatus if(!jobInfoStatus.equals("Succeed")) Utils.tryThrow{ Utils.waitUntil(()=> { jobInfo = ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult) val state =jobInfo.getJobStatus match{ case"Failed" | "Cancelled" | "Timeout" |"Succeed" => true case _ => false } state || closed }, atMost, 100, 10000) } { case t:TimeoutException => if(queryTimeout >0) clearQuery() newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t) case t => t } //獲取結果集 val resultSetList =jobInfo.getResultSetList(ujesSQLConnection.ujesClient) queryEnd = true if(resultSetList !=null) { resultSet = newUJESSQLResultSet(resultSetList, this, maxRows, fetchSize) true } else false } else throw newUJESSQLException(UJESSQLErrorCode.STATEMENT_CLOSED, "Statement isclosed.")}
private def init(): Unit = { resultSetResultInit() metaDataInit() resultSetInit()} private def resultSetResultInit(): Unit = { if (path == null) path =getResultSetPath(resultSetList) val user =connection.getProps.getProperty("user") if(StringUtils.isNotBlank(path)){ val resultAction =ResultSetAction.builder.setUser(user).setPath(path).build() resultSetResult =connection.ujesClient.resultSet(resultAction) }} private def metaDataInit(): Unit = { if ( null ==resultSetResult ){ return } metaData =resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]] for(cursor <- 1 tometaData.size()){ val col =metaData.get(cursor - 1) resultSetMetaData.setColumnNameProperties(cursor,col.get("columnName")) resultSetMetaData.setDataTypeProperties(cursor,col.get("dataType")) resultSetMetaData.setCommentPropreties(cursor,col.get("comment")) }} private def resultSetInit(): Unit = { if ( null ==resultSetResult ){ return } resultSetRow =resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]]}
override def next(): Boolean = { if(metaData == null)init() currentRowCursor += 1 if(null == resultSetRow ||currentRowCursor > resultSetRow.size()-1) false else{ updateCurrentRow(currentRowCursor) true }}
override def getString(columnIndex: Int): String = { val any = getColumnValue(columnIndex) if(wasNull()) { throw newUJESSQLException(UJESSQLErrorCode.RESULTSET_ROWERROR, "Type is null") }else{ any.asInstanceOf[String] }}
(5)錯誤碼方案
public enum UJESSQLErrorCode { BAD_URL(80000,"badurl"), NOSUPPORT_DRIVER(80001,"this method not supported in driver"), NOSUPPORT_CONNECTION(80002,"this method not supported in connection"), NOSUPPORT_STATEMENT(80003,"this method not supported instatement"), CONNECTION_CLOSED(80004,"Connection is closed!"), STATEMENT_CLOSED(80005,"statement is closed!"), SCHEMA_EMPTY(80006,"schemais empty!"), SCHEMA_FAILED(80007,"Get schema failed!"), QUERY_TIMEOUT(80008,"query has been timeout!"), FILETYPE_ERROR(80009,"file type error"), METADATATYPE_ERROR(80010,"metadata type error"), NOSUPPORT_METADATA(80011, "thismethod not supported in DatabaseMetaData"), NOPERMITION(80012,"This user has no permission to read thisfile!"), PARAMS_NOT_FOUND(80013,"Parameter not found"), ERRORINFO_FROM_JOBINFO(80014,"get errorinfo from jobInfo"), RESULTSET_ROWERROR(80015,"rowmessage error"), NOSUPPORT_RESULTSET(80016,"this method not supported inresultSet"), RESULTSET_NULL(80017,"resultset is null,try to run next() firstlyto init ResultSet and MetaData"), PREPARESTATEMENT_TYPEERROR(80018,"parameter type error"), METADATA_EMPTY(80019,"data is empty") ; private String msg; private int code;
UJESSQLErrorCode(intcode,String msg) { this.code = code; this.msg = msg; }
public String getMsg() { return msg; }
public int getCode() { return code; }}
![](http://static.javashuo.com/static/loading.gif)
04
—
實現方案總結
![](http://static.javashuo.com/static/loading.gif)
05
—
參考文獻
WeDataSphere,BIG DATA MADE EASY.
用心作一個有溫度的開源社區
~歡迎關注~
![](http://static.javashuo.com/static/loading.gif)
掃碼關注咱們
微信號公衆號 : WeDataSphere
GitHub:WeDataSphere
若是喜歡咱們的產品或文章,請給咱們的GitHub點上你寶貴的star和fork哦~~
本文分享自微信公衆號 - WeDataSphere(gh_273e85fce73b)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。