在雲計算的推進下,軟件系統發展趨於平臺化。雲平臺系統通常都是分佈式的集羣系統,採用大數據技術。在這方面akka提供了比較完整的開發技術支持。我在上一個系列有關CQRS的博客中按照實際應用的要求對akka的一些開發技術進行了介紹。CQRS模式着重操做流程控制,主要涉及交易數據的管理。那麼,做爲交易數據產生過程當中發揮驗證做用的一系列基礎數據如用戶信息、商品信息、支付類型信息等又應該怎樣維護呢?首先基礎數據也應該是在平臺水平上的,但數據的採集、維護是在系統前端的,好比一些web界面。因此平臺基礎數據維護系統是一套先後臺結合的系統。對於一個開放的平臺系統來講,應該可以適應各式各樣的前端系統。通常來說,平臺經過定義一套api與前端系統集成是通用的方法。這套api必須遵循行業標準,技術要普及通用,這樣才能支持各類異類前端系統功能開發。在這些要求背景下,相對gRPC, GraphQL來講,REST風格的http集成模式能獲得更多開發人員的接受。前端
在有關CQRS系列博客裏,我以akka-http做爲系統集成工具的一種,零星地針對實際須要對http通訊進行了介紹。在restapi這個系列裏我想系統化的用akka-http構建一套完整的,REST風格數據維護和數據交換api,除CRUD以外還包括網絡安全,文件交換等功能。個人計劃是用akka-http搭建一個平臺數據維護api的REST-CRUD框架,包含全部標配功能如用戶驗證、異常處理等。CRUD部分要儘可能作成通用的generic,框架型的,能用一套標準的方法對任何數據表進行操做。web
akka-http是一套http程序開發工具。它的Routing-DSL及數據序列化marshalling等都功能強大。特別是HttpResponse處理,一句complete解決了一大堆問題,magnet-pattern結合marshalling讓它的使用更加方便。編程
在這篇討論裏先搭一個restapi的基本框架,包括客戶端身份驗證和使用權限。主要是示範如何達到通用框架的目的。這個在akka-http編程裏主要體如今Routing-DSL的結構上,要求Route可以簡潔易懂,以下:json
val route = path("auth") { authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo => post { complete(authenticator.issueJwt(userinfo))} } } ~ pathPrefix("api") { authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken => (path("hello") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } ~ (path("how are you") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } // ~ ...
} }
我覺着這應該是框架型正確的方向:把全部功能都放在api下,通通通過權限驗證。能夠直接在後面不斷加功能Route。api
身份驗證和使用權限也應該是一套標準的東西,但身份驗證方法可能有所不一樣,特別是用戶身份驗證多是經過獨立的身份驗證服務器實現的,對不一樣的驗證機制應該有針對性的定製函數。構建身份管理的對象應該很方便或者很通用,以下:安全
val authenticator = new AuthBase() .withAlgorithm(JwtAlgorithm.HS256) .withSecretKey("OpenSesame") .withUserFunc(getValidUser)
AuthBase源碼以下:服務器
package com.datatech.restapi import akka.http.scaladsl.server.directives.Credentials import pdi.jwt._ import org.json4s.native.Json import org.json4s._ import org.json4s.jackson.JsonMethods._ import pdi.jwt.algorithms._ import scala.util._ object AuthBase { type UserInfo = Map[String, Any] case class AuthBase( algorithm: JwtAlgorithm = JwtAlgorithm.HMD5, secret: String = "OpenSesame", getUserInfo: Credentials => Option[UserInfo] = null) { ctx => def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm=algo) def withSecretKey(key: String): AuthBase = ctx.copy(secret = key) def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f) def authenticateToken(credentials: Credentials): Option[String] = credentials match { case Credentials.Provided(token) => algorithm match { case algo: JwtAsymmetricAlgorithm => Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match { case true => Some(token) case _ => None } case _ => Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match { case true => Some(token) case _ => None } } case _ => None } def getUserInfo(token: String): Option[UserInfo] = { algorithm match { case algo: JwtAsymmetricAlgorithm => Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match { case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo]) case Failure(err) => None } case _ => Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match { case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo]) case Failure(err) => None } } } def issueJwt(userinfo: UserInfo): String = { val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo)) Jwt.encode(claims, secret, algorithm) } } }
我已經把多個通用的函數封裝在裏面了。再模擬一個用戶身份管理對象:網絡
package com.datatech.restapi import akka.http.scaladsl.server.directives.Credentials import AuthBase._ object MockUserAuthService { case class User(username: String, password: String, userInfo: UserInfo) val validUsers = Seq(User("johnny", "p4ssw0rd",Map("shopid" -> "1101", "userid" -> "101")) ,User("tiger", "secret", Map("shopid" -> "1101" , "userid" -> "102"))) def getValidUser(credentials: Credentials): Option[UserInfo] = credentials match { case p @ Credentials.Provided(_) => validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match { case Some(user) => Some(user.userInfo) case _ => None } case _ => None } }
好了,服務端示範代碼中能夠直接構建或者調用這些標準的類型了:框架
package com.datatech.restapi import akka.actor._ import akka.stream._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import pdi.jwt._ import AuthBase._ import MockUserAuthService._ object RestApiServer extends App { implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher val authenticator = new AuthBase() .withAlgorithm(JwtAlgorithm.HS256) .withSecretKey("OpenSesame") .withUserFunc(getValidUser) val route = path("auth") { authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo => post { complete(authenticator.issueJwt(userinfo))} } } ~ pathPrefix("api") { authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken => (path("hello") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } ~ (path("how are you") & get) { complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}") } // ~ ...
} } val (port, host) = (50081,"192.168.11.189") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ => httpSys.terminate()) }
就是說後面的http功能能夠直接插進這個框架,精力能夠徹底聚焦於具體每項功能的開發上了。分佈式
而後用下面的客戶端測試代碼:
import akka.actor._ import akka.stream._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers._ import scala.concurrent._ import akka.http.scaladsl.model._ import pdi.jwt._ import org.json4s._ import org.json4s.jackson.JsonMethods._ import scala.util._ import scala.concurrent.duration._ object RestApiClient { type UserInfo = Map[String,Any] def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() // needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/") val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd")) val authRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.11.189:50081/auth", headers = List(authorization) ) val futToken: Future[HttpResponse] = Http().singleRequest(authRequest) val respToken = for { resp <- futToken jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String} } yield jstr val jstr = Await.result[String](respToken,2 seconds) println(jstr) scala.io.StdIn.readLine() val parts = Jwt.decodeRawAll(jstr, "OpenSesame", Seq(JwtAlgorithm.HS256)) match { case Failure(exception) => println(s"Error: ${exception.getMessage}") case Success(value) => println(((parse(value._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo]) } scala.io.StdIn.readLine() val authentication = headers.Authorization(OAuth2BearerToken(jstr)) val apiRequest = HttpRequest( HttpMethods.GET, uri = "http://192.168.11.189:50081/api/hello", ).addHeader(authentication) val futAuth: Future[HttpResponse] = Http().singleRequest(apiRequest) println(Await.result(futAuth,2 seconds)) scala.io.StdIn.readLine() system.terminate() } }
build.sbt
name := "restapi" version := "0.1" scalaVersion := "2.12.8" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.1.8", "com.typesafe.akka" %% "akka-stream" % "2.5.23", "com.pauldijou" %% "jwt-core" % "3.0.1", "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0", "org.json4s" %% "json4s-native" % "3.6.1", "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0", "org.slf4j" % "slf4j-simple" % "1.7.25", "org.json4s" %% "json4s-jackson" % "3.6.7", "org.json4s" %% "json4s-ext" % "3.6.7" )