由於我瞭解Akka-http的主要目的不是爲了有關Web-Server的編程,而是想實現一套系統集成的api,因此也須要考慮由服務端主動向客戶端發送指令的應用場景。好比一個零售店管理平臺的服務端在完成了某些數據更新後須要通知各零售門市客戶端下載最新數據。雖然Akka-http也提供對websocket協議的支持,但websocket的網絡鏈接是雙向恆久的,適合頻繁的問答交互式服務端與客戶端的交流,消息結構也比較零碎。而咱們面臨的多是批次型的大量數據庫數據交換,只須要簡單的服務端單向消息就好了,因此websocket不太合適,而Akka-http的SSE應該比較適合咱們的要求。SSE模式的基本原理是服務端統一集中發佈消息,各客戶端持久訂閱服務端發佈的消息並從消息的內容中篩選出屬於本身應該執行的指令,而後進行相應的處理。客戶端接收SSE是在一個獨立的線程裏不斷進行的,不會影響客戶端當前的運算流程。當收到有用的消息後就會調用一個業務功能函數做爲後臺異步運算任務。web
服務端的SSE發佈是以Source[ServerSentEvent,NotUsed]來實現的。ServerSentEvent類型定義以下:數據庫
/** * Representation of a server-sent event. According to the specification, an empty data field designates an event * which is to be ignored which is useful for heartbeats. * * @param data data, may span multiple lines * @param eventType optional type, must not contain \n or \r * @param id optional id, must not contain \n or \r * @param retry optional reconnection delay in milliseconds */ final case class ServerSentEvent( data: String, eventType: Option[String] = None, id: Option[String] = None, retry: Option[Int] = None) {...}
這個類型的參數表明事件消息的數據結構。用戶能夠根據實際須要充分利用這個數據結構來傳遞消息。服務端是經過complete以SeverSentEvent類爲元素的Source來進行SSE的,以下:編程
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ complete { Source .tick(2.seconds, 2.seconds, NotUsed) .map( _ => processToServerSentEvent) .keepAlive(1.second, () => ServerSentEvent.heartbeat) }
以上代碼表明服務端定時運算processToServerSentEvent返回ServerSentEvent類型結果後發佈給全部訂閱的客戶端。咱們用一個函數processToServerSentEvent模擬重複運算的業務功能:api
private def processToServerSentEvent: ServerSentEvent = { Thread.sleep(3000) //processing delay
ServerSentEvent(SyncFiles.fileToSync) }
這個函數模擬發佈事件數據是某種業務運算結果,在這裏表明客戶端須要下載文件名稱。咱們用客戶端request來模擬設定這個文件名稱:websocket
object SyncFiles { var fileToSync: String = "" } private def route = { import Directives._ import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ def syncRequests = pathPrefix("sync") { pathSingleSlash { post { parameter("file") { filename => complete { SyncFiles.fileToSync = filename s"set download file to : $filename" } } } } }
客戶端訂閱SSE的方式以下:網絡
import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._ import system.dispatcher Http() .singleRequest(Get("http://localhost:8011/events")) .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]) .foreach(_.runForeach(se => downloadFiles(se.data)))
每當客戶端收到SSE後即運行downloadFiles(filename)函數。downloadFiles函數定義:數據結構
def downloadFiles(file: String) = { Thread.sleep(3000) //process delay
if (file != "") println(s"Try to download $file") }
下面是客戶端程序的測試運算步驟:異步
scala.io.StdIn.readLine() println("do some thing ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders") ).onSuccess { case msg => println(msg) } scala.io.StdIn.readLine() println("do some other things ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items") ).onSuccess { case msg => println(msg) }
運算結果:socket
do some thing ... HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1)) Try to download Orders Try to download Orders do some other things ... HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1)) Try to download Orders Try to download Orders Try to download Items Try to download Items Try to download Items Process finished with exit code 0
下面是本次討論的示範源代碼:函數
服務端:
import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.duration.DurationInt import akka.http.scaladsl.model.sse.ServerSentEvent object SSEServer { def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val mat = ActorMaterializer() Http().bindAndHandle(route, "localhost", 8011) scala.io.StdIn.readLine() system.terminate() } object SyncFiles { var fileToSync: String = "" } private def route = { import Directives._ import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ def syncRequests = pathPrefix("sync") { pathSingleSlash { post { parameter("file") { filename => complete { SyncFiles.fileToSync = filename s"set download file to : $filename" } } } } } def events = path("events") { get { complete { Source .tick(2.seconds, 2.seconds, NotUsed) .map( _ => processToServerSentEvent) .keepAlive(1.second, () => ServerSentEvent.heartbeat) } } } syncRequests ~ events } private def processToServerSentEvent: ServerSentEvent = { Thread.sleep(3000) //processing delay
ServerSentEvent(SyncFiles.fileToSync) } }
客戶端:
import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.client.RequestBuilding.Get import akka.http.scaladsl.model.HttpMethods import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.model._ object SSEClient { def downloadFiles(file: String) = { Thread.sleep(3000) //process delay
if (file != "") println(s"Try to download $file") } def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val mat = ActorMaterializer() import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._ import system.dispatcher Http() .singleRequest(Get("http://localhost:8011/events")) .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]) .foreach(_.runForeach(se => downloadFiles(se.data))) scala.io.StdIn.readLine() println("do some thing ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders") ).onSuccess { case msg => println(msg) } scala.io.StdIn.readLine() println("do some other things ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items") ).onSuccess { case msg => println(msg) } scala.io.StdIn.readLine() system.terminate() } }
個人博客即將同步至騰訊雲+社區。邀你們一同入駐http://cloud.tencent.com/developer/support-plan