Akka's Alpakka toolkit provides streaming support for the Cassandra database. In short, a CQL statement is used to read Cassandra data and produce an akka-stream Source, a stream that supports the reactive-streams protocol:
object CassandraSource {

  /**
   * Scala API: creates a [[CassandraSourceStage]] from a given statement.
   */
  def apply(stmt: Statement)(implicit session: Session): Source[Row, NotUsed] =
    Source.fromGraph(new CassandraSourceStage(Future.successful(stmt), session))

  /**
   * Scala API: creates a [[CassandraSourceStage]] from the result of a given Future.
   */
  def fromFuture(futStmt: Future[Statement])(implicit session: Session): Source[Row, NotUsed] =
    Source.fromGraph(new CassandraSourceStage(futStmt, session))
}
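As a minimal sketch of the idea (assuming a Cassandra node on localhost, as in the full demo below, and a hypothetical table testdb.users), a plain Statement can be handed to CassandraSource and streamed like any other akka-stream Source:

import com.datastax.driver.core.{Cluster, Row, Session, SimpleStatement}
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource

val cluster = Cluster.builder().addContactPoint("localhost").build()
implicit val session: Session = cluster.connect()

// hypothetical table testdb.users, used only for illustration
val stmt = new SimpleStatement("select * from testdb.users").setFetchSize(50)
val userRows: Source[Row, NotUsed] = CassandraSource(stmt)
// userRows.runForeach(r => println(r.getString("name")))  // needs an implicit Materializer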
CassandraSource.apply builds a Source[Row, NotUsed], which can be plugged directly into a Flow[Row, Row, NotUsed] and a Sink. We build this Source from a CQLQueryContext:
def cassandraStream[A](ctx: CQLQueryContext[A])(
    implicit session: Session, ec: ExecutionContextExecutor): Source[A, NotUsed] = {

  val prepStmt = session.prepare(ctx.statement)
  var boundStmt = prepStmt.bind()
  val params = processParameters(ctx.parameter)
  boundStmt = prepStmt.bind(params: _*)
  ctx.consistency.foreach { consistency =>
    boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  }

  CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
}
CQLQueryContext[A] was introduced in the previous installment:
case class CQLQueryContext[M](
    statement: String,
    extractor: Row => M,
    parameter: Seq[Object] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
    fetchSize: Int = 100
  ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =
    ctx.copy(consistency = Some(_consistency))
  def setFetchSize(pageSize: Int): CQLQueryContext[M] =
    ctx.copy(fetchSize = pageSize)
}
object CQLQueryContext {
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
    new CQLQueryContext[M](statement = stmt, extractor = converter)
}
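As a quick sketch (with a made-up single-column extractor), the consistency level and fetch size can then be adjusted fluently after construction:

// hypothetical extractor: pick one column out of each Row
val stateName: Row => String = _.getString("STATENAME")

val qryCtx = CQLQueryContext("select STATENAME from testdb.aqmrpt", stateName)
  .setConsistencyLevel(CQLContext.LOCAL_QUORUM)
  .setFetchSize(200)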
Below is a demonstration of constructing and using such a stream:
case class Model(
    rowid: Long,
    measureid: Long,
    state: String,
    county: String,
    year: Int,
    value: Int,
    createdAt: java.util.Date
)

//data row converter
val toModel = (rs: Row) => Model(
  rowid = rs.getLong("ROWID"),
  measureid = rs.getLong("MEASUREID"),
  state = rs.getString("STATENAME"),
  county = rs.getString("COUNTYNAME"),
  year = rs.getInt("REPORTYEAR"),
  value = rs.getInt("VALUE"),
  createdAt = rs.getTimestamp("CREATED")
)

//setup context
val ctx = CQLQueryContext("select * from testdb.aqmrpt", toModel)

//construct source
val src = cassandraStream(ctx)

//a display sink
val snk = Sink.foreach[Model] { r =>
  println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}

//run on source
src.to(snk).run()
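Note that src.to(snk).run() materializes to NotUsed, so we get no signal when the stream finishes. If completion matters, a small variation (assuming the implicit ActorSystem, Materializer and dispatcher set up in the full demo source below) is to keep the Future[Done] that Sink.foreach materializes:

import akka.Done
import scala.concurrent.Future
import scala.util.{Failure, Success}

val done: Future[Done] = src.runWith(snk)   // Sink.foreach materializes a Future[Done]
done.onComplete {
  case Success(_) => println("cassandra stream completed")
  case Failure(e) => println(s"cassandra stream failed: ${e.getMessage}")
}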
Besides building a stream source by reading data, we can also use the stream elements as input for database update operations, since each element can be turned into an asynchronous execute call:
case class CassandraActionStream[R](
    parallelism: Int = 1,
    processInOrder: Boolean = true,
    statement: String,
    prepareParams: R => Seq[Object],
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None) { cas =>

  def setParallelism(parLevel: Int): CassandraActionStream[R] =
    cas.copy(parallelism = parLevel)
  def setProcessOrder(ordered: Boolean): CassandraActionStream[R] =
    cas.copy(processInOrder = ordered)
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
    cas.copy(consistency = Some(_consistency))

  def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
    val prepStmt = session.prepare(statement)
    var boundStmt = prepStmt.bind()
    val params = processParameters(prepareParams(r))
    boundStmt = prepStmt.bind(params: _*)
    consistency.foreach { cons =>
      boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
    }
    session.executeAsync(boundStmt).map(_ => r)
  }

  def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
    if (processInOrder)
      Flow[R].mapAsync(parallelism)(perform)
    else
      Flow[R].mapAsyncUnordered(parallelism)(perform)
}
object CassandraActionStream {
  def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
    new CassandraActionStream[R](statement = _statement, prepareParams = params)
}
CassandraActionStream is constructed from a statement and a params function. Its performOnRow method is a Flow[R, R, NotUsed] that binds each R into a CQL statement and runs it with executeAsync through mapAsync, giving a batch-like processing effect. Below is a demonstration of using CassandraActionStream:
//pass context to construct akka-source
val jdbcSource = jdbcAkkaStream(ctx)

val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," +
  "COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)"

val toPparams: DataRow => Seq[Object] = r => {
  Seq[Object](
    r.rowid.asInstanceOf[Object],
    r.measureid.asInstanceOf[Object],
    r.state,
    r.county,
    r.year.asInstanceOf[Object],
    r.value.asInstanceOf[Object],
    CQLDateTimeNow
  )
}

val actionStream = CassandraActionStream(cqlInsert, toPparams)
  .setParallelism(2)
  .setProcessOrder(false)
val actionFlow: Flow[DataRow, DataRow, NotUsed] = actionStream.performOnRow

val sink = Sink.foreach[DataRow] { r =>
  println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}

val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()
In the following example we use the elements of the Cassandra stream to update data in an H2 database, by calling JDBCActionStream:
//data row converter
val toModel = (rs: Row) => Model(
  rowid = rs.getLong("ROWID"),
  measureid = rs.getLong("MEASUREID"),
  state = rs.getString("STATENAME"),
  county = rs.getString("COUNTYNAME"),
  year = rs.getInt("REPORTYEAR"),
  value = rs.getInt("VALUE"),
  createdAt = rs.getTimestamp("CREATED")
)

//setup context
val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt", toModel)

//construct source
val src = cassandraStream(cqlCtx)

//a display sink
val snk = Sink.foreach[Model] { r =>
  println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}

//run on source
src.to(snk).run()

val params: Model => Seq[Any] = row => {
  Seq((row.value * 10), row.rowid)
}

val jdbcActionStream = JDBCActionStream('h2,
    "update AQMRPT set total = ? where rowid = ?", params)
  .setParallelism(2)
  .setProcessOrder(false)
val jdbcActionFlow = jdbcActionStream.performOnRow

//update rows in h2 database from data in cassandra database
src.via(jdbcActionFlow).to(snk).run()
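One caveat: JDBCActionStream.perform (see JDBCEngine.scala below) executes the JDBC update synchronously on the stream's thread and only then returns an already-completed Future, so mapAsync gains little real concurrency here. A possible variant, sketched as a standalone helper under the assumption that deferring the blocking call to a Future is acceptable, would run the update off the stream's thread:

import scala.concurrent.{ExecutionContext, Future, blocking}
import scalikejdbc._

// hypothetical async variant of JDBCActionStream.perform
def performAsync[R](dbName: Symbol, statement: String, prepareParams: R => Seq[Any])(r: R)(
    implicit ec: ExecutionContext): Future[R] =
  Future {
    blocking {
      val params = prepareParams(r)
      NamedDB(dbName) autoCommit { session =>
        session.execute(statement, params: _*)
      }
      r
    }
  }

// usage sketch: Flow[Model].mapAsync(2)(performAsync('h2, "update AQMRPT set total = ? where rowid = ?", params))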
The complete source code for this demonstration follows:
build.sbt
name := "learn_cassandra"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
  "com.typesafe.akka" %% "akka-actor" % "2.5.4",
  "com.typesafe.akka" %% "akka-stream" % "2.5.4",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
  "org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
  "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  "com.h2database" % "h2" % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback" % "logback-classic" % "1.2.3")
resources/application.conf
# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }
  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"
    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }

  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
CassandraEngine.scala
import com.datastax.driver.core._ import scala.concurrent._ import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} import scala.collection.JavaConverters._ import scala.collection.generic.CanBuildFrom import scala.concurrent.duration.Duration import akka.NotUsed import akka.stream.alpakka.cassandra.scaladsl._ import akka.stream.scaladsl._ object CQLContext { // Consistency Levels
type CONSISTENCY_LEVEL = Int val ANY: CONSISTENCY_LEVEL = 0x0000 val ONE: CONSISTENCY_LEVEL = 0x0001 val TWO: CONSISTENCY_LEVEL = 0x0002 val THREE: CONSISTENCY_LEVEL = 0x0003 val QUORUM : CONSISTENCY_LEVEL = 0x0004 val ALL: CONSISTENCY_LEVEL = 0x0005 val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006 val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007 val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B val SERIAL: CONSISTENCY_LEVEL = 0x000C def apply(): CQLContext = CQLContext(statements = Nil) def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => { consistency match { case ALL => ConsistencyLevel.ALL case ONE => ConsistencyLevel.ONE case TWO => ConsistencyLevel.TWO case THREE => ConsistencyLevel.THREE case ANY => ConsistencyLevel.ANY case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE case QUORUM => ConsistencyLevel.QUORUM case SERIAL => ConsistencyLevel.SERIAL case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL } } } case class CQLQueryContext[M]( statement: String, extractor: Row => M, parameter: Seq[Object] = Nil, consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, fetchSize: Int = 100 ) { ctx => def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = ctx.copy(consistency = Some(_consistency)) def setFetchSize(pageSize: Int): CQLQueryContext[M] = ctx.copy(fetchSize = pageSize) } object CQLQueryContext { def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
new CQLQueryContext[M](statement = stmt, extractor = converter) } case class CQLContext( statements: Seq[String], parameters: Seq[Seq[Object]] = Nil, consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None ) { ctx => def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = ctx.copy(consistency = Some(_consistency)) def setCommand(_statement: String, _parameters: Object*): CQLContext = ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters)) def appendCommand(_statement: String, _parameters: Object*): CQLContext = ctx.copy(statements = ctx.statements :+ _statement, parameters = ctx.parameters ++ Seq(_parameters)) } object CQLEngine { import CQLContext._ import CQLHelpers._ def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)( implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { val prepStmt = session.prepare(ctx.statement) var boundStmt = prepStmt.bind() if (ctx.parameter != Nil) { val params = processParameters(ctx.parameter) boundStmt = prepStmt.bind(params:_*) } ctx.consistency.foreach {consistency => boundStmt.setConsistencyLevel(consistencyLevel(consistency))} val resultSet = session.execute(boundStmt.setFetchSize(pageSize)) (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C]) } def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)( extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
if (resultSet.isFullyFetched) { (resultSet, None) } else { try { val result = Await.result(resultSet.fetchMoreResults(), timeOut) (result, Some((result.asScala.view.map(extractor)).to[C])) } catch { case e: Throwable => (resultSet, None) } } def cqlExecute(ctx: CQLContext)( implicit session: Session, ec: ExecutionContext): Future[Boolean] = { if (ctx.statements.size == 1) cqlSingleUpdate(ctx) else cqlMultiUpdate(ctx) } def cqlSingleUpdate(ctx: CQLContext)( implicit session: Session, ec: ExecutionContext): Future[Boolean] = { val prepStmt = session.prepare(ctx.statements.head) var boundStmt = prepStmt.bind() if (ctx.parameters != Nil) { val params = processParameters(ctx.parameters.head) boundStmt = prepStmt.bind(params:_*) } ctx.consistency.foreach {consistency => boundStmt.setConsistencyLevel(consistencyLevel(consistency))} session.executeAsync(boundStmt).map(_.wasApplied()) } def cqlMultiUpdate(ctx: CQLContext)( implicit session: Session, ec: ExecutionContext): Future[Boolean] = { val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters var batch = new BatchStatement() commands.foreach { case (stm, params) => val prepStmt = session.prepare(stm) if (params == Nil) batch.add(prepStmt.bind()) else { val p = processParameters(params) batch.add(prepStmt.bind(p: _*)) } } ctx.consistency.foreach {consistency => batch.setConsistencyLevel(consistencyLevel(consistency))} session.executeAsync(batch).map(_.wasApplied()) } def cassandraStream[A](ctx: CQLQueryContext[A]) (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = { val prepStmt = session.prepare(ctx.statement) var boundStmt = prepStmt.bind() val params = processParameters(ctx.parameter) boundStmt = prepStmt.bind(params:_*) ctx.consistency.foreach {consistency => boundStmt.setConsistencyLevel(consistencyLevel(consistency))} CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor) } case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true, statement: String, prepareParams: R => Seq[Object], consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas => def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel) def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered) def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] = cas.copy(consistency = Some(_consistency)) private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = { val prepStmt = session.prepare(statement) var boundStmt = prepStmt.bind() val params = processParameters(prepareParams(r)) boundStmt = prepStmt.bind(params:_*) consistency.foreach { cons => boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons)) } session.executeAsync(boundStmt).map(_ => r) } def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
if (processInOrder) Flow[R].mapAsync(parallelism)(perform) else Flow[R].mapAsyncUnordered(parallelism)(perform) } object CassandraActionStream { def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
new CassandraActionStream[R]( statement=_statement, prepareParams = params) } } object CQLHelpers { import java.nio.ByteBuffer import java.io._ import java.nio.file._ import com.datastax.driver.core.LocalDate import com.datastax.driver.extras.codecs.jdk8.InstantCodec import java.time.Instant import akka.stream.scaladsl._ import akka.stream._ implicit def listenableFutureToFuture[T]( listenableFuture: ListenableFuture[T]): Future[T] = { val promise = Promise[T]() Futures.addCallback(listenableFuture, new FutureCallback[T] { def onFailure(error: Throwable): Unit = { promise.failure(error) () } def onSuccess(result: T): Unit = { promise.success(result) () } }) promise.future } case class CQLDate(year: Int, month: Int, day: Int) case object CQLTodayDate case class CQLDateTime(year: Int, Month: Int, day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) case object CQLDateTimeNow def processParameters(params: Seq[Object]): Seq[Object] = { import java.time.{Clock,ZoneId} params.map { obj => obj match { case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd) case CQLTodayDate => val today = java.time.LocalDate.now() LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth) case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS))) case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) => Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d") case p@_ => p } } } class ByteBufferInputStream(buf: ByteBuffer) extends InputStream { override def read: Int = { if (!buf.hasRemaining) return -1 buf.get } override def read(bytes: Array[Byte], off: Int, len: Int): Int = { val length: Int = Math.min(len, buf.remaining) buf.get(bytes, off, length) length } } object ByteBufferInputStream { def apply(buf: ByteBuffer): ByteBufferInputStream = { new ByteBufferInputStream(buf) } } class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream { override def write(b: Int): Unit = { buf.put(b.toByte) } override def write(bytes: Array[Byte], off: Int, len: Int): Unit = { buf.put(bytes, off, len) } } object FixsizedByteBufferOutputStream { def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf) } class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream { private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR override def write(b: Array[Byte], off: Int, len: Int): Unit = { val position = buf.position val limit = buf.limit val newTotal: Long = position + len if(newTotal > limit){ var capacity = (buf.capacity * increasing) while(capacity <= newTotal){ capacity = (capacity*increasing) } increase(capacity.toInt) } buf.put(b, 0, len) } override def write(b: Int): Unit= { if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt) buf.put(b.toByte) } protected def increase(newCapacity: Int): Unit = { buf.limit(buf.position) buf.rewind val newBuffer =
if (onHeap) ByteBuffer.allocate(newCapacity) else ByteBuffer.allocateDirect(newCapacity) newBuffer.put(buf) buf.clear buf = newBuffer } def size: Long = buf.position def capacity: Long = buf.capacity def byteBuffer: ByteBuffer = buf } object ExpandingByteBufferOutputStream { val DEFAULT_INCREASING_FACTOR = 1.5f def apply(size: Int, increasingBy: Float, onHeap: Boolean) = { if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0") val buffer: ByteBuffer =
if (onHeap) ByteBuffer.allocate(size) else ByteBuffer.allocateDirect(size) new ExpandingByteBufferOutputStream(buffer,onHeap) } def apply(size: Int): ExpandingByteBufferOutputStream = { apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false) } def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = { apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap) } def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = { apply(size, increasingBy, false) } } def cqlFileToBytes(fileName: String): ByteBuffer = { val fis = new FileInputStream(fileName) val b = new Array[Byte](fis.available + 1) val length = b.length fis.read(b) ByteBuffer.wrap(b) } def cqlBytesToFile(bytes: ByteBuffer, fileName: String)( implicit mat: Materializer): Future[IOResult] = { val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes)) source.runWith(FileIO.toPath(Paths.get(fileName))) } def cqlDateTimeString(date: java.util.Date, fmt: String): String = { val outputFormat = new java.text.SimpleDateFormat(fmt) outputFormat.format(date) } def useJava8DateTime(cluster: Cluster) = { //for jdk8 datetime format
cluster.getConfiguration().getCodecRegistry() .register(InstantCodec.instance) } }
JDBCEngine.scala
package jdbccontext import java.sql.PreparedStatement import scala.collection.generic.CanBuildFrom import akka.stream.scaladsl._ import scalikejdbc._ import scalikejdbc.streams._ import akka.NotUsed import scala.util._ import scalikejdbc.TxBoundary.Try._ import scala.concurrent.ExecutionContextExecutor object JDBCContext { type SQLTYPE = Int val SQL_EXEDDL= 1 val SQL_UPDATE = 2 val RETURN_GENERATED_KEYVALUE = true val RETURN_UPDATED_COUNT = false } case class JDBCQueryContext[M]( dbName: Symbol, statement: String, parameters: Seq[Any] = Nil, fetchSize: Int = 100, autoCommit: Boolean = false, queryTimeout: Option[Int] = None, extractor: WrappedResultSet => M) case class JDBCContext( dbName: Symbol, statements: Seq[String] = Nil, parameters: Seq[Seq[Any]] = Nil, fetchSize: Int = 100, queryTimeout: Option[Int] = None, queryTags: Seq[String] = Nil, sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, batch: Boolean = false, returnGeneratedKey: Seq[Option[Any]] = Nil, // no return: None, return by index: Some(1), by name: Some("id")
preAction: Option[PreparedStatement => Unit] = None, postAction: Option[PreparedStatement => Unit] = None) { ctx =>
//helper functions
def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
!ctx.batch && ctx.statements.size == 1) ctx.copy(preAction = action) else
throw new IllegalStateException("JDBCContex setting error: preAction not supported!") } def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
!ctx.batch && ctx.statements.size == 1) ctx.copy(postAction = action) else
throw new IllegalStateException("JDBCContex setting error: preAction not supported!") } def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { ctx.copy( statements = ctx.statements ++ Seq(_statement), parameters = ctx.parameters ++ Seq(Seq(_parameters)) ) } else
throw new IllegalStateException("JDBCContex setting error: option not supported!") } def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { ctx.copy( statements = ctx.statements ++ Seq(_statement), parameters = ctx.parameters ++ Seq(_parameters), returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) ) } else
throw new IllegalStateException("JDBCContex setting error: option not supported!") } def appendBatchParameters(_parameters: Any*): JDBCContext = { if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") var matchParams = true
if (ctx.parameters != Nil) if (ctx.parameters.head.size != _parameters.size) matchParams = false
if (matchParams) { ctx.copy( parameters = ctx.parameters ++ Seq(_parameters) ) } else
throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!") } def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!") ctx.copy( returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil ) } def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { ctx.copy( statements = Seq(_statement), parameters = Seq(_parameters), sqlType = JDBCContext.SQL_EXEDDL, batch = false ) } def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { ctx.copy( statements = Seq(_statement), parameters = Seq(_parameters), returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None), sqlType = JDBCContext.SQL_UPDATE, batch = false ) } def setBatchCommand(_statement: String): JDBCContext = { ctx.copy ( statements = Seq(_statement), sqlType = JDBCContext.SQL_UPDATE, batch = true ) } } object JDBCEngine { import JDBCContext._ private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
throw new IllegalStateException(message) } def jdbcAkkaStream[A](ctx: JDBCQueryContext[A]) (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream {
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) sql.iterator .withDBSessionForceAdjuster(session => { session.connection.setAutoCommit(ctx.autoCommit) session.fetchSize(ctx.fetchSize) }) } Source.fromPublisher[A](publisher) } def jdbcQueryResult[C[_] <: TraversableOnce[_], A]( ctx: JDBCQueryContext[A])( implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) rawSql.fetchSize(ctx.fetchSize) implicit val session = NamedAutoSession(ctx.dbName) val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) sql.collection.apply[C]() } def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = { if (ctx.sqlType != SQL_EXEDDL) { Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")) } else { NamedDB(ctx.dbName) localTx { implicit session => Try { ctx.statements.foreach { stm => val ddl = new SQLExecution(statement = stm, parameters = Nil)( before = WrappedResultSet => {})( after = WrappedResultSet => {}) ddl.apply() } "SQL_EXEDDL executed succesfully." } } } } def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { if (ctx.statements == Nil) throw new IllegalStateException("JDBCContex setting error: statements empty!") if (ctx.sqlType != SQL_UPDATE) { Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) } else { if (ctx.batch) { if (noReturnKey(ctx)) { val usql = SQL(ctx.statements.head) .tags(ctx.queryTags: _*) .batch(ctx.parameters: _*) Try { NamedDB(ctx.dbName) localTx { implicit session => ctx.queryTimeout.foreach(session.queryTimeout(_)) usql.apply[Seq]() Seq.empty[Long].to[C] } } } else { val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None) Try { NamedDB(ctx.dbName) localTx { implicit session => ctx.queryTimeout.foreach(session.queryTimeout(_)) usql.apply[C]() } } } } else { Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !")) } } } private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { val Some(key) :: xs = ctx.returnGeneratedKey val params: Seq[Any] = ctx.parameters match { case Nil => Nil case p@_ => p.head } val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key) Try { NamedDB(ctx.dbName) localTx { implicit session => session.fetchSize(ctx.fetchSize) ctx.queryTimeout.foreach(session.queryTimeout(_)) val result = usql.apply() Seq(result).to[C] } } } private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { val params: Seq[Any] = ctx.parameters match { case Nil => Nil case p@_ => p.head } val before = ctx.preAction match { case None => pstm: PreparedStatement => {} case Some(f) => f } val after = ctx.postAction match { case None => pstm: PreparedStatement => {} case Some(f) => f } val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after) Try { NamedDB(ctx.dbName) localTx {implicit session => session.fetchSize(ctx.fetchSize) ctx.queryTimeout.foreach(session.queryTimeout(_)) val result = usql.apply() 
Seq(result.toLong).to[C] } } } private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { if (noReturnKey(ctx)) singleTxUpdateNoReturnKey(ctx) else singleTxUpdateWithReturnKey(ctx) } private def noReturnKey(ctx: JDBCContext): Boolean = { if (ctx.returnGeneratedKey != Nil) { val k :: xs = ctx.returnGeneratedKey k match { case None => true
case Some(k) => false } } else true } def noActon: PreparedStatement=>Unit = pstm => {} def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { Try { NamedDB(ctx.dbName) localTx { implicit session => session.fetchSize(ctx.fetchSize) ctx.queryTimeout.foreach(session.queryTimeout(_)) val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match { case Nil => Seq.fill(ctx.statements.size)(None) case k@_ => k } val sqlcmd = ctx.statements zip ctx.parameters zip keys val results = sqlcmd.map { case ((stm, param), key) => key match { case None =>
new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong case Some(k) =>
new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong } } results.to[C] } } } def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { if (ctx.statements == Nil) throw new IllegalStateException("JDBCContex setting error: statements empty!") if (ctx.sqlType != SQL_UPDATE) { Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) } else { if (!ctx.batch) { if (ctx.statements.size == 1) singleTxUpdate(ctx) else multiTxUpdates(ctx) } else Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !")) } } case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true, statement: String, prepareParams: R => Seq[Any]) {jas => def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName=db) def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism=parLevel) def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) private def perform(r: R) = { import scala.concurrent._ val params = prepareParams(r) NamedDB(dbName) autoCommit { session => session.execute(statement,params: _*) } Future.successful(r) } def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] =
if (processInOrder) Flow[R].mapAsync(parallelism)(perform) else Flow[R].mapAsyncUnordered(parallelism)(perform) } object JDBCActionStream { def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params) } }
HikariConfig.scala
package configdbs import scala.collection.mutable import scala.concurrent.duration.Duration import scala.language.implicitConversions import com.typesafe.config._ import java.util.concurrent.TimeUnit import java.util.Properties import scalikejdbc.config._ import com.typesafe.config.Config import com.zaxxer.hikari._ import scalikejdbc.ConnectionPoolFactoryRepository /** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal { import scala.collection.JavaConverters._ def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default def getDurationOr(path: String, default: => Duration = Duration.Zero) =
if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default def getPropertiesOr(path: String, default: => Properties = null): Properties =
if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default def toProperties: Properties = { def toProps(m: mutable.Map[String, ConfigValue]): Properties = { val props = new Properties(null) m.foreach { case (k, cv) => val v =
if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala) else if(cv.unwrapped eq null) null
else cv.unwrapped.toString if(v ne null) props.put(k, v) } props } toProps(c.root.asScala) } def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None def getStringOpt(path: String) = Option(getStringOr(path)) def getPropertiesOpt(path: String) = Option(getPropertiesOr(path)) } object ConfigExtensionMethods { @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c) } trait HikariConfigReader extends TypesafeConfigReader { self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix =>
import ConfigExtensionMethods.configExtensionMethods def getFactoryName(dbName: Symbol): String = { val c: Config = config.getConfig(envPrefix + "db." + dbName.name) c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP) } def hikariCPConfig(dbName: Symbol): HikariConfig = { val hconf = new HikariConfig() val c: Config = config.getConfig(envPrefix + "db." + dbName.name) // Connection settings
if (c.hasPath("dataSourceClass")) { hconf.setDataSourceClassName(c.getString("dataSourceClass")) } else { Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _) } hconf.setJdbcUrl(c.getStringOr("url", null)) c.getStringOpt("user").foreach(hconf.setUsername) c.getStringOpt("password").foreach(hconf.setPassword) c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties) // Pool configuration
hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000)) hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000)) hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000)) hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000)) hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0)) hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false)) c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery) c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql) val numThreads = c.getIntOr("numThreads", 20) hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5)) hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads)) hconf.setPoolName(c.getStringOr("poolName", dbName.name)) hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false)) // Equivalent of ConnectionPreparer
hconf.setReadOnly(c.getBooleanOr("readOnly", false)) c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation) hconf.setCatalog(c.getStringOr("catalog", null)) hconf } } import scalikejdbc._ trait ConfigDBs { self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader => def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { getFactoryName(dbName) match { case "hikaricp" => { val hconf = hikariCPConfig(dbName) val hikariCPSource = new HikariDataSource(hconf) if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) { Class.forName(hconf.getDriverClassName) } ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource)) } case _ => { val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName) val cpSettings = readConnectionPoolSettings(dbName) if (driver != null && driver.trim.nonEmpty) { Class.forName(driver) } ConnectionPool.add(dbName, url, user, password, cpSettings) } } } def setupAll(): Unit = { loadGlobalSettings() dbNames.foreach { dbName => setup(Symbol(dbName)) } } def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { ConnectionPool.close(dbName) } def closeAll(): Unit = { ConnectionPool.closeAll } } object ConfigDBs extends ConfigDBs with TypesafeConfigReader with StandardTypesafeConfig with HikariConfigReader case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs with TypesafeConfigReader with StandardTypesafeConfig with HikariConfigReader with EnvPrefix { override val env = Option(envValue) }
StreamDemo.scala
import com.datastax.driver.core._ import akka._ import CQLEngine._ import CQLHelpers._ import akka.actor._ import akka.stream.scaladsl._ import akka.stream._ import scalikejdbc._ import configdbs._ import jdbccontext._ import JDBCEngine._ import scala.util._ object cassandraStreamDemo extends App { //#init-mat
implicit val cqlsys = ActorSystem("cqlSystem") implicit val mat = ActorMaterializer() implicit val ec = cqlsys.dispatcher val cluster = new Cluster .Builder() .addContactPoints("localhost") .withPort(9042) .build() useJava8DateTime(cluster) implicit val session = cluster.connect() //config jdbc drivers
ConfigDBsWithEnv("dev").setup('h2)
ConfigDBsWithEnv("dev").loadGlobalSettings() val cqlCreate =
""" |create table testdb.AQMRPT( |ROWID BIGINT PRIMARY KEY, |MEASUREID BIGINT, |STATENAME TEXT, |COUNTYNAME TEXT, |REPORTYEAR INT, |VALUE INT, |CREATED TIMESTAMP) """.stripMargin
val ctxCreate = CQLContext().setCommand(cqlCreate) cqlExecute(ctxCreate).onComplete{ case Success(s) => println("schema created successfully!") case Failure(e) => println(e.getMessage) } scala.io.StdIn.readLine() case class DataRow ( rowid: Long, measureid: Long, state: String, county: String, year: Int, value: Int ) val toRow: WrappedResultSet => DataRow = rs => DataRow( rowid = rs.long("ROWID"), measureid = rs.long("MEASUREID"), state = rs.string("STATENAME"), county = rs.string("COUNTYNAME"), year = rs.int("REPORTYEAR"), value = rs.int("VALUE") ) //construct the context
val ctx = JDBCQueryContext[DataRow]( dbName = 'h2,
statement = "select * from AQMRPT", extractor = toRow ) //source from h2 database
val jdbcSource = jdbcAkkaStream(ctx) //insert into cassandra database
val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," +
"COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)" val toPparams: DataRow => Seq[Object] = r => { Seq[Object](r.rowid.asInstanceOf[Object], r.measureid.asInstanceOf[Object], r.state, r.county, r.year.asInstanceOf[Object], r.value.asInstanceOf[Object], CQLDateTimeNow ) } val actionStream = CassandraActionStream(cqlInsert,toPparams).setParallelism(2) .setProcessOrder(false) val actionFlow: Flow[DataRow,DataRow,NotUsed] = actionStream.performOnRow val sink = Sink.foreach[DataRow]{ r => println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") } val sts = jdbcSource.take(100).via(actionFlow).to(sink).run() case class Model ( rowid: Long, measureid: Long, state: String, county: String, year: Int, value: Int, createdAt: java.util.Date ) //data row converter
val toModel = (rs: Row) => Model( rowid = rs.getLong("ROWID"), measureid = rs.getLong("MEASUREID"), state = rs.getString("STATENAME"), county = rs.getString("COUNTYNAME"), year = rs.getInt("REPORTYEAR"), value = rs.getInt("VALUE"), createdAt = rs.getTimestamp("CREATED") ) //setup context
val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",toModel) //construct source
val src = cassandraStream(cqlCtx) //a display sink
val snk = Sink.foreach[Model]{ r => println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") } //run on source
src.to(snk).run() val params: Model => Seq[Any] = row => { Seq((row.value * 10), row.rowid) } val jdbcActionStream = JDBCActionStream('h2, "update AQMRPT set total = ? where rowid = ?",params)
.setParallelism(2).setProcessOrder(false) val jdbcActionFlow = jdbcActionStream.performOnRow //update rows in h2 database from data in cassandra database
src.via(jdbcActionFlow).to(snk).run() scala.io.StdIn.readLine() session.close() cluster.close() cqlsys.terminate() }