SDP(3): ScalikeJDBC - JDBC-Engine: Fetching

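ScalikeJDBC covers the basic JDBC functionality fairly completely, and it implements that functionality in a fairly lean way, so it should be somewhat more efficient at runtime. In principle ScalikeJDBC is a good fit for a JDBC-Engine: it sits between the various JDBC tool libraries and the database instances, accepts JDBC instructions, connects to the target database, runs the operation and returns the result. Generally, JDBC tool libraries such as ORM and FRM software use their own DSLs to program data management against complex table relationships, and in the end produce SQL, that is (prepared) statement + parameters, which is passed to the JDBC driver of the given database type to run and produce results. Described that way, the main job of a JDBC-Engine is to support the following function: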

jdbcRunSQL(context: JDBCContext): JDBCResultSet

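The caller of this function supplies a value of type JDBCContext; jdbcRunSQL then carries out the rest of the operation and returns the result. Seen this way, JDBCContext needs to provide at least the following attributes: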

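1. Database connection: which database connection pool to use

2. Operation parameters: fetchSize, queryTimeout, queryTag. These all apply to the SQL currently being run

3. Query parameters:

    Query type: select/execute/update, single/batch, pre/post query actions, generateKey

    SQL: statement: Seq[String], parameters: Seq[Option[Seq[Any]]]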

Here is the definition of the JDBCContext type:

    import java.sql.PreparedStatement
    import scala.collection.generic.CanBuildFrom
    import scalikejdbc._

    object JDBCContext {
      type SQLTYPE = Int
      val SQL_SELECT: Int = 0
      val SQL_EXECUTE = 1
      val SQL_UPDATE = 2
      def returnColumnByIndex(idx: Int) = Some(idx)
      def returnColumnByName(col: String) = Some(col)
    }

    case class JDBCContext(
      dbName: Symbol,
      statements: Seq[String],
      parameters: Seq[Seq[Any]] = Nil,
      fetchSize: Int = 100,
      queryTimeout: Option[Int] = None,
      queryTags: Seq[String] = Nil,
      sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
      batch: Boolean = false,
      returnGeneratedKey: Option[Any] = None, // no return: None, return by index: Some(1), by name: Some("id")
      preAction: Option[PreparedStatement => Unit] = None,
      postAction: Option[PreparedStatement => Unit] = None)

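
The returnGeneratedKey field is typed Option[Any], with None, Some(1) and Some("id") as the three intended shapes. As a rough illustration only, the sketch below shows one way an engine could decode that value with pattern matching. The names GeneratedKeySketch, KeySpec, NoKey, KeyByIndex, KeyByName and keySpec are hypothetical and do not appear in the article's code.

```scala
// Hypothetical sketch: decoding the loosely typed returnGeneratedKey value.
// None of these names belong to the article's JDBCEngine or to ScalikeJDBC.
object GeneratedKeySketch {
  sealed trait KeySpec
  case object NoKey extends KeySpec
  final case class KeyByIndex(index: Int) extends KeySpec
  final case class KeyByName(column: String) extends KeySpec

  // Maps None / Some(Int) / Some(String) to an explicit specification
  // and rejects anything else instead of failing later at the JDBC call.
  def keySpec(returnGeneratedKey: Option[Any]): KeySpec = returnGeneratedKey match {
    case None              => NoKey
    case Some(idx: Int)    => KeyByIndex(idx)
    case Some(col: String) => KeyByName(col)
    case Some(other) =>
      throw new IllegalArgumentException(s"unsupported generated-key spec: $other")
  }
}
```

The helpers returnColumnByIndex and returnColumnByName in the companion object produce exactly the Some(Int) and Some(String) shapes that such a match would accept.
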
On reflection, splitting JDBC reads and writes into two separate functions makes the engine easier to use, because that matches common programming patterns and habits. So SQL with sqlType = SQL_SELECT is best run by a function of its own:

    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
        ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
        implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
      ctx.sqlType match {
        case SQL_SELECT => {
          // only the first parameter list is used, matching statements.head below
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case p@_ => p.head
          }
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
      }
    }

We also need a noExtractor function to satisfy the parameter list of SQLToCollectionImpl:

    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }

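
The C[_] type parameter and the implicit CanBuildFrom in jdbcQueryResult are what let the caller choose the result collection (Vector, List and so on). The following is a minimal sketch of that mechanism on its own, assuming Scala 2.12 as in this project's build.sbt (CanBuildFrom was removed in Scala 2.13). It uses plain Maps as stand-in rows instead of a real WrappedResultSet; CollectSketch, Row, collectRows and the sample rows are made up for illustration.

```scala
import scala.collection.generic.CanBuildFrom
import scala.language.higherKinds

// Hypothetical stand-in for the collection-building step of jdbcQueryResult.
// Requires Scala 2.12: CanBuildFrom no longer exists in Scala 2.13.
object CollectSketch {
  type Row = Map[String, Any]

  // Drains the rows into whichever collection type C the caller names.
  def collectRows[C[_], A](rows: Iterator[Row], convert: Row => A)(
      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    val builder = cbf() // a fresh Builder[A, C[A]] for the requested collection
    rows.foreach(row => builder += convert(row))
    builder.result()
  }

  // Made-up sample data standing in for a query result.
  val rows: List[Row] = List(
    Map("id" -> 1L, "name" -> "Alice"),
    Map("id" -> 2L, "name" -> "Bob"))

  val asVector: Vector[String] =
    collectRows[Vector, String](rows.iterator, r => r("name").toString)

  val asList: List[(Long, String)] =
    collectRows[List, (Long, String)](
      rows.iterator, r => (r("id").asInstanceOf[Long], r("name").toString))
}
```

The compiler finds the CanBuildFrom instance in the companion object of the requested collection, so naming a different collection at the call site is the only change needed to get a different result type.
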
Let's try out jdbcQueryResult:

    import scalikejdbc._
    import JDBCEngine._
    import configdbs._
    import org.joda.time._

    object JDBCQueryDemo extends App {
      ConfigDBsWithEnv("dev").setupAll()

      val ctx = JDBCContext(
        dbName = 'h2,
        statements = Seq("select * from members where id = ?"),
        parameters = Seq(Seq(2))
      )

      //data model
      case class Member(
        id: Long,
        name: String,
        description: Option[String] = None,
        birthday: Option[LocalDate] = None,
        createdAt: DateTime)

      //data row converter
      val toMember = (rs: WrappedResultSet) => Member(
        id = rs.long("id"),
        name = rs.string("name"),
        description = rs.stringOpt("description"),
        birthday = rs.jodaLocalDateOpt("birthday"),
        createdAt = rs.jodaDateTime("created_at")
      )

      val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)
      println(s"members in vector: $vecMember")

      val ctx1 = ctx.copy(dbName = 'mysql)
      val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})
      println(s"selected name: $names")

      val ctx2 = ctx1.copy(dbName = 'postgres)
      val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})
      println(s"selected id+name: $idname")
    }

If we do our database programming with the Slick DSL, this is how it can be connected to the JDBC-Engine:

    object SlickDAO {
      import slick.jdbc.H2Profile.api._

      case class CountyModel(id: Int, name: String)
      case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
        def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
        def name = column[String]("NAME",O.Length(64))
        def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
      }
      val CountyQuery = TableQuery[CountyTable]
      val filter = "Kansas"
      val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
      val statement = qry.result.statements.head
    }
    import SlickDAO._

    val slickCtx = JDBCContext(
      dbName = 'h2,
      statements = Seq(statement)
    )

    val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
      rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
    vecCounty.foreach(r => println(s"${r.id},${r.name}"))

The output is correct.

Below is the source code for this demonstration:

 build.sbt

    name := "learn-scalikeJDBC"

    version := "0.1"

    scalaVersion := "2.12.4"

    // Scala 2.10, 2.11, 2.12
    libraryDependencies ++= Seq(
      "org.scalikejdbc" %% "scalikejdbc"        % "3.1.0",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config" % "3.1.0",
      "com.h2database"  %  "h2"                 % "1.4.196",
      "mysql" % "mysql-connector-java" % "6.0.6",
      "org.postgresql" % "postgresql" % "42.2.0",
      "commons-dbcp" % "commons-dbcp" % "1.4",
      "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.2.1",
      "ch.qos.logback"  %  "logback-classic"    % "1.2.3"
    )

resources/application.conf, covering H2, MySQL and PostgreSQL

    # JDBC settings
    test {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "commons-dbcp2"
        }
      }

      db.mysql.driver = "com.mysql.cj.jdbc.Driver"
      db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
      db.mysql.user = "root"
      db.mysql.password = "123"
      db.mysql.poolInitialSize = 5
      db.mysql.poolMaxSize = 7
      db.mysql.poolConnectionTimeoutMillis = 1000
      db.mysql.poolValidationQuery = "select 1 as one"
      db.mysql.poolFactoryName = "bonecp"

      # scalikejdbc Global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }
    dev {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
        mysql {
          driver = "com.mysql.cj.jdbc.Driver"
          url = "jdbc:mysql://localhost:3306/testdb"
          user = "root"
          password = "123"
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "bonecp"
        }
        postgres {
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/testdb"
          user = "root"
          password = "123"
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
      }

      # scalikejdbc Global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }

HikariConfig.scala: HikariCP connection pool implementation

    package configdbs

    import scala.collection.mutable
    import scala.concurrent.duration.Duration
    import scala.language.implicitConversions
    import com.typesafe.config._
    import java.util.concurrent.TimeUnit
    import java.util.Properties
    import scalikejdbc.config._
    import com.typesafe.config.Config
    import com.zaxxer.hikari._
    import scalikejdbc.ConnectionPoolFactoryRepository

    /** Extension methods to make Typesafe Config easier to use */
    class ConfigExtensionMethods(val c: Config) extends AnyVal {
      import scala.collection.JavaConverters._

      def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
      def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
      def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
      def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

      def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
      def getDurationOr(path: String, default: => Duration = Duration.Zero) =
        if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

      def getPropertiesOr(path: String, default: => Properties = null): Properties =
        if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

      def toProperties: Properties = {
        def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
          val props = new Properties(null)
          m.foreach { case (k, cv) =>
            val v =
              if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
              else if(cv.unwrapped eq null) null
              else cv.unwrapped.toString
            if(v ne null) props.put(k, v)
          }
          props
        }
        toProps(c.root.asScala)
      }

      def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
      def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
      def getStringOpt(path: String) = Option(getStringOr(path))
      def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
    }

    object ConfigExtensionMethods {
      @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
    }

    trait HikariConfigReader extends TypesafeConfigReader {
      self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>

      import ConfigExtensionMethods.configExtensionMethods

      def getFactoryName(dbName: Symbol): String = {
        val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
        c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
      }

      def hikariCPConfig(dbName: Symbol): HikariConfig = {
        val hconf = new HikariConfig()
        val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

        // Connection settings
        if (c.hasPath("dataSourceClass")) {
          hconf.setDataSourceClassName(c.getString("dataSourceClass"))
        } else {
          Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
        }
        hconf.setJdbcUrl(c.getStringOr("url", null))
        c.getStringOpt("user").foreach(hconf.setUsername)
        c.getStringOpt("password").foreach(hconf.setPassword)
        c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

        // Pool configuration
        hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
        hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
        hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
        hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
        hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
        hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
        c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
        c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
        val numThreads = c.getIntOr("numThreads", 20)
        hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
        hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
        hconf.setPoolName(c.getStringOr("poolName", dbName.name))
        hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

        // Equivalent of ConnectionPreparer
        hconf.setReadOnly(c.getBooleanOr("readOnly", false))
        c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
        hconf.setCatalog(c.getStringOr("catalog", null))

        hconf
      }
    }

    import scalikejdbc._

    trait ConfigDBs {
      self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

      def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        getFactoryName(dbName) match {
          case "hikaricp" => {
            val hconf = hikariCPConfig(dbName)
            val hikariCPSource = new HikariDataSource(hconf)
            if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
              Class.forName(hconf.getDriverClassName)
            }
            ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
          }
          case _ => {
            val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
            val cpSettings = readConnectionPoolSettings(dbName)
            if (driver != null && driver.trim.nonEmpty) {
              Class.forName(driver)
            }
            ConnectionPool.add(dbName, url, user, password, cpSettings)
          }
        }
      }

      def setupAll(): Unit = {
        loadGlobalSettings()
        dbNames.foreach { dbName => setup(Symbol(dbName)) }
      }

      def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        ConnectionPool.close(dbName)
      }

      def closeAll(): Unit = {
        ConnectionPool.closeAll
      }
    }

    object ConfigDBs extends ConfigDBs
      with TypesafeConfigReader
      with StandardTypesafeConfig
      with HikariConfigReader

    case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
      with TypesafeConfigReader
      with StandardTypesafeConfig
      with HikariConfigReader
      with EnvPrefix {

      override val env = Option(envValue)
    }
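
hikariCPConfig derives the pool size from three optional keys: numThreads defaults to 20, maxConnections defaults to numThreads * 5 and minConnections defaults to numThreads. The sketch below replays only that defaulting rule, using java.util.Properties in place of Typesafe Config so that it runs without extra libraries. PoolDefaultsSketch, PropsOps, props and poolSizes are hypothetical names for illustration, not part of HikariConfig.scala.

```scala
import java.util.Properties
import scala.language.implicitConversions

// Hypothetical sketch of the "get or default" extension-method pattern and the
// pool sizing defaults; java.util.Properties stands in for Typesafe Config.
object PoolDefaultsSketch {
  // Value-class wrapper; the default is by-name, so it is evaluated only
  // when the key is missing.
  class PropsOps(val p: Properties) extends AnyVal {
    def getIntOr(key: String, default: => Int): Int =
      Option(p.getProperty(key)).map(_.trim.toInt).getOrElse(default)
  }
  implicit def propsOps(p: Properties): PropsOps = new PropsOps(p)

  // Small helper to build a Properties value from key/value pairs.
  def props(kv: (String, String)*): Properties = {
    val p = new Properties()
    kv.foreach { case (k, v) => p.setProperty(k, v) }
    p
  }

  // Returns (maximumPoolSize, minimumIdle) using the same defaults as hikariCPConfig.
  def poolSizes(p: Properties): (Int, Int) = {
    val numThreads = p.getIntOr("numThreads", 20)
    (p.getIntOr("maxConnections", numThreads * 5), p.getIntOr("minConnections", numThreads))
  }
}
```

With the dev h2 settings from application.conf (numThreads = 10, maxConnections = 12, minConnections = 4) poolSizes yields (12, 4). If only numThreads = 10 were set it would yield (50, 10), which is why the explicit limits in the config matter.
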

JDBCEngine.scala: implementation of the jdbcQueryResult function

    import java.sql.PreparedStatement
    import scala.collection.generic.CanBuildFrom
    import scalikejdbc._

    object JDBCContext {
      type SQLTYPE = Int
      val SQL_SELECT: Int = 0
      val SQL_EXECUTE = 1
      val SQL_UPDATE = 2
      def returnColumnByIndex(idx: Int) = Some(idx)
      def returnColumnByName(col: String) = Some(col)
    }

    case class JDBCContext(
      dbName: Symbol,
      statements: Seq[String],
      parameters: Seq[Seq[Any]] = Nil,
      fetchSize: Int = 100,
      queryTimeout: Option[Int] = None,
      queryTags: Seq[String] = Nil,
      sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
      batch: Boolean = false,
      returnGeneratedKey: Option[Any] = None, // no return: None, return by index: Some(1), by name: Some("id")
      preAction: Option[PreparedStatement => Unit] = None,
      postAction: Option[PreparedStatement => Unit] = None)

    object JDBCEngine {
      import JDBCContext._

      private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
        throw new IllegalStateException(message)
      }

      def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
          ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
        ctx.sqlType match {
          case SQL_SELECT => {
            val params: Seq[Any] = ctx.parameters match {
              case Nil => Nil
              case p@_ => p.head
            }
            val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("boom!"))
            ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
            ctx.queryTags.foreach(rawSql.tags(_))
            rawSql.fetchSize(ctx.fetchSize)
            implicit val session = NamedAutoSession(ctx.dbName)
            val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
            sql.collection.apply[C]()
          }
          case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
        }
      }
    }

JDBCQueryDemo.scala: functional test code

    import scalikejdbc._
    import JDBCEngine._
    import configdbs._
    import org.joda.time._

    object JDBCQueryDemo extends App {
      ConfigDBsWithEnv("dev").setupAll()

      val ctx = JDBCContext(
        dbName = 'h2,
        statements = Seq("select * from members where id = ?"),
        parameters = Seq(Seq(2))
      )

      //data model
      case class Member(
        id: Long,
        name: String,
        description: Option[String] = None,
        birthday: Option[LocalDate] = None,
        createdAt: DateTime)

      //data row converter
      val toMember = (rs: WrappedResultSet) => Member(
        id = rs.long("id"),
        name = rs.string("name"),
        description = rs.stringOpt("description"),
        birthday = rs.jodaLocalDateOpt("birthday"),
        createdAt = rs.jodaDateTime("created_at")
      )

      val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)
      println(s"members in vector: $vecMember")

      val ctx1 = ctx.copy(dbName = 'mysql)
      val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})
      println(s"selected name: $names")

      val ctx2 = ctx1.copy(dbName = 'postgres)
      val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})
      println(s"selected id+name: $idname")

      object SlickDAO {
        import slick.jdbc.H2Profile.api._

        case class CountyModel(id: Int, name: String)
        case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
          def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
          def name = column[String]("NAME",O.Length(64))
          def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
        }
        val CountyQuery = TableQuery[CountyTable]
        val filter = "Kansas"
        val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
        val statement = qry.result.statements.head
      }
      import SlickDAO._

      val slickCtx = JDBCContext(
        dbName = 'h2,
        statements = Seq(statement)
      )

      val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
        rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
      vecCounty.foreach(r => println(s"${r.id},${r.name}"))
    }