restapi(2)- generic restful CRUD:通用的restful風格數據庫表維護工具

   研究關於restapi的初衷是想搞一套通用的平臺數據表維護http工具。前面談過身份驗證和使用權限、文件的上傳下載,此次來到具體的數據庫表維護。咱們在這篇示範裏設計一套通用的對平臺每個數據表的標準維護方式。http服務端數據表維護CRUD有幾個標準的部分組成:Model,Repository,Route。咱們先看看這幾個類型的基類:java

trait ModelBase[M,E] { def to: M => E def from: E => M } trait RepoBase[M] { def getById(id: Long) : Future[Option[M]] def getAll : Future[Seq[M]] def filter(expr: M => Boolean): Future[Seq[M]] def save(row: M) : Future[AnyRef] def deleteById(id: Long) : Future[Int] def updateById(id: Long, row: M) : Future[Int] } abstract class RouteBase[M](val pathName: String, repository: RepoBase[M])( implicit m: Manifest[M]) extends Directives with JsonConverter { val route = path(pathName) { get { complete(futureToJson(repository.getAll)) } ~ post { entity(as[String]) { json => val extractedEntity = fromJson[M](json) complete(futureToJson(repository.save(extractedEntity))) } } } ~ path(pathName / LongNumber) { id =>
    get { complete(futureToJson(repository.getById(id))) } ~ put { entity(as[String]) { json => val extractedEntity = fromJson[M](json) complete(futureToJsonAny(repository.updateById(id, extractedEntity))) } } ~ delete { complete(futureToJsonAny(repository.deleteById(id))) } } }

很明顯,Model是數據庫錶行類型的表達方式、Repository是數據庫表操做方法、Route是操做方法的調用。下面是這幾個類型的實例示範:sql

object MockModels { case class DataRow ( name: String, age: Int ) case class Person(name: String, age: Int) extends ModelBase[Person,DataRow] { def to: Person => DataRow = p => DataRow ( name = p.name, age = p.age ) def from: DataRow => Person = m => Person( name = m.name, age = m.age ) } } package com.datatech.restapi import MockModels._ import scala.concurrent.Future object MockRepo { class PersonRepo extends RepoBase[Person] { override def getById(id: Long): Future[Option[Person]] = Future.successful(Some(Person("johnny lee",23))) override def getAll: Future[Seq[Person]] = Future.successful( Seq(Person("jonny lee",23),Person("candy wang",45),Person("jimmy kowk",34)) ) override def filter(expr: Person => Boolean): Future[Seq[Person]] = Future.successful( Seq(Person("jonny lee",23),Person("candy wang",45),Person("jimmy kowk",34)) ) override def save(row: Person): Future[Person] = Future.successful(row) override def deleteById(id: Long): Future[Int] = Future.successful(1) override def updateById(id: Long, row: Person): Future[Int] = Future.successful(1) } } object PersonRoute { class PersonRoute(pathName: String, repo: RepoBase[Person]) extends RouteBase[Person](pathName,repo) val route = new PersonRoute("person",new PersonRepo).route }

Model表明數據表結構以及某種數據庫的錶行與Model之間的轉換。而repository則表明某種數據庫對庫表具體操做的實現。咱們把焦點拉回到RouteBase上來,這裏包含了rest標準的get,post,put,delete http操做。實際上就是request/response處理機制。由於數據須要在線上on-the-wire來回移動,因此須要進行數據轉換。通用的數據傳輸模式是:類->json->類,即序列化/反序列化。akka-http提供了豐富的Marshaller來實現自動的數據轉換,但在編譯時要提供Marshaller的隱式實例implicit instance,因此用類參數是沒法經過編譯的。只能手工進行類和json之間的轉換。json轉換是經過json4s實現的:數據庫

import java.text.SimpleDateFormat import akka.http.scaladsl.model._ import org.json4s.JsonAST.{JNull, JString} import org.json4s.{CustomSerializer, DefaultFormats, Formats} import org.json4s.jackson.Serialization import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future trait DateSerializer { case object SqlDateSerializer extends CustomSerializer[java.sql.Date](format => ( { case JString(date) => { val utilDate = new SimpleDateFormat("yyyy-MM-dd").parse(date); new java.sql.Date(utilDate.getTime) } case JNull         => null }, { case date: java.sql.Date => JString(date.toString) })) } trait JsonConverter extends DateSerializer { implicit val formats: Formats = new DefaultFormats { override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd") } ++ List(SqlDateSerializer) def toJson(obj: AnyRef): String = { Serialization.write(obj) } def futureToJson(obj: Future[AnyRef]): Future[HttpResponse] = { obj.map { x => HttpResponse(status = StatusCodes.OK, entity = HttpEntity(MediaTypes.`application/json`, Serialization.write(x))) }.recover { case ex => ex.printStackTrace(); HttpResponse(status = StatusCodes.InternalServerError) } } def futureToJsonAny(obj: Future[Any]): Future[HttpResponse] = { obj.map { x => HttpResponse(status = StatusCodes.OK, entity = HttpEntity(MediaTypes.`application/json`, s"""{status : ${x}""")) }.recover { case ex => HttpResponse(status = StatusCodes.InternalServerError) } } def fromJson[E](json: String)(implicit m: Manifest[E]): E = { Serialization.read[E](json) } }

固然對於一些特別的數據庫表,咱們仍是但願使用akka-http強大的功能,如streaming。這時對於每個這樣的表單就須要要定製Route了。下面是一個定製Route的例子:json

object MockModel { case class AddressRow ( province: String, city: String, street: String, zip: String ) case class Address( province: String, city: String, street: String, zip: String ) extends ModelBase[Address,AddressRow] { def to: Address => AddressRow = addr => AddressRow ( province = addr.province, city = addr.city, street = addr.street, zip = addr.zip ) def from: AddressRow => Address = row => Address( province = row.province, city = row.city, street = row.street, zip = row.zip ) } } object AddressRepo { def getById(id: Long): Future[Option[Address]] = ??? def getAll: Source[Address,_] = ??? def filter(expr: Address => Boolean): Future[Seq[Address]] = ??? def saveAll(rows: Source[Address,_]): Future[Int] = ??? def saveAll(rows: Future[Seq[Address]]): Future[Int] = ??? def deleteById(id: Long): Future[Address] = ??? def updateById(id: Long, row: Address): Future[Address] = ??? } package com.datatech.restapi import akka.actor._ import akka.stream._ import akka.http.scaladsl.common._ import spray.json.DefaultJsonProtocol import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.server._ import MockModels.Address import MockRepo._ trait FormatConverter extends SprayJsonSupport with DefaultJsonProtocol{ implicit val addrFormat = jsonFormat4(Address.apply) } case class AddressRoute(val pathName: String)(implicit akkaSys: ActorSystem) extends Directives with FormatConverter{ implicit val mat = ActorMaterializer() implicit val jsonStreamingSupport = EntityStreamingSupport.json() .withParallelMarshalling(parallelism = 2, unordered = false) val route = path(pathName) { get { complete(AddressRepo.getAll) } ~ post { withoutSizeLimit { entity(asSourceOf[Address]) { source =>
 /* val futSavedRows: Future[Seq[Address]] = source.runFold(Seq[Address]())((acc, addr) => acc :+ addr) onComplete(futSavedRows) { rows => */ onComplete(AddressRepo.saveAll(source)) {rows => complete { s"$rows address saved."} } } } } ~ path(pathName / LongNumber) { id =>
    get { complete(AddressRepo.getById(id))) } ~ put { entity(as[Address]) { addr => onComplete(AddressRepo.updateById(id,addr)) { addr => complete(s"address updated to: $addr") } } ~ delete { onComplete(AddressRepo.deleteById(id)) { addr => complete(s"address deleted: $addr") } } }

這樣作能夠靈活的使用akka-stream提供的功能。api

上面的例子Mock PersonRoute.route能夠直接貼在主route後面:app

  val route = path("auth") { authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo => post { complete(authenticator.issueJwt(userinfo))} } } ~ pathPrefix("openspace") { (path("hello") & get) { complete(s"Hello, you are in open space.") } } ~ pathPrefix("api") { authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken => (path("hello") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } ~ (path("how are you") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } ~ PersonRoute.route // ~ ...
 } }

和前面的示範同樣,咱們仍是寫一個客戶端來測試:ide

import akka.actor._ import akka.http.scaladsl.model.headers._ import scala.concurrent._ import scala.concurrent.duration._ import akka.http.scaladsl.Http import spray.json.DefaultJsonProtocol import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ import akka.stream.ActorMaterializer trait JsonFormats extends SprayJsonSupport with DefaultJsonProtocol object JsonConverters extends JsonFormats { case class Person(name: String,age: Int) implicit val fmtPerson = jsonFormat2(Person) } object TestCrudClient { type UserInfo = Map[String,Any] def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/") val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd")) val authRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.11.189:50081/auth", headers = List(authorization) ) val futToken: Future[HttpResponse] = Http().singleRequest(authRequest) val respToken = for { resp <- futToken jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String} } yield jstr val jstr =  Await.result[String](respToken,2 seconds) println(jstr) scala.io.StdIn.readLine() val authentication = headers.Authorization(OAuth2BearerToken(jstr)) val getAllRequest = HttpRequest( HttpMethods.GET, uri = "http://192.168.11.189:50081/api/crud/person", ).addHeader(authentication) val futGet: Future[HttpResponse] = Http().singleRequest(getAllRequest) println(Await.result(futGet,2 seconds)) scala.io.StdIn.readLine() import JsonConverters._ val saveRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.11.189:50081/api/crud/person" ).addHeader(authentication) val futPost: Future[HttpResponse] =
      for { reqEntity <- Marshal(Person("tiger chan",18)).to[RequestEntity] response <- Http().singleRequest(saveRequest.copy(entity=reqEntity)) } yield response println(Await.result(futPost,2 seconds)) scala.io.StdIn.readLine() system.terminate() } }

下面是restapi發展到如今狀態的源代碼:工具

build.sbtpost

 

name := "restapi"

version := "0.3"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.pauldijou" %% "jwt-core" % "3.0.1",
  "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

RestApiServer.scala測試

package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._

object RestApiServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher



  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
    path("auth") {
      authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
        post { complete(authenticator.issueJwt(userinfo))}
      }
    } ~
      pathPrefix("api") {
        authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            FileRoute(validToken)
              .route ~
            (pathPrefix("crud")) {
              PersonRoute.route
            }
          // ~ ...
        } ~
          (pathPrefix("crud")) {
            PersonRoute.route
            // ~ ...
          }
      }

  val (port, host) = (50081,"192.168.11.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()


  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())


}
相關文章
相關標籤/搜索