restapi(8)- restapi-sql:用戶自主的服務

  學習函數式編程初衷是看到本身熟悉的oop編程語言和sql數據庫在現代商業社會中前景暗淡,準備徹底放棄windows技術棧轉到分佈式大數據技術領域的。可是在現實中理想老是不如人意,原本想在一個規模較小的公司展展拳腳,覺得小公司會少點歷史包袱,有利於全面技術改造。但現實是:即便是小公司,一旦有個成熟的產品,那麼進行全面的技術更新基本上是不可能的了,由於公司要生存,開發人員很難新舊技術之間隨時切換。除非有狂熱的熱情,員工怠慢甚至抵制情緒不容易解決。只能採起逐步切換方式:保留原有產品的後期維護不動,新產品開發用一些新的技術。在咱們這裏的狀況就是:之前一堆c#、sqlserver的東西必須保留,新的功能好比大數據、ai、識別等必須用新的手段如scala、python、dart、akka、kafka、cassandra、mongodb來開發。好了,新舊兩個開發平臺之間的軟件系統對接又變成了一個問題。前端

   如今咱們這裏有個需求:把在linux-ubuntu akka-cluster集羣環境裏mongodb裏數據處理的結果傳給windows server下SQLServer裏。這是一種典型的異系統集成場景。個人解決方案是經過一個restapi服務做爲兩個系統的數據橋樑,這個restapi的最基本要求是:java

一、支持任何操做系統前端:這個沒什麼問題,在http層上經過json交換數據python

二、能讀寫mongodb:在前面討論的restapi-mongo已經實現了這一功能mysql

三、能讀寫windows server環境下的sqlserver:這個是本篇討論的主題linux

四、用戶可以比較方便的對平臺數據庫進行操做,最好免去先後雙方每類操做都須要進行協定model這一過程,也就是能達到用戶隨意調用服務sql

前面曾經實現了一個jdbc-engine項目,基於scalikejdbc,不過只示範了slick-h2相關的功能。如今須要sqlserver-jdbc驅動,而後試試能不能在JVM裏驅動windows下的sqlserver。maven裏找不到sqlserver的驅動,但從微軟官網能夠下載mssql-jdbc-7.0.0.jre8.jar。這是個jar,在sbt裏稱做unmanagedjar,不能擺在build.sbt的dependency裏。這個須要擺在項目根目錄下的lib目錄下便可(也能夠在放在build.sbt裏unmanagedBase :=?? 指定的路徑下)。而後是數據庫鏈接,下面是可使用sqlserver的application.conf配置文件內容:mongodb

# JDBC settings prod { db { h2 { driver = "org.h2.Driver" url = "jdbc:h2:tcp://localhost/~/slickdemo" user = "" password = "" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } mysql { driver = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://localhost:3306/testdb" user = "root" password = "123" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } postgres { driver = "org.postgresql.Driver" url = "jdbc:postgresql://localhost:5432/testdb" user = "root" password = "123" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } mssql { driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" url = "jdbc:sqlserver://192.168.11.164:1433;integratedSecurity=false;Connect Timeout=3000" user = "sa" password = "Tiger2020" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true connectionTimeout = 3000 } termtxns { driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=TERMTXNS;integratedSecurity=false;Connect Timeout=3000" user = "sa" password = "Tiger2020" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true connectionTimeout = 3000 } crmdb { driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=CRMDB;integratedSecurity=false;Connect Timeout=3000" user = "sa" password = "Tiger2020" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true connectionTimeout = 3000 } } # scallikejdbc Global settings scalikejdbc.global.loggingSQLAndTime.enabled = true scalikejdbc.global.loggingSQLAndTime.logLevel = info scalikejdbc.global.loggingSQLAndTime.warningEnabled = true scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn scalikejdbc.global.loggingSQLAndTime.singleLineMode = false scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 }

這個文件裏的mssql,termtxns,crmdb段落都是給sqlserver的,它們都使用hikaricp線程池管理。數據庫

在jdbc-engine裏啓動數據庫方式以下:編程

  ConfigDBsWithEnv("prod").setup('termtxns)
  ConfigDBsWithEnv("prod").setup('crmdb)
  ConfigDBsWithEnv("prod").loadGlobalSettings()

這段打開了在配置文件中用termtxns,crmdb註明的數據庫。json

下面是SqlHttpServer.scala的代碼:

package com.datatech.rest.sql import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import pdi.jwt._ import AuthBase._ import MockUserAuthService._ import com.datatech.sdp.jdbc.config.ConfigDBsWithEnv import akka.actor.ActorSystem import akka.stream.ActorMaterializer import Repo._ import SqlRoute._ object SqlHttpServer extends App { implicit val httpSys = ActorSystem("sql-http-sys") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher ConfigDBsWithEnv("prod").setup('termtxns)
  ConfigDBsWithEnv("prod").setup('crmdb)
  ConfigDBsWithEnv("prod").loadGlobalSettings() implicit val authenticator = new AuthBase() .withAlgorithm(JwtAlgorithm.HS256) .withSecretKey("OpenSesame") .withUserFunc(getValidUser) val route = path("auth") { authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo => post { complete(authenticator.issueJwt(userinfo))} } } ~ pathPrefix("api") { authenticateOAuth2(realm = "api", authenticator.authenticateToken) { token =>
          new SqlRoute("sql", token)(new JDBCRepo) .route // ~ ...
 } } val (port, host) = (50081,"192.168.11.189") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ => httpSys.terminate()) }

服務入口在http://mydemo.com/api/sql,服務包括get,post,put三類,參考這個SqlRoute:

package com.datatech.rest.sql import akka.http.scaladsl.server.Directives import akka.stream.ActorMaterializer import akka.http.scaladsl.model._ import akka.actor.ActorSystem import com.datatech.rest.sql.Repo.JDBCRepo import akka.http.scaladsl.common._ import spray.json.DefaultJsonProtocol import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport trait JsFormats extends SprayJsonSupport with DefaultJsonProtocol object JsConverters extends JsFormats { import SqlModels._ implicit val brandFormat = jsonFormat2(Brand) implicit val customerFormat = jsonFormat6(Customer) } object SqlRoute { import JsConverters._ implicit val jsonStreamingSupport = EntityStreamingSupport.json() .withParallelMarshalling(parallelism = 8, unordered = false) class SqlRoute(val pathName: String, val jwt: String)(repo: JDBCRepo)( implicit sys: ActorSystem, mat: ActorMaterializer) extends Directives with JsonConverter { val route = pathPrefix(pathName) { path(Segment / Remaining) { case (db, tbl) => (get & parameter('sqltext)) { sql => {
          val rsc = new RSConverter val rows = repo.query[Map[String,Any]](db, sql, rsc.resultSet2Map) complete(rows.map(m => toJson(m))) } } ~ (post & parameter('sqltext)) { sql =>
              entity(as[String]){ json => repo.batchInsert(db,tbl,sql,json) complete(StatusCodes.OK) } } ~ put { entity(as[Seq[String]]) { sqls => repo.update(db, sqls) complete(StatusCodes.OK) } } } } } }

jdbc-engine的特色是能夠用字符類型的sql語句來操做。因此咱們能夠經過傳遞字符串型的sql語句來實現服務調用,使用門檻低,方便通用。restapi-sql提供的是對服務器端sqlserver的普通操做,包括讀get,寫入post,更改put。這些sqlserver操做部分是在JDBCRepo裏的:

package com.datatech.rest.sql import com.datatech.sdp.jdbc.engine.JDBCEngine._ import com.datatech.sdp.jdbc.engine.{JDBCQueryContext, JDBCUpdateContext} import scalikejdbc._ import akka.stream.ActorMaterializer import com.datatech.sdp.result.DBOResult.DBOResult import akka.stream.scaladsl._ import scala.concurrent._ import SqlModels._ object Repo { class JDBCRepo(implicit ec: ExecutionContextExecutor, mat: ActorMaterializer) { def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = { //construct the context
      val ctx = JDBCQueryContext( dbName = Symbol(db), statement = sqlText ) jdbcAkkaStream(ctx,toRow) } def query(db: String, tbl: String, sqlText: String) = { //construct the context
      val ctx = JDBCQueryContext( dbName = Symbol(db), statement = sqlText ) jdbcQueryResult[Vector,RS](ctx,getConverter(tbl)).toFuture[Vector[RS]] } def update(db: String, sqlTexts: Seq[String]): DBOResult[Seq[Long]] = { val ctx = JDBCUpdateContext( dbName = Symbol(db), statements = sqlTexts ) jdbcTxUpdates(ctx) } def bulkInsert[P](db: String, sqlText: String, prepParams: P => Seq[Any], params: Source[P,_]) = { val insertAction = JDBCActionStream( dbName = Symbol(db), parallelism = 4, processInOrder = false, statement = sqlText, prepareParams = prepParams ) params.via(insertAction.performOnRow).to(Sink.ignore).run() } def batchInsert(db: String, tbl: String, sqlText: String, jsonParams: String):DBOResult[Seq[Long]] = { val ctx = JDBCUpdateContext( dbName = Symbol(db), statements = Seq(sqlText), batch = true, parameters = getSeqParams(jsonParams,sqlText) ) jdbcBatchUpdate[Seq](ctx) } } import monix.execution.Scheduler.Implicits.global
  implicit class DBResultToFuture(dbr: DBOResult[_]){ def toFuture[R] = { dbr.value.value.runToFuture.map { eor => eor match { case Right(or) => or match { case Some(r) => r.asInstanceOf[R] case None => throw new RuntimeException("Operation produced None result!") } case Left(err) => throw new RuntimeException(err) } } } } }

讀query部分即 def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = {...} 這個函數返回Source[R,Any],下面咱們好好談談這個R:R是讀的結果,一般是某個類或model,好比讀取Person記錄返回一組Person類的實例。這裏有一種強類型的感受。一開始我也是隨大流堅持建model後用toJson[E],fromJson[E]這樣作線上數據轉換。如今的問題是restapi-sql是一項公共服務,使用者知道sqlserver上有些什麼表,而後但願經過sql語句來從這些表裏讀取數據。這些sql語句可能超出表的界限如sql join, union等,若是咱們堅持每一個返回結果都必須有個對應的model,那麼顯然就會犧牲這個服務的通用性。實際上,http線上數據交換自己就不多是強類型的,由於通過了json轉換。對於json轉換來講,只要求字段名稱、字段類型對稱就好了。至於從什麼類型轉換成了另外一個什麼類型都沒問題。因此,字段名+字段值的表現形式不就是Map[K,V]嗎,咱們就用Map[K,V]做爲萬能model就好了,沒人知道。也就是說用戶方經過sql語句指定返回的字段名稱,它們多是任何類型Any,具體類型天然會由數據庫補上。服務方從數據庫讀取結果ResultSet後轉成Map[K,V]而後再轉成json返回給用戶,用戶能夠用Map[String,Any]信息產生任何類型,這就是自主。好,就來看看如何將ResultSet轉成Map[String,Any]:

package com.datatech.rest.sql
import scalikejdbc._
import java.sql.ResultSetMetaData
class RSConverter {
  import RSConverterUtil._
  var rsMeta: ResultSetMetaData = _
  var columnCount: Int = 0
  var rsFields: List[(String,String)] = List[(String,String)]()

  def getFieldsInfo:List[(String,String)] =
    ( 1 until columnCount).foldLeft(List[(String,String)]()) {
    case (cons,i) =>
      (rsMeta.getColumnLabel(i) -> rsMeta.getColumnTypeName(i)) :: cons
  }
  def resultSet2Map(rs: WrappedResultSet): Map[String,Any] = {
    if(columnCount == 0) {
      rsMeta =  rs.underlying.getMetaData
      columnCount = rsMeta.getColumnCount
      rsFields = getFieldsInfo
    }
    rsFields.foldLeft(Map[String,Any]()) {
      case (m,(n,t)) =>
        m + (n -> rsFieldValue(n,t,rs))
    }
  }
}
object RSConverterUtil {
  import scala.collection.immutable.TreeMap
  def map2Params(stm: String, m: Map[String,Any]): Seq[Any] = {
    val sortedParams = m.foldLeft(TreeMap[Int,Any]()) {
      case (t,(k,v)) => t + (stm.indexOfSlice(k) -> v)
    }
    sortedParams.map(_._2).toSeq
  }
  def rsFieldValue(fldname: String, fldType: String, rs: WrappedResultSet): Any = fldType match {
    case "LONGVARCHAR" => rs.string(fldname)
    case "VARCHAR" => rs.string(fldname)
    case "CHAR" => rs.string(fldname)
    case "BIT" => rs.boolean(fldname)
    case "TIME" => rs.time(fldname)
    case "TIMESTAMP" => rs.timestamp(fldname)
    case "ARRAY" => rs.array(fldname)
    case "NUMERIC" => rs.bigDecimal(fldname)
    case "BLOB" => rs.blob(fldname)
    case "TINYINT" => rs.byte(fldname)
    case "VARBINARY" => rs.bytes(fldname)
    case "BINARY" => rs.bytes(fldname)
    case "CLOB" => rs.clob(fldname)
    case "DATE" => rs.date(fldname)
    case "DOUBLE" => rs.double(fldname)
    case "REAL" => rs.float(fldname)
    case "FLOAT" => rs.float(fldname)
    case "INTEGER" => rs.int(fldname)
    case "SMALLINT" => rs.int(fldname)
    case "Option[Int]" => rs.intOpt(fldname)
    case "BIGINT" => rs.long(fldname)
  }
}

這段主要功能是將JDBC的ResultSet轉換成Map[String,Any]。在前面討論的restapi-mongo咱們能夠進行Document到Map[String,Any]的轉換以實現一樣的目的。

下面是個調用query服務的例子:

    val getAllRequest = HttpRequest( HttpMethods.GET, uri = "http://192.168.11.189:50081/api/sql/termtxns/brand?sqltext=SELECT%20*%20FROM%20BRAND", ).addHeader(authentication) (for { response <- Http().singleRequest(getAllRequest) json <- Unmarshal(response.entity).to[String] } yield message).andThen { case Success(msg) => println(s"Received json collection: $json") case Failure(err) => println(s"Error: ${err.getMessage}") }

特色是我只須要提供sql語句,服務就會返回一個json數組,而後我怎麼把json轉成任何類型就隨我高興了。

再看看post服務:在這裏但願實現一種批次型插入表的功能,好比從一個數據表裏把數據搬到另一個表。通常來說在jdbc操做裏首先得提供一個模版,如:insert into person(fullname,code) values(?,?),而後經過提供一組參數值來實現批次插入。固然,爲安全起見,咱們仍是須要肯定正確的參數位置,這個能夠從sql語句裏獲取:

  def map2Params(stm: String, m: Map[String,Any]): Seq[Any] = { val sortedParams = m.foldLeft(TreeMap[Int,Any]()) { case (t,(k,v)) => t + (stm.toUpperCase.indexOfSlice(k.toUpperCase) -> v) } sortedParams.map(_._2).toSeq } def getSeqParams(json: String, sql: String): Seq[Seq[Any]] = { val seqOfjson = fromJson[Seq[String]](json) val prs = seqOfjson.map(fromJson[Map[String,Any]]) prs.map(RSConverterUtil.map2Params(sql,_)) }

下面是個批次插入的示範代碼:

    val encodedSelect = URLEncode.encode("select id as code, name as fullname from members") val encodedInsert = URLEncode.encode("insert into person(fullname,code) values(?,?)") val getMembers = HttpRequest( HttpMethods.GET, uri = "http://192.168.0.189:50081/api/sql/h2/members?sqltext="+encodedSelect ).addHeader(authentication) val postRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.0.189:50081/api/sql/h2/person?sqltext="+encodedInsert, ).addHeader(authentication) (for { _ <- update("http://192.168.0.189:50081/api/sql/h2/person",Seq(createCTX)) respMembers <- Http().singleRequest(getMembers) message <- Unmarshal(respMembers.entity).to[String] reqEntity <- Marshal(message).to[RequestEntity] respInsert <- Http().singleRequest(postRequest.copy(entity = reqEntity)) // HttpEntity(ContentTypes.`application/json`,ByteString(message))))
    } yield respInsert).onComplete { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => println("builk insert successful!") case Success(_) => println("builk insert failed!") case Failure(err) => println(s"Error: ${err.getMessage}") }

你看,我特別把參數值清單裏字段位置和insert sql裏字段前後位置顛倒了,但仍是獲得正確的結果。

最後是put:這是爲批次型的事物處理設計的。接受一條或者多條無參數sql指令,多條指令會在一個事物中執行。具體使用方式以下:

    def update(url: String, cmds: Seq[String])(implicit token: Authorization): Future[HttpResponse] =
    for { reqEntity <- Marshal(cmds).to[RequestEntity] response <- Http().singleRequest(HttpRequest( method=HttpMethods.PUT,uri=url,entity=reqEntity) .addHeader(token)) } yield response

在上面的討論裏介紹了基於sqlserver的rest服務,與前面討論的restapi-mongo從原理上區別並不大,重點是實現了用戶主導的數據庫操做。

相關文章
相關標籤/搜索