restapi做爲先後端交互的樞紐:面對大批量的前端請求,須要確保回覆的及時性。使用緩存是一項有效工具。咱們能夠把多數前端請求的回覆response存入緩存,特別是一些須要大量計算才能獲取的回覆值,更能夠大大提升後端的反應速度。值得慶幸的是akka-http已經提供了對緩存的支持,是基於java8 caffein的一套緩存操做工具包的。下面就介紹一下akka-http的caching。前端
akka-http caching 有個依賴:java
"com.typesafe.akka" %% "akka-http-caching" % akkaHttpVersion,
先從緩存存儲結構開始,看看下面的一段緩存結構定義:算法
import akka.http.scaladsl.util.FastFuture import akka.http.caching.scaladsl.Cache import akka.http.caching.scaladsl.CachingSettings import akka.http.caching.LfuCache val defaultCachingSettings = CachingSettings(sys) val lfuCacheSettings = //最少使用排除算法緩存
defaultCachingSettings.lfuCacheSettings .withInitialCapacity(128) //起始單位
.withMaxCapacity(1024) //最大單位
.withTimeToLive(1.hour) //最長存留時間
.withTimeToIdle(30.minutes) //最長未使用時間
val cachingSettings = defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings) //key -> String
val lfuCache: Cache[String, Option[Map[String, Any]]] = LfuCache(cachingSettings)
lfuCache是一種基於使用頻率算法的緩存管理系統,這個咱們就沒必要多說了。最好能拿個例子來示範解釋:恰好手頭有個獲取用戶信息的http請求樣板:sql
val route = pathPrefix(pathName) { pathPrefix("getuserinfo") { (get & parameter('userid)) { userid => {
val userinfo = posRepo.getUserInfo(userid) userinfo match { case Some(ui) => complete(toJson(ui)) case None => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } } } } def getUserInfo(userid: String): Option[UserInfo] = { val sql = "SELECT CUSTOMERS.SHOPID AS SHOPID, TERMINALID, DEVICEID, IMPSVCURL FROM CUSTOMERS INNER JOIN TERMINALS " +
" ON CUSTOMERS.SHOPID=TERMINALS.SHOPID " +
" WHERE (CUSTOMERS.DISABLED=0 AND TERMINALS.DISABLED=0) " +
" AND (CUSTOMERS.EXPDATE > GETDATE() AND TERMINALS.EXPDATE > GETDATE()) AND TERMINALID='" + userid + "'" val rows = query[Map[String, Any]]("mpos", sql, rsc.resultSet2Map) val futUI: Future[Option[Map[String, Any]]] = rows.runWith(Sink.lastOption) Await.result(futUI, 3 seconds) }
當收到前端 http://mycom.com/pos/getuserinfo?userid=1234 這樣的請求時須要從數據庫裏讀取用戶信息數據及進行一些轉換處理。這個請求調用得頻率較高、數據庫讀取也比較耗時,是個實在的例子。咱們來看看如何實現緩存管理:數據庫
在akka-http裏能夠用兩種方式來實現緩存管理:一、直接用cache工具,二、用akka-http提供的Directive: cache, alwaysCache後端
咱們先看看如何直接使用cache操做,先看看Cache的構建:api
abstract class Cache[K, V] extends akka.http.caching.javadsl.Cache[K, V] { cache =>
/** * Returns either the cached Future for the given key or evaluates the given value generating * function producing a `Future[V]`. */ def apply(key: K, genValue: () => Future[V]): Future[V]
Cache[K,V]是以K爲鍵,一個()=> Future[V]爲值的結構,也就是說咱們須要把一個獲取Future值的函數存在緩存裏:緩存
pathPrefix("getuserinfo") { (get & parameter('userid)) { userid => {
val userinfo = lfuCache.getOrLoad(userid, _ => posRepo.futureUserInfo(userid)) onComplete(userinfo) { _ match { case Success(oui) => oui match { case Some(ui) => complete(toJson(ui)) case None => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } case Failure(_) => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } } } } def futureUserInfo(userid: String): Future[Option[Map[String, Any]]] = { val sql = "SELECT CUSTOMERS.SHOPID AS SHOPID, TERMINALID, DEVICEID, IMPSVCURL FROM CUSTOMERS INNER JOIN TERMINALS " +
" ON CUSTOMERS.SHOPID=TERMINALS.SHOPID " +
" WHERE (CUSTOMERS.DISABLED=0 AND TERMINALS.DISABLED=0) " +
" AND (CUSTOMERS.EXPDATE > GETDATE() AND TERMINALS.EXPDATE > GETDATE()) AND TERMINALID='" + userid + "'" val rows = query[Map[String, Any]]("mpos", sql, rsc.resultSet2Map) rows.runWith(Sink.lastOption) }
首先咱們須要把getUserInfo修改爲futureUserInfo,而後傳入cache.getOrLoad():app
/** * Returns either the cached Future for the given key, or applies the given value loading * function on the key, producing a `Future[V]`. */ def getOrLoad(key: K, loadValue: K => Future[V]): Future[V]
跟着咱們再試試用akka-http的Directive, cache和alwaysCache。這兩個是同一個東西,只是cache多了個是否使用緩存這麼個控制,是經過request-header Cache-Control來實現的,如:Cache-Control`(`no-cache`)。cache函數是這樣定義的;
函數
def cache[K](cache: Cache[K, RouteResult], keyer: PartialFunction[RequestContext, K]): Directive0
這個函數返回Directive0, 能夠直接對應 { ... complete(...) },因此cache能夠把一個route包嵌在裏面如:
cache(myCache, simpleKeyer) { complete { i += 1 i.toString } }
simpleKeyer是個K對應函數:在咱們這個例子裏K -> Uri, Cache[Uri,RouteResult]。這裏有個現成的構建器:routeCache[Uri]
/** * Creates an [[LfuCache]] with default settings obtained from the system's configuration. */ def routeCache[K](implicit s: ActorSystem): Cache[K, RouteResult] = LfuCache[K, RouteResult](s)
不過這個LfuCache使用了application.conf裏面的cachingSettings. 咱們想直接控制lfuCache構建,因此能夠用:
val lfuCache = LfuCache[Uri,RouteResult](cachingSettings)
alwaysCache的具體使用和上面的cache.getOrLoad相同:
import akka.http.scaladsl.model.{HttpMethods, StatusCodes, Uri} import akka.http.scaladsl.util.FastFuture import akka.http.caching.scaladsl.Cache import akka.http.caching.scaladsl.CachingSettings import akka.http.caching.LfuCache import akka.http.scaladsl.server.RequestContext import akka.http.scaladsl.server.RouteResult import akka.http.scaladsl.server.directives.CachingDirectives._ import scala.concurrent.duration._ import scala.util._ val defaultCachingSettings = CachingSettings(sys) val lfuCacheSettings = //最少使用排除算法緩存
defaultCachingSettings.lfuCacheSettings .withInitialCapacity(128) //起始單位
.withMaxCapacity(1024) //最大單位
.withTimeToLive(1.hour) //最長存留時間
.withTimeToIdle(30.minutes) //最長未使用時間
val cachingSettings = defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings) //Uri->key, RouteResult -> value
val lfuCache = LfuCache[Uri,RouteResult](cachingSettings) //Example keyer for non-authenticated GET requests
val simpleKeyer: PartialFunction[RequestContext, Uri] = { val isGet: RequestContext => Boolean = _.request.method == HttpMethods.GET // val isAuthorized: RequestContext => Boolean = // _.request.headers.exists(_.is(Authorization.lowercaseName))
val result: PartialFunction[RequestContext, Uri] = { case r: RequestContext if isGet(r) => r.request.uri } result } val route = pathPrefix(pathName) { pathPrefix("getuserinfo") { (get & parameter('userid)) { userid => {
alwaysCache(lfuCache,simpleKeyer) { onComplete(posRepo.futureUserInfo(userid)) { _ match { case Success(oui) => oui match { case Some(ui) => complete(toJson(ui)) case None => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } case Failure(_) => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } } } } } } ~
好了,我覺着可能直接調用cache.getOrLoad會更好些,由於akka-http還在不停的變,java8caffein應該不會再調整了吧。