前兩篇咱們介紹了JDBC和Cassandra的gRPC streaming實現。相對MongoDB來講,JDBC和Cassandra支持字符類型的query語句(SQL、CQL),因此把query指令轉換成protobuf structures是簡單直接的。而MongoDB沒有提供字符類的query,因此咱們必須進行MongoDB query涉及的全部類型與protobuf類型的相互轉換,實現gRPC功能會複雜得多。咱們在這篇討論裏先介紹MongoDB query的protobuf轉換。
在前面的MongoDB-Engine討論裏咱們設計了個MGOContext做爲JVM內部傳遞MongoDB query的數據結構:
/** Immutable description of a single MongoDB operation that is passed around inside the JVM.
  *
  * @param dbName     name of the target database
  * @param collName   name of the target collection
  * @param actionType operation category, defaults to MGO_QUERY
  * @param action     the command to run; None until one is attached with setCommand
  */
case class MGOContext(
    dbName: String,
    collName: String,
    actionType: MGO_ACTION_TYPE = MGO_QUERY,
    action: Option[MGOCommands] = None
) {
  // Every setter returns a modified copy; the receiver is never mutated.
  def setDbName(name: String): MGOContext = copy(dbName = name)

  def setCollName(name: String): MGOContext = copy(collName = name)

  def setActionType(at: MGO_ACTION_TYPE): MGOContext = copy(actionType = at)

  def setCommand(cmd: MGOCommands): MGOContext = copy(action = Some(cmd))
}
下面是這個結構支持的action清單:
/** Query and update commands that can be attached to an MGOContext. */
object MGOCommands {

  // Count documents; both the filter and the (untyped) options are optional.
  case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

  // Distinct values of one field, optionally restricted by a filter.
  // (The name is spelled "Distict" in the code.)
  case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

  /*
     org.mongodb.scala.FindObservable
     import com.mongodb.async.client.FindIterable
     val resultDocType = FindIterable[Document]
     val resultOption = FindObservable(resultDocType)
       .maxScan(...)
       .limit(...)
       .sort(...)
       .project(...)
   */
  // Find documents. `andThen` refines the FindObservable (limit, sort, projection, ...)
  // before it is executed.
  case class Find(filter: Option[Bson] = None,
                  andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                  firstOnly: Boolean = false) extends MGOCommands

  // Same shape as Find without the firstOnly flag.
  case class DocumentStream(filter: Option[Bson] = None,
                            andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                           ) extends MGOCommands

  case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

  case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

  // The write commands below carry their options as Option[Any].
  case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

  case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

  case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

  case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

  case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

}

/** Administrative commands: collections, views and indexes. */
object MGOAdmins {

  case class DropCollection(collName: String) extends MGOCommands

  case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

  case class ListCollection(dbName: String) extends MGOCommands

  case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

  case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

  case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

  case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

  case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

}
能夠看到,咱們必須把Bson、Document、FindObservable這幾個類型對應到protobuf格式。下面是.proto文件裏的部分內容:
// A MongoDB Document carried as opaque bytes.
message MGODocument {
  bytes document = 1;
}

// A Bson value carried as opaque bytes.
message MGOBson {
  bytes bson = 1;
}

// One refinement step applied to a FindObservable.
message ResultTransformer { //FindObservable
  int32 optType = 1;       // which refinement to apply
  MGOBson bsonParam = 2;   // Bson argument, for refinements that take one
  int32 valueParam = 3;    // integer argument, for refinements that take one
}

// Parameters of an administrative command.
message MGOAdminOptons {
  string tarName = 1;
  repeated MGOBson bsonParam = 2;
  OptionAny options = 3;
  string objName = 4;
}

// Wire form of an MGOContext.
message MGOOperations { //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated MGOBson bsonParam = 4;
  repeated ResultTransformer resultOptions = 5;
  OptionAny options = 6;
  repeated MGODocument documents = 7;
  google.protobuf.BoolValue only = 8;
  MGOAdminOptons adminOptions = 9;
}
首先,Document是個serializable類,能夠直接進行序列化/反序列化:
// Build a sample purchase-order document with two nested line items.
val po = Document (
  "ponum" -> "po18012301",
  "vendor" -> "The smartphone compay",
  "remarks" -> "urgent, rush order",
  "podtl" -> Seq(
    Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
    Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
  )
)
println(po)

// Round trip: Document -> bytes -> Document.
val pobytes = marshal(po)
println(s"po bytes: ${pobytes}")
val po1 = unmarshal[Document](pobytes)
println(s"back to po document: $po1")
下一個是Bson,它是個Java interface:
/**
 * An interface for types that are able to render themselves into a {@code BsonDocument}.
 *
 * @since 3.0
 */
public interface Bson {
    /**
     * Render the filter into a BsonDocument.
     *
     * @param documentClass the document class in scope for the collection. This parameter may be ignored, but it may be
     *                      used to alter the structure of the returned {@code BsonDocument} based on some knowledge of
     *                      the document class.
     * @param codecRegistry the codec registry. This parameter may be ignored, but it may be used to look up
     *                      {@code Codec} instances for the document class or any other related class.
     * @param <TDocument>   the type of the document class
     * @return the BsonDocument
     */
    <TDocument> BsonDocument toBsonDocument(Class<TDocument> documentClass, CodecRegistry codecRegistry);
}
Bson只是一個interface,不是Serializable,不過BsonDocument是:
/** * A type-safe container for a BSON document. This class should NOT be sub-classed by third parties. * * @since 3.0 */
public class BsonDocument extends BsonValue implements Map<String, BsonValue>, Cloneable, Bson, Serializable {...}
因此咱們能夠先把Bson轉成BsonDocument進行序列化/反序列化,而後再用反序列化獲得的BsonDocument構建一個新的Bson對象:
/** Wraps a Bson value in its protobuf message.
  *
  * The value is first rendered to a BsonDocument and that document is what gets marshalled.
  */
def bsonToProto(bson: Bson) = {
  val rendered = bson.toBsonDocument(
    classOf[org.mongodb.scala.bson.collection.immutable.Document],
    DEFAULT_CODEC_REGISTRY)
  MGOBson(marshal(rendered))
}

/** Rebuilds a Bson from its protobuf message.
  *
  * The bytes are unmarshalled once, up front; the returned Bson hands back that same
  * BsonDocument regardless of the document class and codec registry it is asked to use.
  */
def protoToBson(proto: MGOBson): Bson = {
  val decoded = unmarshal[BsonDocument](proto.bson)
  new Bson {
    override def toBsonDocument[TDocument](documentClass: Class[TDocument],
                                           codecRegistry: CodecRegistry): BsonDocument = decoded
  }
}
最後是FindObservable。這個類型的應用場景是這樣的:
/*
   org.mongodb.scala.FindObservable
   import com.mongodb.async.client.FindIterable
   val resultDocType = FindIterable[Document]
   val resultOption = FindObservable(resultDocType)
     .maxScan(...)
     .limit(...)
     .sort(...)
     .project(...)
 */
// Find command: optional filter, optional FindObservable refinement, and a first-only flag.
case class Find(filter: Option[Bson] = None,
                andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                firstOnly: Boolean = false) extends MGOCommands
FindObservable的最終效果是一連串操做(limit、sort、projection等)依次施用的結果。因爲每一步都是FindObservable[A] => FindObservable[A]這樣的款式,因此咱們能夠把這一串步驟逐個轉換成protobuf消息來進行序列化/反序列化處理,而後在接收端從新串連施用來得到最終的FindObservable。每一個步驟對應的protobuf結構以下:
// Protobuf side: one refinement step applied to a FindObservable.
message ResultTransformer { //FindObservable
  int32 optType = 1;
  MGOBson bsonParam = 2;
  int32 valueParam = 3;
}

// Scala side: the optType codes. Each comment gives the FindObservable method the code stands for.
type FOD_TYPE = Int
val FOD_FIRST = 0         // def first(): SingleObservable[TResult] -- return the first item
val FOD_FILTER = 1        // def filter(filter: Bson): FindObservable[TResult]
val FOD_LIMIT = 2         // def limit(limit: Int): FindObservable[TResult]
val FOD_SKIP = 3          // def skip(skip: Int): FindObservable[TResult]
val FOD_PROJECTION = 4    // def projection(projection: Bson): FindObservable[TResult]
                          // Sets a document describing the fields to return for all matching documents.
val FOD_SORT = 5          // def sort(sort: Bson): FindObservable[TResult]
val FOD_PARTIAL = 6       // def partial(partial: Boolean): FindObservable[TResult]
                          // Get partial results from a sharded cluster if one or more shards are
                          // unreachable (instead of throwing an error).
val FOD_CURSORTYPE = 7    // def cursorType(cursorType: CursorType): FindObservable[TResult]
                          // Sets the cursor type.
val FOD_HINT = 8          // def hint(hint: Bson): FindObservable[TResult]
                          // Sets the hint for which index to use. A null value means no hint is set.
val FOD_MAX = 9           // def max(max: Bson): FindObservable[TResult]
                          // Sets the exclusive upper bound for a specific index. A null value means no max is set.
val FOD_MIN = 10          // def min(min: Bson): FindObservable[TResult]
                          // Sets the inclusive lower bound for a specific index. A null value means no min is set.
val FOD_RETURNKEY = 11    // def returnKey(returnKey: Boolean): FindObservable[TResult]
                          // If true the find operation will return only the index keys in the resulting documents.
val FOD_SHOWRECORDID=12   // def showRecordId(showRecordId: Boolean): FindObservable[TResult]
                          // Set to true to add a field `\$recordId` to the returned documents.
/** One refinement step of a FindObservable in a form that can cross the wire.
  *
  * @param optType    one of the FOD_* codes, selecting the FindObservable method to call
  * @param bsonParam  Bson argument for FOD_FILTER, FOD_PROJECTION, FOD_SORT, FOD_HINT,
  *                   FOD_MAX and FOD_MIN
  * @param valueParam integer argument for FOD_LIMIT and FOD_SKIP; read as a boolean
  *                   (non-zero = true) for FOD_PARTIAL, FOD_RETURNKEY and FOD_SHOWRECORDID
  */
case class ResultOptions(
    optType: FOD_TYPE,
    bsonParam: Option[Bson] = None,
    valueParam: Int = 0
) {

  /** Converts this step to its protobuf message.
    *
    * Bson is only an interface and not Serializable, so the parameter is sent through
    * bsonToProto (which marshals a BsonDocument) instead of being marshalled directly.
    */
  def toProto = new sdp.grpc.services.ResultTransformer(
    optType = this.optType,
    bsonParam = this.bsonParam.map(b => bsonToProto(b)),
    valueParam = this.valueParam
  )

  /** Turns this step into a function that applies it to a FindObservable.
    *
    * FOD_FIRST and FOD_CURSORTYPE return the observable unchanged. An optType outside
    * the FOD_* codes raises a MatchError.
    *
    * @throws NoSuchElementException if the selected refinement needs bsonParam and it is absent
    */
  def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
    // Evaluated only by the branches that need a Bson argument.
    def bson: Bson = bsonParam.getOrElse(
      throw new NoSuchElementException(s"ResultOptions(optType=$optType) requires a bsonParam"))
    def flag: Boolean = valueParam != 0

    optType match {
      case FOD_FIRST        => find
      case FOD_FILTER       => find.filter(bson)
      case FOD_LIMIT        => find.limit(valueParam)
      case FOD_SKIP         => find.skip(valueParam)
      case FOD_PROJECTION   => find.projection(bson)
      case FOD_SORT         => find.sort(bson)
      case FOD_PARTIAL      => find.partial(flag)
      case FOD_CURSORTYPE   => find
      case FOD_HINT         => find.hint(bson)
      case FOD_MAX          => find.max(bson)
      case FOD_MIN          => find.min(bson)
      case FOD_RETURNKEY    => find.returnKey(flag)
      case FOD_SHOWRECORDID => find.showRecordId(flag)
    }
  }
}

object ResultOptions {
  /** Rebuilds a ResultOptions from its protobuf message; the inverse of toProto. */
  def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
    optType = msg.optType,
    bsonParam = msg.bsonParam.map(b => protoToBson(b)),
    valueParam = msg.valueParam
  )
}
咱們能夠用這個ResultOptions類型的toProto、fromProto來進行protobuf的轉換處理,而後用fold(這裏是foldRight)把各個步驟串連起來依次施用:
/** Combines a sequence of protobuf refinement steps into one FindObservable transformation.
  *
  * Steps are applied starting from the last element of `rts`, so the first element is
  * applied last.
  */
def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = { findObj =>
  rts.reverse.foldLeft(findObj) { (refined, step) =>
    ResultOptions.fromProto(step).toFindObservable(refined)
  }
}
下面這個函數示範了Find Context的反序列:
/** Rebuilds an MGOContext from its protobuf message.
  *
  * Only MGO_COMMAND_FIND is handled here; any other commandType raises a MatchError.
  *
  * For a find:
  *  - the first element of `bsonParam`, when there is one, becomes the filter;
  *  - `resultOptions`, when non-empty, becomes the `andThen` refinement;
  *  - `only` becomes `firstOnly`, defaulting to false when unset.
  *
  * The three parts are decoded independently, so a request that carries result options
  * but no filter is accepted.
  */
def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
  case MGO_COMMAND_FIND =>
    // Applies the steps starting from the last one, so the first step is applied last.
    def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] =
      findObj => rts.foldRight(findObj)((a, b) => ResultOptions.fromProto(a).toFindObservable(b))

    val filter: Option[Bson] = proto.bsonParam.headOption.map(b => protoToBson(b))

    val andThen: Option[FindObservable[Document] => FindObservable[Document]] =
      if (proto.resultOptions.isEmpty) None
      else Some(toResultTransformer(proto.resultOptions))

    val firstOnly: Boolean = proto.only.getOrElse(false)

    new MGOContext(
      dbName = proto.dbName,
      collName = proto.collName,
      actionType = MGO_QUERY,
      action = Some(Find(filter, andThen, firstOnly))
    )
}
具體的應用示範以下:
// Filter and projection for the query.
val eqState = equal("state","California")
val proj = exclude("rowid","_id")

// Refinement steps: limit to 3 documents and apply the projection.
val rtxfmr = Seq(
  ResultOptions(
    optType = FOD_LIMIT,
    valueParam = 3)
  ,ResultOptions(
    optType = FOD_PROJECTION,
    bsonParam = Some(proj))
)

// Build the protobuf message for the find command.
val protoCtx = MGOProtoMsg(
  dbName = "testdb",
  collName = "aqmrpt",
  commandType = MGO_COMMAND_FIND,
  bsonParam = Seq(eqState),
  resultOptions = rtxfmr
).toProto

// Convert the message back into an MGOContext and run it.
val findCtx = CtxFromProto(protoCtx)
val futFind = mgoQuery[Seq[Document]](findCtx)
futFind.onComplete {
  case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
  case Failure(e) => println(e.getMessage)
}
下面是本次討論的部分源代碼:
MongoDBEngine.scala
package sdp.mongo.engine

import java.text.SimpleDateFormat
import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.mongodb.scala.MongoClient
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import org.mongodb.scala._
import org.mongodb.scala.model._
import java.util.Calendar
import scala.collection.JavaConverters._
import sdp.file.Streaming._
import akka.stream.Materializer
import org.mongodb.scala.bson.{BsonArray, BsonBinary}
import scala.concurrent._
import scala.concurrent.duration._
import sdp.logging.LogSupport

/** Data model of the engine: action types, command classes, the context classes and
  * small date / blob / document helper functions.
  */
object MGOContext {

  type MGO_ACTION_TYPE = Int
  val MGO_QUERY = 0
  val MGO_UPDATE = 1
  val MGO_ADMIN = 2

  /** Marker trait of everything that can be stored in MGOContext.action. */
  trait MGOCommands

  /** Query and update commands. */
  object MGOCommands {

    case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

    case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

    /*
       org.mongodb.scala.FindObservable
       import com.mongodb.async.client.FindIterable
       val resultDocType = FindIterable[Document]
       val resultOption = FindObservable(resultDocType)
         .maxScan(...)
         .limit(...)
         .sort(...)
         .project(...)
     */
    // `andThen` refines the FindObservable (limit, sort, ...) before it is executed.
    case class Find(filter: Option[Bson] = None,
                    andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                    firstOnly: Boolean = false) extends MGOCommands

    case class DocumentStream(filter: Option[Bson] = None,
                              andThen: Option[FindObservable[Document] => FindObservable[Document]] = None
                             ) extends MGOCommands

    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

    // `options` is untyped; the engine casts it to the driver's options class for the call.
    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

  }

  /** Administrative commands: collections, views and indexes. */
  object MGOAdmins {

    case class DropCollection(collName: String) extends MGOCommands

    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

    case class ListCollection(dbName: String) extends MGOCommands

    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

  }

  /** Immutable description of one MongoDB operation; every setter returns a copy. */
  case class MGOContext(
                         dbName: String,
                         collName: String,
                         actionType: MGO_ACTION_TYPE = MGO_QUERY,
                         action: Option[MGOCommands] = None
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)

    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)
  }

  /** A batch of contexts; `tx` selects transactional execution in mgoUpdateBatch. */
  case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
    ctxs =>
    def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
    def appendContext(ctx: MGOContext): MGOBatContext =
      ctxs.copy(contexts = contexts :+ ctx)
  }

  object MGOBatContext {
    def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
  }

  type MGODate = java.util.Date

  // The arguments are passed straight to java.util.Calendar.set, whose month is 0-based
  // and which keeps the current time-of-day fields.
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.set(yyyy,mm,dd)
    ca.getTime()
  }

  // Same month convention as mgoDate.
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
    ca.set(yyyy,mm,dd,hr,min,sec)
    ca.getTime()
  }

  def mgoDateTimeNow: MGODate = {
    val ca = Calendar.getInstance()
    ca.getTime
  }

  // formatString is a SimpleDateFormat pattern.
  def mgoDateToString(dt: MGODate, formatString: String): String = {
    val fmt= new SimpleDateFormat(formatString)
    fmt.format(dt)
  }

  type MGOBlob = BsonBinary
  type MGOArray = BsonArray

  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer) = FileToByteArray(fileName,timeOut)

  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)

  // The mgoGetXxxOrNone helpers return None when the document has no such field.
  def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getString(fieldName))
    else None
  }

  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getInteger(fieldName))
    else None
  }

  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getLong(fieldName))
    else None
  }

  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getDouble(fieldName))
    else None
  }

  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getBoolean(fieldName))
    else None
  }

  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Some(doc.getDate(fieldName))
    else None
  }

  // Unchecked cast of the looked-up value.
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
    else None
  }

  // Unchecked cast of the looked-up value.
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      doc.get(fieldName).asInstanceOf[Option[MGOArray]]
    else None
  }

  // Unchecked cast: the array elements are not verified to be documents.
  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
    (arr.getValues.asScala.toList)
      .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  }

  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}

object MGOEngine extends LogSupport {

  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  /** Transactional execution of a batch of updates. */
  object TxUpdateMode {

    // Starts a transaction on the session, fires every update of the batch and waits up to
    // three seconds for them.
    // NOTE(review): the updates go through mgoUpdateObservable, which is not given the
    // ClientSession, and Await.ready does not rethrow a failed update -- confirm that the
    // writes really take part in the transaction and that failures are meant to be ignored.
    private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
      implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
      log.info(s"mgoTxUpdate> calling ...")
      observable.map(clientSession => {

        val transactionOptions =
          TransactionOptions.builder()
            .readConcern(ReadConcern.SNAPSHOT)
            .writeConcern(WriteConcern.MAJORITY).build()

        clientSession.startTransaction(transactionOptions)

        val fut = Future.traverse(ctxs.contexts) { ctx =>
          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
        }
        Await.ready(fut, 3.seconds)

        clientSession
      })
    }

    // Recovers with the same observable again when the commit result is unknown; any other
    // exception is logged and rethrown.
    private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
      log.info(s"commitAndRetry> calling ...")
      observable.recoverWith({
        case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
          log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
          commitAndRetry(observable)
        }
        case e: Exception => {
          log.error(s"commitAndRetry> Exception during commit ...: $e")
          throw e
        }
      })
    }

    // Recovers with the same observable again on a transient transaction error.
    private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
      log.info(s"runTransactionAndRetry> calling ...")
      observable.recoverWith({
        case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
          log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
          runTransactionAndRetry(observable)
        }
      })
    }

    /** Runs the batch inside a client session: start session, update, commit, with retries. */
    def mgoTxBatch(ctxs: MGOBatContext)(
      implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {

      log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")

      val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
      val commitTransactionObservable: SingleObservable[Completed] =
        updateObservable.flatMap(clientSession => clientSession.commitTransaction())
      val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)

      runTransactionAndRetry(commitAndRetryObservable)
    }.toFuture()
  }

  /** Runs every context of the batch as an update.
    *
    * With `ctxs.tx` set the batch is delegated to TxUpdateMode.mgoTxBatch. Otherwise all
    * updates are started together and the returned future completes once every one of
    * them has completed. It fails if any update fails, and the calling thread is not blocked.
    */
  def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
    log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")
    if (ctxs.tx) {
      TxUpdateMode.mgoTxBatch(ctxs)
    } else {
      Future
        .traverse(ctxs.contexts)(ctx => mgoUpdate[Completed](ctx))
        .map(_ => new Completed)
    }
  }

  // T => FindIterable  e.g List[Document]
/** Runs the query command stored in `ctx` and returns its result as a Future.
  *
  * The result is cast to `Future[T]` without any check, so the caller must pick a `T`
  * that fits the command.
  *
  * @param ctx       context naming the database, the collection and the command
  * @param Converter optional mapping applied to every document returned by a Find
  * @throws IllegalArgumentException if `ctx.action` is empty
  */
def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): Future[T] = {
  log.info(s"mgoQuery>  MGOContext: ${ctx}")

  val db = client.getDatabase(ctx.dbName)
  val coll = db.getCollection(ctx.collName)

  val action = ctx.action.getOrElse {
    log.error(s"mgoQuery> query action cannot be null!")
    throw new IllegalArgumentException("query action cannot be null!")
  }

  // find() with or without a filter
  def findBy(filter: Option[Bson]): FindObservable[Document] =
    filter.fold(coll.find[Document]())(f => coll.find[Document](f))

  // Collects every element, applying the converter when one was supplied.
  def allOf(obs: Observable[Document]): Future[T] =
    (Converter match {
      case Some(convert) => obs.map(convert).toFuture()
      case None          => obs.toFuture()
    }).asInstanceOf[Future[T]]

  // Takes the head element, applying the converter when one was supplied.
  def firstOf(obs: Observable[Document]): Future[T] =
    (Converter match {
      case Some(convert) => obs.map(convert).head()
      case None          => obs.head()
    }).asInstanceOf[Future[T]]

  action match {
    /* count */
    case Count(Some(filter), Some(opt)) => //SingleObservable
      coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
        .toFuture().asInstanceOf[Future[T]]
    case Count(Some(filter), None) => //SingleObservable
      coll.countDocuments(filter).toFuture()
        .asInstanceOf[Future[T]]
    case Count(None, Some(opt)) => //SingleObservable
      // options without a filter: count with an empty (match-all) filter
      coll.countDocuments(Document(), opt.asInstanceOf[CountOptions])
        .toFuture().asInstanceOf[Future[T]]
    case Count(None, None) => //SingleObservable
      coll.countDocuments().toFuture()
        .asInstanceOf[Future[T]]
    /* distinct */
    case Distict(field, Some(filter)) => //DistinctObservable
      coll.distinct(field, filter).toFuture()
        .asInstanceOf[Future[T]]
    case Distict(field, None) => //DistinctObservable
      coll.distinct((field)).toFuture()
        .asInstanceOf[Future[T]]
    /* find */
    case Find(filter, None, firstOnly) => //FindObservable
      if (firstOnly) firstOf(findBy(filter).first())
      else allOf(findBy(filter))
    case Find(filter, Some(next), _) => //FindObservable
      // firstOnly is not consulted when a refinement function is supplied
      allOf(next(findBy(filter)))
    /* aggregate AggregateObservable*/
    case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
    /* mapReduce MapReduceObservable*/
    case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
    /* list collection */
    case ListCollection(dbName) => //ListCollectionObservable
      client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  }
}

//T => Completed, result.UpdateResult, result.DeleteResult
/** Runs the update command stored in `ctx` and returns its result as a Future. */
def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] =
  mgoUpdateObservable[T](ctx).toFuture()

/** Builds the observable for the update command stored in `ctx`.
  *
  * The observable is cast to `SingleObservable[T]` without any check. Untyped options
  * are cast to the driver's options class expected by the call they are passed to.
  *
  * @throws IllegalArgumentException if `ctx.action` is empty
  */
def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
  log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")

  val db = client.getDatabase(ctx.dbName)
  val coll = db.getCollection(ctx.collName)

  val action = ctx.action.getOrElse {
    log.error(s"mgoUpdateObservable> query action cannot be null!")
    throw new IllegalArgumentException("query action cannot be null!")
  }

  action match {
    /* insert */
    // More than one document uses insertMany; otherwise insertOne on docs.head.
    case Insert(docs, Some(opt)) => //SingleObservable[Completed]
      if (docs.size > 1)
        coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
      else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
    case Insert(docs, None) => //SingleObservable
      if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
      else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
    /* delete */
    case Delete(filter, None, onlyOne) => //SingleObservable
      if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
      else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
    case Delete(filter, Some(opt), onlyOne) => //SingleObservable
      if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
      else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
    /* replace */
    case Replace(filter, replacement, None) => //SingleObservable
      coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
    case Replace(filter, replacement, Some(opt)) => //SingleObservable
      coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
    /* update */
    case Update(filter, update, None, onlyOne) => //SingleObservable
      if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
      else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
    case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
      if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
      else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
    /* bulkWrite */
    case BulkWrite(commands, None) => //SingleObservable
      coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
    case BulkWrite(commands, Some(opt)) => //SingleObservable
      coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
  }
}

/** Builds the observable for the administrative command stored in `ctx`.
  *
  * @throws IllegalArgumentException if `ctx.action` is empty
  */
def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): SingleObservable[Completed] = {
  log.info(s"mgoAdmin>  MGOContext: ${ctx}")

  val db = client.getDatabase(ctx.dbName)
  val coll = db.getCollection(ctx.collName)

  val action = ctx.action.getOrElse {
    log.error(s"mgoAdmin> query action cannot be null!")
    throw new IllegalArgumentException("query action cannot be null!")
  }

  action match {
    /* drop collection */
    // Drops the collection named in the command, not the one named in the context.
    case DropCollection(collName) => //SingleObservable
      val coll = db.getCollection(collName)
      coll.drop()
    /* create collection */
    case CreateCollection(collName, None) => //SingleObservable
      db.createCollection(collName)
    case CreateCollection(collName, Some(opt)) => //SingleObservable
      db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions])
    /* list collection
    case ListCollection(dbName) =>   //ListCollectionObservable
      client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
      */
    /* create view */
    case CreateView(viewName, viewOn, pline, None) => //SingleObservable
      db.createView(viewName, viewOn, pline)
    case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
      db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions])
    /* create index */
    case CreateIndex(key, None) => //SingleObservable
      coll.createIndex(key).asInstanceOf[SingleObservable[Completed]]
    case CreateIndex(key, Some(opt)) => //SingleObservable
      coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[SingleObservable[Completed]]
    /* drop index */
    case DropIndexByName(indexName, None) => //SingleObservable
      coll.dropIndex(indexName)
    case DropIndexByName(indexName, Some(opt)) => //SingleObservable
      coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions])
    case DropIndexByKey(key, None) => //SingleObservable
      coll.dropIndex(key)
    case DropIndexByKey(key, Some(opt)) => //SingleObservable
      coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions])
    case DropAllIndexes(None) => //SingleObservable
      coll.dropIndexes()
    case DropAllIndexes(Some(opt)) => //SingleObservable
      coll.dropIndexes(opt.asInstanceOf[DropIndexOptions])
  }
}

/*
  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
      /* count */
case Count(Some(filter), Some(opt)) => //SingleObservable
coll.countDocuments(filter, opt.asInstanceOf[CountOptions]) .toFuture().asInstanceOf[Future[T]] case Count(Some(filter), None) => //SingleObservable
coll.countDocuments(filter).toFuture() .asInstanceOf[Future[T]] case Count(None, None) => //SingleObservable
coll.countDocuments().toFuture() .asInstanceOf[Future[T]] /* distinct */
case Distict(field, Some(filter)) => //DistinctObservable
coll.distinct(field, filter).toFuture() .asInstanceOf[Future[T]] case Distict(field, None) => //DistinctObservable
coll.distinct((field)).toFuture() .asInstanceOf[Future[T]] /* find */
case Find(None, None, optConv, false) => //FindObservable
if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]] else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]] case Find(None, None, optConv, true) => //FindObservable
if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]] else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]] case Find(Some(filter), None, optConv, false) => //FindObservable
if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]] else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]] case Find(Some(filter), None, optConv, true) => //FindObservable
if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]] else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]] case Find(None, Some(next), optConv, _) => //FindObservable
if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]] else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]] case Find(Some(filter), Some(next), optConv, _) => //FindObservable
if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]] else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]] /* aggregate AggregateObservable*/
case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]] /* mapReduce MapReduceObservable*/
case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]] /* insert */
case Insert(docs, Some(opt)) => //SingleObservable[Completed]
if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture() .asInstanceOf[Future[T]] else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture() .asInstanceOf[Future[T]] case Insert(docs, None) => //SingleObservable
if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]] else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]] /* delete */
case Delete(filter, None, onlyOne) => //SingleObservable
if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]] else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]] case Delete(filter, Some(opt), onlyOne) => //SingleObservable
if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] /* replace */
case Replace(filter, replacement, None) => //SingleObservable
coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]] case Replace(filter, replacement, Some(opt)) => //SingleObservable
coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] /* update */
case Update(filter, update, None, onlyOne) => //SingleObservable
if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]] else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]] case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] /* bulkWrite */
case BulkWrite(commands, None) => //SingleObservable
coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]] case BulkWrite(commands, Some(opt)) => //SingleObservable
coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]] /* drop collection */
case DropCollection(collName) => //SingleObservable
val coll = db.getCollection(collName) coll.drop().toFuture().asInstanceOf[Future[T]] /* create collection */
case CreateCollection(collName, None) => //SingleObservable
db.createCollection(collName).toFuture().asInstanceOf[Future[T]] case CreateCollection(collName, Some(opt)) => //SingleObservable
db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]] /* list collection */
case ListCollection(dbName) => //ListConllectionObservable
client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]] /* create view */
case CreateView(viewName, viewOn, pline, None) => //SingleObservable
db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]] case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]] /* create index */
case CreateIndex(key, None) => //SingleObservable
coll.createIndex(key).toFuture().asInstanceOf[Future[T]] case CreateIndex(key, Some(opt)) => //SingleObservable
coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]] /* drop index */
case DropIndexByName(indexName, None) => //SingleObservable
coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]] case DropIndexByName(indexName, Some(opt)) => //SingleObservable
coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] case DropIndexByKey(key, None) => //SingleObservable
coll.dropIndex(key).toFuture().asInstanceOf[Future[T]] case DropIndexByKey(key, Some(opt)) => //SingleObservable
coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] case DropAllIndexes(None) => //SingleObservable
coll.dropIndexes().toFuture().asInstanceOf[Future[T]] case DropAllIndexes(Some(opt)) => //SingleObservable
coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] } } */

  /**
   * Builds an Akka Streams `Source` of documents for a `DocumentStream` command.
   *
   * The database and collection are resolved from `ctx.dbName` / `ctx.collName`
   * on the implicit client. The optional filter is passed to `find`, and the
   * optional `andThen` function is applied to the resulting `FindObservable`
   * before it is wrapped in a `MongoSource`.
   *
   * @param ctx    query context; `ctx.action` must hold a `DocumentStream`
   * @param client MongoDB client used to resolve the database and collection
   * @param ec     execution context (not used directly in this method body)
   * @return a source emitting the matching documents
   * @throws IllegalArgumentException if `ctx.action` is empty or is not a
   *                                  `DocumentStream` command
   */
  def mongoStream(ctx: MGOContext)(
    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
    log.info(s"mongoStream> MGOContext: ${ctx}")

    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)

    // Match on the Option directly instead of `== None` followed by `.get`.
    ctx.action match {
      case None =>
        log.error(s"mongoStream> query action cannot be null!")
        throw new IllegalArgumentException("query action cannot be null!")
      case Some(DocumentStream(None, None)) =>
        MongoSource(coll.find())
      case Some(DocumentStream(Some(filter), None)) =>
        MongoSource(coll.find(filter))
      case Some(DocumentStream(None, Some(next))) =>
        MongoSource(next(coll.find()))
      case Some(DocumentStream(Some(filter), Some(next))) =>
        MongoSource(next(coll.find(filter)))
      case Some(other) =>
        // Previously this fell through to a bare scala.MatchError.
        log.error(s"mongoStream> unsupported action: ${other}")
        throw new IllegalArgumentException(s"mongoStream> unsupported action: ${other}")
    }
  }
}

/** Per-element insert/update/delete stages that can be plugged into an Akka stream. */
object MongoActionStream {

  import MGOContext._

  /** Describes a streaming insert: each `A` is turned into a document by `converter`. */
  case class StreamingInsert[A](dbName: String,
                                collName: String,
                                converter: A => Document,
                                parallelism: Int = 1
                               ) extends MGOCommands

  /** Describes a streaming delete: each `A` is turned into a filter by `toFilter`. */
  case class StreamingDelete[A](dbName: String,
                                collName: String,
                                toFilter: A => Bson,
                                parallelism: Int = 1,
                                justOne: Boolean = false
                               ) extends MGOCommands

  /** Describes a streaming update: each `A` yields a filter and an update document. */
  case class StreamingUpdate[A](dbName: String,
                                collName: String,
                                toFilter: A => Bson,
                                toUpdate: A => Bson,
                                parallelism: Int = 1,
                                justOne: Boolean = false
                               ) extends MGOCommands

  /** Flow stage that inserts one document per incoming element and emits that document. */
  case class InsertAction[A](ctx: StreamingInsert[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
      Flow[A].map(ctx.converter)
        .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
  }

  /**
   * Flow stage that runs `updateOne` (when `justOne`) or `updateMany` per
   * incoming element and re-emits the element once the update completes.
   */
  case class UpdateAction[A](ctx: StreamingUpdate[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
      if (ctx.justOne) {
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
      } else
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  }

  /**
   * Flow stage that runs `deleteOne` (when `justOne`) or `deleteMany` per
   * incoming element and re-emits the element once the delete completes.
   */
  case class DeleteAction[A](ctx: StreamingDelete[A])(
    implicit mongoClient: MongoClient) {

    val database = mongoClient.getDatabase(ctx.dbName)
    val collection = database.getCollection(ctx.collName)

    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
      if (ctx.justOne) {
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
      } else
        Flow[A]
          .mapAsync(ctx.parallelism)(a =>
            collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
  }
}

/** Blocking convenience helpers for observables and futures, plus Future/Task adapters. */
object MGOHelpers {

  /** Adds the blocking helpers to an `Observable[Document]`; documents print as JSON. */
  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = (doc) => doc.toJson
  }

  /** Adds the blocking helpers to any `Observable[C]`; elements print via `toString`. */
  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = (doc) => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    /** Blocks up to 10 seconds for all results. */
    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)

    /** Blocks up to 10 seconds for the first result. */
    def headResult() = Await.result(observable.head(), 10 seconds)

    /** Prints `initial` (if non-empty) and then every result on its own line. */
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }

    /** Prints `initial` immediately followed by the first result. */
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

  /** Blocks on `fut` for at most `timeOut` (default one second). */
  def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
    Await.result(fut, timeOut)
  }

  /** Blocks on `fut` for at most `timeOut` (default one second). */
  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
    Await.result(fut, timeOut)
  }

  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global

  /** Wraps a by-name `Future` so it can be deferred into a Monix `Task`. */
  final class FutureToTask[A](x: => Future[A]) {
    def asTask: Task[A] = Task.deferFuture[A](x)
  }

  /** Runs a Monix `Task` on the imported global scheduler, yielding a `Future`. */
  final class TaskToFuture[A](x: => Task[A]) {
    def asFuture: Future[A] = x.runAsync
  }
}
MgoProtoConversion.scala
package sdp.mongo.engine

import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import com.google.protobuf.ByteString
import MGOContext._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable

/**
 * Conversions between the in-JVM MongoDB query model (`MGOContext` and its
 * commands) and the protobuf messages generated under `sdp.grpc.services`.
 */
object MgoProtoConvertion {
  /* org.mongodb.scala.FindObservable
  import com.mongodb.async.client.FindIterable
  val resultDocType = FindIterable[Document]
  val resultOption = FindObservable(resultDocType)
    .maxScan(...)
    .limit(...)
    .sort(...)
    .project(...) */

  // Integer tags identifying which FindObservable operation a ResultOptions carries.
  type FOD_TYPE = Int
  val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
  val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
  val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
  val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
  val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult] //Sets a document describing the fields to return for all matching documents
  val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
  val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult] //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult] //Sets the cursor type
  val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult] //Sets the hint for which index to use. A null value means no hint is set
  val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult] //Sets the exclusive upper bound for a specific index. A null value means no max is set
  val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult] //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
  val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult] //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult] //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents

  /**
   * One step applied to a `FindObservable` (limit, sort, projection, ...).
   *
   * @param optType    one of the `FOD_*` tags above
   * @param bsonParam  Bson argument for the options that take one
   * @param valueParam Int argument; for boolean options any non-zero value means true
   */
  case class ResultOptions(
      optType: FOD_TYPE,
      bsonParam: Option[Bson] = None,
      valueParam: Int = 0
  ) {
    /** Converts to the protobuf message; the Bson argument is serialized with `marshal`. */
    def toProto = new sdp.grpc.services.ResultTransformer(
      optType = this.optType,
      bsonParam = this.bsonParam.map { b => sdp.grpc.services.MGOBson(marshal(b)) },
      valueParam = this.valueParam
    )

    /**
     * Returns the function that applies this option to a `FindObservable`.
     * `FOD_FIRST` and `FOD_CURSORTYPE` leave the observable unchanged here.
     * Options that need a Bson argument call `bsonParam.get`, so they throw
     * `NoSuchElementException` when `bsonParam` is empty; an `optType` outside
     * the `FOD_*` range raises `MatchError`.
     */
    def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
      optType match {
        case FOD_FIRST => find
        case FOD_FILTER => find.filter(bsonParam.get)
        case FOD_LIMIT => find.limit(valueParam)
        case FOD_SKIP => find.skip(valueParam)
        case FOD_PROJECTION => find.projection(bsonParam.get)
        case FOD_SORT => find.sort(bsonParam.get)
        case FOD_PARTIAL => find.partial(valueParam != 0)
        case FOD_CURSORTYPE => find
        case FOD_HINT => find.hint(bsonParam.get)
        case FOD_MAX => find.max(bsonParam.get)
        case FOD_MIN => find.min(bsonParam.get)
        case FOD_RETURNKEY => find.returnKey(valueParam != 0)
        case FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
      }
    }
  }

  object ResultOptions {
    /** Rebuilds a `ResultOptions` from its protobuf form (inverse of `toProto`). */
    def fromProto(msg: sdp.grpc.services.ResultTransformer): ResultOptions = new ResultOptions(
      optType = msg.optType,
      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      valueParam = msg.valueParam
    )
  }

  // Integer tags identifying the command carried by an MGOOperations message.
  type MGO_COMMAND_TYPE = Int
  val MGO_COMMAND_FIND = 0
  val MGO_COMMAND_COUNT = 20
  val MGO_COMMAND_DISTICT = 21
  val MGO_COMMAND_DOCUMENTSTREAM = 1
  val MGO_COMMAND_AGGREGATE = 2
  val MGO_COMMAND_INSERT = 3
  val MGO_COMMAND_DELETE = 4
  val MGO_COMMAND_REPLACE = 5
  val MGO_COMMAND_UPDATE = 6

  val MGO_ADMIN_DROPCOLLECTION = 8
  val MGO_ADMIN_CREATECOLLECTION = 9
  val MGO_ADMIN_LISTCOLLECTION = 10
  val MGO_ADMIN_CREATEVIEW = 11
  val MGO_ADMIN_CREATEINDEX = 12
  val MGO_ADMIN_DROPINDEXBYNAME = 13
  val MGO_ADMIN_DROPINDEXBYKEY = 14
  val MGO_ADMIN_DROPALLINDEXES = 15

  /** Parameters of an admin command in a protobuf-convertible form. */
  case class MGOAdminCtx(
      tarName: String = "",
      bsonParam: Seq[Bson] = Nil,
      options: Option[Any] = None,
      objName: String = ""
  ) {
    def toProto = sdp.grpc.services.MGOAdminOptons(
      tarName = this.tarName,
      bsonParam = this.bsonParam.map { b => sdp.grpc.services.MGOBson(marshal(b)) },
      objName = this.objName,
      options = this.options.map(b => OptionAny(marshal(b)))
    )
  }

  object MGOAdminCtx {
    /**
     * Rebuilds an `MGOAdminCtx` from its protobuf form.
     *
     * `objName` is now carried over; it used to be reset to "".
     * NOTE(review): `options` is still not restored because the payload
     * accessor of the generated `OptionAny` message is not visible here.
     */
    def fromProto(msg: sdp.grpc.services.MGOAdminOptons): MGOAdminCtx = new MGOAdminCtx(
      tarName = msg.tarName,
      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      objName = msg.objName
    )
  }

  /** JVM-side mirror of the `MGOOperations` protobuf message. */
  case class MGOProtoMsg(
      dbName: String = "",
      collName: String = "",
      commandType: MGO_COMMAND_TYPE,
      bsonParam: Seq[Bson] = Nil,
      resultOptions: Seq[ResultOptions] = Nil,
      options: ByteString = com.google.protobuf.ByteString.EMPTY,
      documents: Seq[Document] = Nil,
      only: Boolean = false,
      adminOptions: Option[MGOAdminCtx] = None
  ) {
    // `options` is not written to the message here (unchanged from the original).
    def toProto = new sdp.grpc.services.MGOOperations(
      dbName = this.dbName,
      collName = this.collName,
      commandType = this.commandType,
      bsonParam = this.bsonParam.map(bsonToProto),
      resultOptions = this.resultOptions.map(_.toProto),
      documents = this.documents.map(d => sdp.grpc.services.MGODocument(marshal(d))),
      only = Some(this.only),
      adminOptions = this.adminOptions.map(_.toProto)
    )
  }

  object MGOProtoMsg {
    /**
     * Rebuilds an `MGOProtoMsg` from its protobuf form.
     *
     * `only` and `adminOptions` are now carried over; they used to be
     * silently dropped, so a toProto/fromProto round trip lost them.
     * NOTE(review): `documents` is still not restored because the payload
     * accessor of the generated `MGODocument` message is not visible here.
     */
    def fromProto(msg: sdp.grpc.services.MGOOperations): MGOProtoMsg = new MGOProtoMsg(
      dbName = msg.dbName,
      collName = msg.collName,
      commandType = msg.commandType,
      bsonParam = msg.bsonParam.map(protoToBson),
      resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
      only = msg.only.getOrElse(false),
      adminOptions = msg.adminOptions.map(a => MGOAdminCtx.fromProto(a))
    )
  }

  /**
   * Serializes a Bson value by first rendering it to a `BsonDocument` with the
   * default codec registry and then marshalling that document.
   */
  def bsonToProto(bson: Bson) =
    MGOBson(marshal(bson.toBsonDocument(
      classOf[org.mongodb.scala.bson.collection.immutable.Document], DEFAULT_CODEC_REGISTRY)))

  /**
   * Wraps the `BsonDocument` stored in `proto` as a `Bson` whose
   * `toBsonDocument` returns that document regardless of the arguments.
   */
  def protoToBson(proto: MGOBson): Bson = new Bson {
    val bsdoc = unmarshal[BsonDocument](proto.bson)

    override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  }

  /**
   * Builds an `MGOContext` from an incoming `MGOOperations` message.
   *
   * Only `MGO_COMMAND_FIND` is handled; any other `commandType` raises
   * `scala.MatchError`. For a find, the first `bsonParam` (if any) becomes
   * the filter, `resultOptions` (if any) become the `andThen` transformer
   * and `only` (default false) becomes `firstOnly`.
   */
  def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
    case MGO_COMMAND_FIND => {
      val ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Find())
      )

      // Folds from the right, i.e. the last listed option is applied first.
      def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] =
        findObj => rts.foldRight(findObj)((a, b) => ResultOptions.fromProto(a).toFindObservable(b))

      // headOption, not head: a find with result options but no filter used
      // to throw NoSuchElementException on the empty bsonParam.
      val filter = proto.bsonParam.headOption.map(protoToBson)
      val andThen =
        if (proto.resultOptions.isEmpty) None
        else Some(toResultTransformer(proto.resultOptions))

      ctx.setCommand(Find(filter, andThen, proto.only.getOrElse(false)))
    }
  }
}
BytesConverter.scala
package protobuf.bytes

import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString

/** Java-serialization based conversion between arbitrary objects and protobuf `ByteString`s. */
object Converter {

  /**
   * Serializes `value` with Java object serialization.
   *
   * @param value object to serialize; `ObjectOutputStream.writeObject` throws
   *              `java.io.NotSerializableException` for non-serializable objects
   * @return the serialized bytes
   */
  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    // Close the object stream even when writeObject throws.
    try oos.writeObject(value)
    finally oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  /**
   * Deserializes an object previously produced by [[marshal]].
   *
   * SECURITY NOTE(review): this runs Java deserialization on whatever bytes it
   * is given. If the bytes can come from an untrusted peer, restrict the
   * accepted classes (e.g. with a `java.io.ObjectInputFilter`) or switch to a
   * schema-based encoding.
   *
   * @param bytes serialized object
   * @tparam A expected type; the cast is unchecked because of erasure
   * @return the deserialized object cast to `A`
   */
  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    // Close the object stream even when readObject throws.
    try ois.readObject().asInstanceOf[A]
    finally ois.close()
  }
}
FindDemo.scala
package demo.sdp.mgo.localapp

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.mongodb.scala._
import scala.util._
import scala.collection.JavaConverters._
import sdp.mongo.engine._
import org.mongodb.scala.model._
import akka.stream.scaladsl.{Sink, Source}
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.{BsonDocument, BsonValue}
import scalikejdbc._
import sdp.jdbc.engine._
import sdp.jdbc.config._

/**
 * Demo: builds a find query, converts it to its protobuf message, converts it
 * back into an `MGOContext` and runs it against a MongoDB on localhost.
 */
object ProtoTests extends App {

  import MGOContext._
  import MGOEngine._
  import MGOCommands._
  import MongoActionStream._
  import MgoProtoConvertion._
  import org.mongodb.scala.model._
  import Projections._
  import Filters._

  // Akka runtime shared by the engine calls below.
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  // Client settings pointing at a single server on localhost.
  val clientSettings: MongoClientSettings =
    MongoClientSettings
      .builder()
      .applyToClusterSettings { b => b.hosts(List(new ServerAddress("localhost")).asJava) }
      .build()

  implicit val client: MongoClient = MongoClient(clientSettings)

  // Filter and projection used by the query.
  val eqState = equal("state","California")
  val proj = exclude("rowid","_id")

  // Result options: limit to three documents and apply the projection.
  val rtxfmr = Seq(
    ResultOptions(optType = FOD_LIMIT, valueParam = 3),
    ResultOptions(optType = FOD_PROJECTION, bsonParam = Some(proj))
  )

  // Encode the query as a protobuf message ...
  val protoCtx =
    MGOProtoMsg(
      dbName = "testdb",
      collName = "aqmrpt",
      commandType = MGO_COMMAND_FIND,
      bsonParam = Seq(eqState),
      resultOptions = rtxfmr
    ).toProto

  // ... and decode it back into an executable context.
  val findCtx = CtxFromProto(protoCtx)

  val futFind = mgoQuery[Seq[Document]](findCtx)

  // Print each document as JSON, or the error message on failure.
  futFind.onComplete {
    case Failure(err) =>
      println(err.getMessage)
    case Success(found) =>
      found.foreach(d => println(d.toJson()))
  }

  // Keep the JVM alive until Enter is pressed, then shut the actor system down.
  scala.io.StdIn.readLine()
  system.terminate()
}