SDP(9):MongoDB-Scala - data access and modeling

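MongoDB is a document database. It imposes no rigid format on stored data, so data can be stored and read in very flexible ways. MongoDB is also a distributed database, and unlike a traditional relational database, a distributed database does not support table joins in the relational sense, so designing its collections differs considerably from designing relational tables. Distributed databases follow their own data model: the structure has to be designed primarily around the queries the application needs to run. Relational design calls for normalized tables that are combined with table joins at read time. Without table joins, reading data that spans several collections takes multiple round trips, which hurts efficiency. The most distinctive feature of MongoDB as a document database is that it allows embedded Documents: related Documents can be nested inside the Document they belong to, so all the data comes back in a single read. This gives a denormalized data model, and in this respect MongoDB has an edge over Cassandra. MongoDB also supports many kinds of indexes, which makes it a strong choice among distributed databases when efficient reads matter. In addition, it provides sort, aggregation and map-reduce to support rich data-analysis work.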

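Before using MongoDB we need to be familiar with its data model and design philosophy. In today's big-data era, the way data is produced and used has changed fundamentally, and the traditional relational data model can no longer meet every requirement of modern information systems. For example, when designing a personal-information table we have to allow for people with two addresses, people with no address at all, people with a fax number, and assorted other peculiarities. In a relational schema we have to make trade-offs and sacrifice some attributes. MongoDB's document model allows records with different shapes, so data can be captured and stored in full. Below is a Document design for a purchase order: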

    val po1 = Document(
      "ponum" -> "po18012301",
      "vendor" -> "The smartphone compay",
      "podate" -> podate1,
      "remarks" -> "urgent, rush order",
      "handler" -> pic,
      "podtl" -> Seq(
        Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
        Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
      )
    )

    val po2 = Document(
      "ponum" -> "po18022002",
      "vendor" -> "The Samsung compay",
      "podate" -> podate2,
      "podtl" -> Seq(
        Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
        Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
        Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
      )
    )

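Both po1 and po2 embed several purchase-item Documents under the podtl key. First, po1 and po2 differ in structure: po1 has two extra keys, remarks and handler. The embedded Documents also differ from one another: some carry packing, others payterm. In this example I deliberately included a demonstration of the date, binary and array types: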

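    val ca = Calendar.getInstance()
    ca.set(2011, 10, 23)   // Calendar months are 0-based: 10 is November
    val podate1 = ca.getTime
    ca.set(2012, 12, 23)   // month 12 is out of range; the lenient Calendar rolls over to January 2013
    val podate2 = ca.getTime
    // Load the image into a byte array; it is stored as BSON binary under "handler"
    val pic = FileToByteArray("/users/tiger-macpro/sample.png", 3.seconds)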
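The Calendar calls above depend on two details that are easy to miss: months are zero-based, and an out-of-range month silently rolls over into the next year. The following is a minimal sketch comparing Calendar with java.time, which uses one-based months and rejects invalid values. The names DateSketch, viaCalendar and viaLocalDate are made up for illustration and are not part of the original code; the sketch also assumes the default locale yields a Gregorian calendar.

```scala
import java.time.{LocalDate, ZoneId}
import java.util.{Calendar, Date}

// Hypothetical helper object, for illustration only.
object DateSketch {
  // Calendar months are 0-based, so 10 means November.
  // Assumes the default locale produces a Gregorian calendar.
  def viaCalendar(year: Int, zeroBasedMonth: Int, day: Int): Date = {
    val ca = Calendar.getInstance()
    ca.clear()                         // drop the current time-of-day fields
    ca.set(year, zeroBasedMonth, day)  // lenient: month 12 becomes January of year + 1
    ca.getTime
  }

  // java.time months are 1-based; LocalDate.of throws on an invalid month.
  def viaLocalDate(year: Int, month: Int, day: Int): Date =
    Date.from(
      LocalDate.of(year, month, day)
        .atStartOfDay(ZoneId.systemDefault())
        .toInstant
    )
}
```

Unlike this sketch, the original snippet does not call clear(), so podate1 and podate2 keep the wall-clock time at which the program ran.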

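A MongoDB date is read and written as java.util.Date, which can be manipulated with Calendar. Now look at how the data types correspond in the following type conversions: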

 

    case class PO(
      ponum: String,
      podate: java.util.Date,
      vendor: String,
      remarks: Option[String],
      podtl: Option[BsonArray],
      handler: Option[BsonBinary]
    )

    // Map a purchase-order Document onto PO; optional keys become Option values
    def toPO(doc: Document): PO = {
      val ks = doc.keySet
      PO(
        ponum = doc.getString("ponum"),
        podate = doc.getDate("podate"),
        vendor = doc.getString("vendor"),
        remarks = if (ks.contains("remarks")) Some(doc.getString("remarks")) else None,
        // get[T] returns None when the key is missing or holds another BSON type
        podtl = doc.get[BsonArray]("podtl"),
        handler = doc.get[BsonBinary]("handler")
      )
    }

    case class PODTL(
      item: String,
      price: Double,
      qty: Int,
      packing: Option[String],
      payTerm: Option[String]
    )

    // Map one embedded purchase-item Document onto PODTL
    def toPODTL(podtl: Document): PODTL = {
      val ks = podtl.keySet
      PODTL(
        item = podtl.getString("item"),
        price = podtl.getDouble("price"),
        qty = podtl.getInteger("qty"),
        packing = if (ks.contains("packing")) Some(podtl.getString("packing")) else None,
        payTerm = if (ks.contains("payterm")) Some(podtl.getString("payterm")) else None
      )
    }

Note the BsonBinary and BsonArray types and how they are used. We can use a key of an embedded Document as a query condition:

    poCollection.find(equal("podtl.qty", 100)).toFuture().onComplete {
      case Success(docs) =>
        docs.map(toPO).foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
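With dot notation, a filter such as equal("podtl.qty", 100) matches an order when any embedded item has that value. When several conditions must hold for the same embedded item, elemMatch is the usual tool. The sketch below builds both filters with the driver's Filters helpers and renders them to BsonDocument so they can be inspected without a running server. The object name EmbeddedFilters, its members, and the particular luxury/qty condition are illustrative assumptions, not part of the original code.

```scala
import org.bson.BsonDocument
import org.bson.conversions.Bson
import org.mongodb.scala.MongoClient
import org.mongodb.scala.model.Filters.{and, elemMatch, equal, gte}

// Hypothetical holder for the example filters.
object EmbeddedFilters {
  // Matches a purchase order when ANY embedded item has qty == 100.
  val anyItemWithQty100: Bson = equal("podtl.qty", 100)

  // Matches only when ONE embedded item satisfies both conditions at once.
  val bulkLuxuryItem: Bson =
    elemMatch("podtl", and(equal("packing", "luxury"), gte("qty", 100)))

  // Render a filter as a BsonDocument using the driver's default codecs.
  def render(filter: Bson): BsonDocument =
    filter.toBsonDocument(classOf[BsonDocument], MongoClient.DEFAULT_CODEC_REGISTRY)
}
```

Either value can be passed to poCollection.find in place of the inline filter, for example poCollection.find(EmbeddedFilters.bulkLuxuryItem).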

We can use toPO and toPODTL to map po and podtl onto case classes and then work with them in a strongly typed way:

    def showPO(po: PO) = {
      println(s"po number: ${po.ponum}")
      println(s"po date: ${po.podate.toString}")
      println(s"vendor: ${po.vendor}")
      po.remarks.foreach(r => println(s"remarks: $r"))
      // Print each embedded purchase item, if the order has any
      po.podtl match {
        case Some(barr) =>
          val docs = barr.getValues.asScala.toList
          docs.map { dc =>
            toPODTL(dc.asInstanceOf[org.bson.BsonDocument])
          }.foreach { doc: PODTL =>
            print(s"==>Item: ${doc.item} ")
            print(s"price: ${doc.price} ")
            print(s"qty: ${doc.qty} ")
            doc.packing.foreach(pk => print(s"packing: ${pk} "))
            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
            println("")
          }
        case _ =>
      }
      // Write the stored picture back to disk, if one was attached
      po.handler match {
        case Some(bs) =>
          val fileName = s"/users/tiger-macpro/${po.ponum}.png"
          ByteArrayToFile(bs.getData, fileName)
          println(s"picture saved to ${fileName}")
        case None => println("no picture provided")
      }
    }

    poCollection.find(equal("podtl.qty", 100)).toFuture().onComplete {
      case Success(docs) =>
        docs.map(toPO).foreach(showPO)
        println("------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    poCollection.find().toFuture().onComplete {
      case Success(docs) =>
        docs.map(toPO).foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
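The cast from array element to document in showPO fails at runtime if an element is not a document. As an alternative sketch, the embedded items can be converted by pattern matching directly on the org.bson types, which skips unexpected elements instead of throwing. PoLine and PoLines are hypothetical names standing in for PODTL and toPODTL, and the sample array is hand-built rather than read from the database.

```scala
import org.bson.{BsonArray, BsonDocument, BsonDouble, BsonInt32, BsonString, BsonValue}
import scala.collection.JavaConverters._

// Hypothetical stand-in for PODTL, renamed to avoid a clash with the original.
final case class PoLine(item: String, price: Double, qty: Int,
                        packing: Option[String], payTerm: Option[String])

object PoLines {
  // Read an optional string key: None when the key is absent or not a string.
  private def optString(d: BsonDocument, key: String): Option[String] =
    Option(d.get(key)).collect { case s: BsonString => s.getValue }

  // Required keys throw if missing or of the wrong BSON type.
  def fromBson(d: BsonDocument): PoLine =
    PoLine(
      item = d.getString("item").getValue,
      price = d.getDouble("price").getValue,
      qty = d.getInt32("qty").getValue,
      packing = optString(d, "packing"),
      payTerm = optString(d, "payterm")
    )

  // Keep only the array elements that really are documents; skip anything else.
  def fromArray(arr: BsonArray): List[PoLine] =
    arr.getValues.asScala.toList.collect { case d: BsonDocument => fromBson(d) }

  // A tiny hand-built array standing in for the podtl value of a fetched order.
  val sample: BsonArray = new BsonArray(java.util.Arrays.asList[BsonValue](
    new BsonDocument("item", new BsonString("sony smartphone"))
      .append("price", new BsonDouble(2389.0))
      .append("qty", new BsonInt32(1239))
      .append("packing", new BsonString("standard")),
    new BsonString("not a document")
  ))
}
```

Calling PoLines.fromArray(PoLines.sample) yields a single PoLine for the sony smartphone; the stray string element is ignored.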

A test run displays the following:

po number: po18022002
po date: Wed Jan 23 11:57:50 HKT 2013
vendor: The Samsung compay
==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard
==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days
==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury
no picture provided
-------------------------------
po number: po18012301
po date: Wed Nov 23 11:57:50 HKT 2011
vendor: The smartphone compay
remarks: urgent, rush order
==>Item: sony smartphone price: 2389.0 qty: 1239 packing: standard
==>Item: ericson smartphone price: 897.0 qty: 1000 payTerm: 30 days
picture saved to /users/tiger-macpro/po18012301.png
po number: po18022002
po date: Wed Jan 23 11:57:50 HKT 2013
vendor: The Samsung compay
==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard
==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days
==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury
no picture provided
------------------------------

Here is the complete source code for this demonstration:

build.sbt

name := "learn-mongo"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17"
)

FileStreaming.scala

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FileStreaming {
  // Read a whole file into a ByteBuffer, blocking for at most timeOut
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration)(
    implicit mat: Materializer): ByteBuffer = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString.empty) {
      case (hd, bs) => hd ++ bs
    }
    Await.result(fut, timeOut).toByteBuffer
  }

  // Read a whole file into a byte array, blocking for at most timeOut
  def FileToByteArray(fileName: String, timeOut: FiniteDuration)(
    implicit mat: Materializer): Array[Byte] = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString.empty) {
      case (hd, bs) => hd ++ bs
    }
    Await.result(fut, timeOut).toArray
  }

  // Read a whole file and expose its contents as an in-memory InputStream
  def FileToInputStream(fileName: String, timeOut: FiniteDuration)(
    implicit mat: Materializer): InputStream = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString.empty) {
      case (hd, bs) => hd ++ bs
    }
    val buf = Await.result(fut, timeOut).toArray
    new ByteArrayInputStream(buf)
  }

  // Write the remaining bytes of a ByteBuffer to a file; returns the stream's Future
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba, 0, ba.length)
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  // Write a byte array to a file; returns the stream's Future
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  // Copy an InputStream to a file; returns the stream's Future
  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
}
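For a small file such as the sample picture, the same byte-array round trip can also be done with java.nio alone, without a Materializer or a timeout. This is a blocking sketch offered as a point of comparison, not the approach the listing above takes; BlockingFileIO and its method names are hypothetical.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical blocking counterpart to FileToByteArray / ByteArrayToFile.
object BlockingFileIO {
  // Read the whole file into memory; suitable only for small files.
  def fileToByteArray(fileName: String): Array[Byte] =
    Files.readAllBytes(Paths.get(fileName))

  // Write the bytes, creating the file or truncating an existing one.
  def byteArrayToFile(bytes: Array[Byte], fileName: String): Path =
    Files.write(Paths.get(fileName), bytes)
}
```

The streaming versions remain the better fit when files are large or when the write should not block the calling thread.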

MongoScala103.scala

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import java.util.Calendar
import org.bson.BsonBinary
import scala.util._
import FileStreaming._
import scala.concurrent.duration._
import org.mongodb.scala._
import org.mongodb.scala.bson.{BsonArray, BsonDocument}
import scala.collection.JavaConverters._
import org.mongodb.scala.connection.ClusterSettings
import org.mongodb.scala.model.Filters._

object MongoScala103 extends App {
  // Helpers is not listed in this post; it is expected to supply headResult()
  import Helpers._

  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  val client = MongoClient(clientSettings)

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  val db: MongoDatabase = client.getDatabase("testdb")

  // Drop any existing "po" collection so every run starts from a clean state
  val poOrgCollection: MongoCollection[Document] = db.getCollection("po")
  poOrgCollection.drop().headResult()
  val poCollection: MongoCollection[Document] = db.getCollection("po")

  val ca = Calendar.getInstance()
  ca.set(2011, 10, 23)   // Calendar months are 0-based: 10 is November
  val podate1 = ca.getTime
  ca.set(2012, 12, 23)   // month 12 is out of range; the lenient Calendar rolls over to January 2013
  val podate2 = ca.getTime
  val pic = FileToByteArray("/users/tiger-macpro/sample.png", 3.seconds)

  val po1 = Document(
    "ponum" -> "po18012301",
    "vendor" -> "The smartphone compay",
    "podate" -> podate1,
    "remarks" -> "urgent, rush order",
    "handler" -> pic,
    "podtl" -> Seq(
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
    )
  )

  val po2 = Document(
    "ponum" -> "po18022002",
    "vendor" -> "The Samsung compay",
    "podate" -> podate2,
    "podtl" -> Seq(
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
    )
  )

  poCollection.insertMany(Seq(po1, po2)).headResult()

  case class PO(
    ponum: String,
    podate: java.util.Date,
    vendor: String,
    remarks: Option[String],
    podtl: Option[BsonArray],
    handler: Option[BsonBinary]
  )

  // Map a purchase-order Document onto PO; optional keys become Option values
  def toPO(doc: Document): PO = {
    val ks = doc.keySet
    PO(
      ponum = doc.getString("ponum"),
      podate = doc.getDate("podate"),
      vendor = doc.getString("vendor"),
      remarks = if (ks.contains("remarks")) Some(doc.getString("remarks")) else None,
      // get[T] returns None when the key is missing or holds another BSON type
      podtl = doc.get[BsonArray]("podtl"),
      handler = doc.get[BsonBinary]("handler")
    )
  }

  case class PODTL(
    item: String,
    price: Double,
    qty: Int,
    packing: Option[String],
    payTerm: Option[String]
  )

  // Map one embedded purchase-item Document onto PODTL
  def toPODTL(podtl: Document): PODTL = {
    val ks = podtl.keySet
    PODTL(
      item = podtl.getString("item"),
      price = podtl.getDouble("price"),
      qty = podtl.getInteger("qty"),
      packing = if (ks.contains("packing")) Some(podtl.getString("packing")) else None,
      payTerm = if (ks.contains("payterm")) Some(podtl.getString("payterm")) else None
    )
  }

  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${po.podate.toString}")
    println(s"vendor: ${po.vendor}")
    po.remarks.foreach(r => println(s"remarks: $r"))
    // Print each embedded purchase item, if the order has any
    po.podtl match {
      case Some(barr) =>
        val docs = barr.getValues.asScala.toList
        docs.map { dc =>
          toPODTL(dc.asInstanceOf[org.bson.BsonDocument])
        }.foreach { doc: PODTL =>
          print(s"==>Item: ${doc.item} ")
          print(s"price: ${doc.price} ")
          print(s"qty: ${doc.qty} ")
          doc.packing.foreach(pk => print(s"packing: ${pk} "))
          doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
          println("")
        }
      case _ =>
    }
    // Write the stored picture back to disk, if one was attached
    po.handler match {
      case Some(bs) =>
        val fileName = s"/users/tiger-macpro/${po.ponum}.png"
        ByteArrayToFile(bs.getData, fileName)
        println(s"picture saved to ${fileName}")
      case None => println("no picture provided")
    }
  }

  // Both queries run asynchronously, so the order of their output is not guaranteed
  poCollection.find().toFuture().onComplete {
    case Success(docs) =>
      docs.map(toPO).foreach(showPO)
      println("------------------------------")
    case Failure(e) => println(e.getMessage)
  }

  poCollection.find(equal("podtl.qty", 100)).toFuture().onComplete {
    case Success(docs) =>
      docs.map(toPO).foreach(showPO)
      println("-------------------------------")
    case Failure(e) => println(e.getMessage)
  }

  // Keep the JVM alive until Enter is pressed, then shut the actor system down
  scala.io.StdIn.readLine()
  system.terminate()
}