響應式編程目前在編程領域都算式流行詞彙了,做爲Scala的創造公司Lightbend 公司(前身是 Typesafe)發起了響應式宣言(Reactive Manifesto)。Akka、Rx系列甚至Spring最新版本的WebFlux都加入到這場流行文化中。react
響應式編程用一句話總結可能也沒法立刻理解,其實它自己也沒有新的東西,主要式關注基於事件驅動的快速異步響應,固然爲達到目的也得考慮如何容錯(響應式四準則:靈敏性、伸縮性、容錯性、事件驅動)。簡單能夠類比下咱們經常使用MVC模式,Model的變化會通知View層的快速改變(事件通知),而無需View不停的去查詢Model層的變化(輪詢)。這就是一種經過事件機制快速做出響應,固然響應式模式不僅簡單的關注事件驅動和快速響應,也關注應用的對不一樣負載的伸縮擴展,還有在異步模式下的高容錯性。 編程
響應式編程中有個核心的問題要處理,就是響應流。Netflix、Pivotal、Typesafe等公司的工程師們在2013年共同發起了關於制定「響應式流規範(Reactive Stream Specification)」的倡議。其中描述了響應流的特色:緩存
Akka Stream就徹底實現了「響應式流規範(Reactive Stream Specification)」。bash
任何東西均可以成爲一個響應流,例如變量、用戶輸入、屬性、緩存、數據結構等等。你能夠建立、合併、過濾響應流,一個響應流能夠做爲另外一個響應流的輸入,甚至多個響應流也能夠做爲另外一個響應流的輸入。舉個例子咱們要從一個點擊的響應流中處理雙擊或着屢次點擊事件: 網絡
背壓(backpressure)是爲了解決響應流源和接收者處理速度不一致而採起的一種由處理者告訴上游進行速度調整的一種方式。在沒有背壓的狀況下,響應流可能出現以下狀況: 數據結構
咱們開始寫一些Akka Stream的相關代碼來了解它。先創建一個sbt工程,在build中加入:異步
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
複製代碼
爲了可以運行全部的Stream,咱們先加入兩個ActorSystem
和ActorMaterializer
的隱式變量:oop
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object MyFirstStream {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
}
}
複製代碼
Akka Stream包含了三大基礎構件Source、Sink、Flow。ui
val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")
複製代碼
val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore
複製代碼
val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // 當buffer滿了後進行背壓
複製代碼
流能夠經過基礎組件構成的圖和網絡來表示,咱們先從最簡單的方式來定義,將一個Source和Sink連起來就能夠造成一個流: spa
val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
複製代碼
這裏面的Keep.right說明咱們只關心Sink最後獲得的值。
咱們能夠在者中間加上Flow,造成一個稍微複製的流:
val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
複製代碼
咱們如今可使用run方法執行前面建立的流,結果會放到Future中。
val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // 打印出6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // 打印出12
複製代碼
咱們可使用更簡單的方式來定義並執行流,不須要中間量:
// 使用指定的sink來執行流
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// 使用Fold全部元素的sink來執行流
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)
複製代碼