SDP(5): ScalikeJDBC - JDBC-Engine: Streaming

  For a general-purpose database programming engine, using streaming to process huge volumes of data is a must-have capability. As before, we convey the request to produce a stream through a Context. Because this streaming context is fairly simple, and it also involves passing along the data-extraction function extractor, we define it separately:

case class JDBCQueryContext[M](
  dbName: Symbol,
  statement: String,
  parameters: Seq[Any] = Nil,
  fetchSize: Int = 100,
  autoCommit: Boolean = false,
  queryTimeout: Option[Int] = None,
  extractor: WrappedResultSet => M)
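
The extractor field is just an ordinary function from WrappedResultSet to the row type, so any case class can serve as the row model. Here is a minimal sketch against a hypothetical PERSON table (the table and column names are assumptions for illustration only):

case class Person(id: Long, name: String)

// map one JDBC result row to a Person; column names are assumed
val personExtractor: WrappedResultSet => Person =
  rs => Person(rs.long("ID"), rs.string("NAME"))

val personCtx = JDBCQueryContext[Person](
  dbName = 'h2,
  statement = "select ID, NAME from PERSON where ID > ?",
  parameters = Seq(100L),
  extractor = personExtractor
)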

Since the JDBCQueryContext is handed over to the JDBC-Engine for execution, all parameters of the streaming function must be defined explicitly, including the extractor function. In fact, JDBCQueryContext also fully covers the needs of the jdbcQueryResult function, which we will redesign later. The jdbcAkkaStream function produces an akka-stream Source, as follows:

def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])(
  implicit ec: ExecutionContextExecutor): Source[A, NotUsed] = {
  // pick the target database from the context
  val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
    sql.iterator
      .withDBSessionForceAdjuster(session => {
        session.connection.setAutoCommit(ctx.autoCommit)
        session.fetchSize(ctx.fetchSize)
      })
  }
  Source.fromPublisher[A](publisher)
}

  We only need to supply a Sink to put this akka-stream to use:

import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._

object JDBCStreaming extends App {

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  case class DataRow(year: String, state: String, county: String, value: String)

  // data row converter
  val toRow = (rs: WrappedResultSet) => DataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  // construct the context
  val ctx = JDBCQueryContext[DataRow](
    dbName = 'h2,
    statement = "select * from AIRQM",
    extractor = toRow
  )

  // pass context to construct akka-source
  val akkaSource = jdbcAkkaStream(ctx)

  // a sink for displaying rows
  val snk = Sink.foreach[(DataRow, Long)] { case (row, idx) =>
    println(s"rec#: $idx - year: ${row.year} location: ${row.state},${row.county} value: ${row.value}")
  }

  // the stream can be terminated manually via kill.shutdown
  val kill: UniqueKillSwitch = akkaSource.zipWithIndex
    .viaMat(KillSwitches.single)(Keep.right)
    .to(snk)
    .run

  scala.io.StdIn.readLine()
  kill.shutdown()
  actorSys.terminate()
  println("+++++++++++++++")
}
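
Because the underlying DatabasePublisher honours Reactive Streams backpressure, the downstream consumption rate directly controls how fast rows are fetched from the database. As a minimal standalone sketch (reusing akkaSource and the implicit materializer from above; the throttle rate is an arbitrary value chosen for illustration):

import scala.concurrent.duration._

// consume at most 10 rows per second; upstream row fetching slows down accordingly
val done = akkaSource
  .throttle(10, 1.second, 10, ThrottleMode.shaping)
  .runWith(Sink.foreach(row => println(row)))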

A test run produces the expected results. Below is the new version of the jdbcQueryResult function:

def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A])(
  implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
  val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
  ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
  rawSql.fetchSize(ctx.fetchSize)
  implicit val session = NamedAutoSession(ctx.dbName)
  val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
  sql.collection.apply[C]()
}

Test run:

object SlickDAO {
  import slick.jdbc.H2Profile.api._

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag, "COUNTY") {
    def id = column[Int]("ID", O.AutoInc, O.PrimaryKey)
    def name = column[String]("NAME", O.Length(64))
    def * = (id, name) <> (CountyModel.tupled, CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]
  val filter = "Kansas"
  val qry = CountyQuery.filter { _.name.toUpperCase like s"%${filter.toUpperCase}%" }
  val statement = qry.result.statements.head
}
import SlickDAO._

def toRow: WrappedResultSet => CountyModel =
  rs => CountyModel(id = rs.int("id"), name = rs.string("name"))

// construct the context
val slickCtx = JDBCQueryContext[CountyModel](
  dbName = 'h2,
  statement = "select * from county where id > ? and id < ?",
  parameters = Seq(6000, 6200),
  extractor = toRow
)

val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector, CountyModel](slickCtx)
vecCounty.foreach(r => println(s"${r.id},${r.name}"))

Below is the complete sample source code for this discussion:

build.sbt

// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
  "org.scalikejdbc"    %% "scalikejdbc"           % "3.2.1",
  "org.scalikejdbc"    %% "scalikejdbc-test"      % "3.2.1" % "test",
  "org.scalikejdbc"    %% "scalikejdbc-config"    % "3.2.1",
  "org.scalikejdbc"    %% "scalikejdbc-streams"   % "3.2.1",
  "org.scalikejdbc"    %% "scalikejdbc-joda-time" % "3.2.1",
  "com.h2database"     %  "h2"                    % "1.4.196",
  "mysql"              %  "mysql-connector-java"  % "6.0.6",
  "org.postgresql"     %  "postgresql"            % "42.2.0",
  "commons-dbcp"       %  "commons-dbcp"          % "1.4",
  "org.apache.tomcat"  %  "tomcat-jdbc"           % "9.0.2",
  "com.zaxxer"         %  "HikariCP"              % "2.7.4",
  "com.jolbox"         %  "bonecp"                % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick"                 % "3.2.1",
  "ch.qos.logback"     %  "logback-classic"       % "1.2.3",
  "com.typesafe.akka"  %% "akka-actor"            % "2.5.4",
  "com.typesafe.akka"  %% "akka-stream"           % "2.5.4"
)

resources/application.conf

# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scalikejdbc global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"
    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }

  # scalikejdbc global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

JDBCEngine.scala

package jdbccontext

import java.sql.PreparedStatement

import scala.collection.generic.CanBuildFrom
import scala.concurrent.ExecutionContextExecutor
import scala.util._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scalikejdbc._
import scalikejdbc.streams._
import scalikejdbc.TxBoundary.Try._

object JDBCContext {
  type SQLTYPE = Int
  val SQL_SELECT: Int = 0
  val SQL_EXEDDL = 1
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false
}

case class JDBCQueryContext[M](
  dbName: Symbol,
  statement: String,
  parameters: Seq[Any] = Nil,
  fetchSize: Int = 100,
  autoCommit: Boolean = false,
  queryTimeout: Option[Int] = None,
  extractor: WrappedResultSet => M)

case class JDBCContext(
  dbName: Symbol,
  statements: Seq[String] = Nil,
  parameters: Seq[Seq[Any]] = Nil,
  fetchSize: Int = 100,
  queryTimeout: Option[Int] = None,
  queryTags: Seq[String] = Nil,
  sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
  batch: Boolean = false,
  returnGeneratedKey: Seq[Option[Any]] = Nil,
  // no return: None, return by index: Some(1), by name: Some("id")
  preAction: Option[PreparedStatement => Unit] = None,
  postAction: Option[PreparedStatement => Unit] = None) { ctx =>

  // helper functions

  def appendTag(tag: String): JDBCContext =
    ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext =
    ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext =
    ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext =
    ctx.copy(queryTimeout = time)

  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(preAction = action)
    else
      throw new IllegalStateException("JDBCContext setting error: preAction not supported!")
  }

  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(postAction = action)
    else
      throw new IllegalStateException("JDBCContext setting error: postAction not supported!")
  }

  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(Seq(_parameters))
      )
    } else
      throw new IllegalStateException("JDBCContext setting error: option not supported!")
  }

  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
    } else
      throw new IllegalStateException("JDBCContext setting error: option not supported!")
  }

  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContext setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
    var matchParams = true
    if (ctx.parameters != Nil)
      if (ctx.parameters.head.size != _parameters.size)
        matchParams = false
    if (matchParams) {
      ctx.copy(
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    } else
      throw new IllegalStateException("JDBCContext setting error: batch command parameters do not match!")
  }

  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContext setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
  }

  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
  }

  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
  }
}

object JDBCEngine {

  import JDBCContext._

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])(
    implicit ec: ExecutionContextExecutor): Source[A, NotUsed] = {
    // pick the target database from the context
    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
      sql.iterator
        .withDBSessionForceAdjuster(session => {
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    Source.fromPublisher[A](publisher)
  }

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A])(
    implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    implicit val session = NamedAutoSession(ctx.dbName)
    val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
    sql.collection.apply[C]()
  }

  def jdbcExecuteDDL(ctx: JDBCContext): Try[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Failure(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_EXEDDL'!"))
    } else {
      NamedDB(ctx.dbName) localTx { implicit session =>
        Try {
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = (_: PreparedStatement) => {})(
              after = (_: PreparedStatement) => {})
            ddl.apply()
          }
          "SQL_EXEDDL executed successfully."
        }
      }
    }
  }

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContext setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
    } else {
      if (ctx.batch) {
        if (noReturnKey(ctx)) {
          val usql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[Seq]()
              Seq.empty[Long].to[C]
            }
          }
        } else {
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[C]()
            }
          }
        }
      } else {
        Failure(new IllegalStateException("JDBCContext setting error: must set batch = true !"))
      }
    }
  }

  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val Some(key) :: xs = ctx.returnGeneratedKey
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result).to[C]
      }
    }
  }

  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p @ _ => p.head
    }
    val before = ctx.preAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
    val after = ctx.postAction match {
      case None => (pstm: PreparedStatement) => {}
      case Some(f) => f
    }
    val usql = new SQLUpdate(ctx.statements.head, params, ctx.queryTags)(before)(after)
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }
  }

  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (noReturnKey(ctx))
      singleTxUpdateNoReturnKey(ctx)
    else
      singleTxUpdateWithReturnKey(ctx)
  }

  private def noReturnKey(ctx: JDBCContext): Boolean = {
    if (ctx.returnGeneratedKey != Nil) {
      val k :: xs = ctx.returnGeneratedKey
      k match {
        case None => true
        case Some(k) => false
      }
    } else true
  }

  def noAction: PreparedStatement => Unit = pstm => {}

  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
          case Nil => Seq.fill(ctx.statements.size)(None)
          case k @ _ => k
        }
        val sqlcmd = ctx.statements zip ctx.parameters zip keys
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None =>
              new SQLUpdate(stm, param, Nil)(noAction)(noAction).apply().toLong
            case Some(k) =>
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }

  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContext setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
    } else {
      if (!ctx.batch) {
        if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      } else
        Failure(new IllegalStateException("JDBCContext setting error: must set batch = false !"))
    }
  }
}
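
The demo program below only exercises the query side of the engine. For completeness, here is a hypothetical sketch of how the update side would be driven through JDBCContext and jdbcTxUpdates; the MEMBER table and its columns are assumptions for illustration only:

import scala.util.Try

// a single-statement update inside a local transaction, no generated key requested
val updCtx = JDBCContext(dbName = 'h2)
  .setUpdateCommand(false, "update MEMBER set NAME = ? where ID = ?", "peter", 1)

// Try wraps any SQL failure; on success the updated row count comes back in a Vector
val updResult: Try[Vector[Long]] = jdbcTxUpdates[Vector](updCtx)
updResult.foreach(counts => println(s"updated rows: ${counts.head}"))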

JDBCQueryDemo.scala

import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._

object JDBCStreaming extends App {

  implicit val actorSys = ActorSystem("actor-system")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer()

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  case class DataRow(year: String, state: String, county: String, value: String)

  // data row converter
  val toRow = (rs: WrappedResultSet) => DataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  // construct the context
  val ctx = JDBCQueryContext[DataRow](
    dbName = 'h2,
    statement = "select * from AIRQM",
    extractor = toRow
  )

  // pass context to construct akka-source
  val akkaSource = jdbcAkkaStream(ctx)

  // a sink for displaying rows
  val snk = Sink.foreach[(DataRow, Long)] { case (row, idx) =>
    println(s"rec#: $idx - year: ${row.year} location: ${row.state},${row.county} value: ${row.value}")
  }

  // the stream can be terminated manually via kill.shutdown
  val kill: UniqueKillSwitch = akkaSource.zipWithIndex
    .viaMat(KillSwitches.single)(Keep.right)
    .to(snk)
    .run

  scala.io.StdIn.readLine()
  kill.shutdown()
  actorSys.terminate()
  println("+++++++++++++++")

  object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag, "COUNTY") {
      def id = column[Int]("ID", O.AutoInc, O.PrimaryKey)
      def name = column[String]("NAME", O.Length(64))
      def * = (id, name) <> (CountyModel.tupled, CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter { _.name.toUpperCase like s"%${filter.toUpperCase}%" }
    val statement = qry.result.statements.head
  }
  import SlickDAO._

  def toCounty: WrappedResultSet => CountyModel =
    rs => CountyModel(id = rs.int("id"), name = rs.string("name"))

  // construct the context
  val slickCtx = JDBCQueryContext[CountyModel](
    dbName = 'h2,
    statement = "select * from county where id > ? and id < ?",
    parameters = Seq(6000, 6200),
    extractor = toCounty
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector, CountyModel](slickCtx)
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))
}