初識Akka Stream

背景

響應式編程

響應式編程目前在編程領域都算式流行詞彙了,做爲Scala的創造公司Lightbend 公司(前身是 Typesafe)發起了響應式宣言(Reactive Manifesto)。Akka、Rx系列甚至Spring最新版本的WebFlux都加入到這場流行文化中。react

響應式編程用一句話總結可能也沒法立刻理解,其實它自己也沒有新的東西,主要式關注基於事件驅動的快速異步響應,固然爲達到目的也得考慮如何容錯(響應式四準則:靈敏性、伸縮性、容錯性、事件驅動)。簡單能夠類比下咱們經常使用MVC模式,Model的變化會通知View層的快速改變(事件通知),而無需View不停的去查詢Model層的變化(輪詢)。這就是一種經過事件機制快速做出響應,固然響應式模式不僅簡單的關注事件驅動和快速響應,也關注應用的對不一樣負載的伸縮擴展,還有在異步模式下的高容錯性。 編程

響應式編程準則

響應流

響應式編程中有個核心的問題要處理,就是響應流。Netflix、Pivotal、Typesafe等公司的工程師們在2013年共同發起了關於制定「響應式流規範(Reactive Stream Specification)」的倡議。其中描述了響應流的特色:緩存

  • 具備處理無限數量的元素的能力
  • 按序處理
  • 異步地傳遞元素
  • 必須實現非阻塞的背壓

Akka Stream就徹底實現了「響應式流規範(Reactive Stream Specification)」。bash

任何東西均可以成爲一個響應流,例如變量、用戶輸入、屬性、緩存、數據結構等等。你能夠建立、合併、過濾響應流,一個響應流能夠做爲另外一個響應流的輸入,甚至多個響應流也能夠做爲另外一個響應流的輸入。舉個例子咱們要從一個點擊的響應流中處理雙擊或着屢次點擊事件: 網絡

點擊響應流

背壓

背壓(backpressure)是爲了解決響應流源和接收者處理速度不一致而採起的一種由處理者告訴上游進行速度調整的一種方式。在沒有背壓的狀況下,響應流可能出現以下狀況: 數據結構

沒有背壓
那麼使用背壓有兩種,一種是源發送數據過快,另外一種是發送過忙。接收一方都會往上彙報以採起合適的發送速度。固然具體的處理有多種策略,這裏不細講,以下有個簡單的例子:
背壓
背壓
實際上背壓很早就出現了,之前背壓一般經過阻塞生產者來實現,而等待消費者以本身的速度處理消息。這種依賴於系統間消息同步處理的方法是很是低效的,而且否認了異步處理的優點(更大的可擴展性和更好的資源利用率),所以須要一種用於實現背壓的非阻塞解決方案。在響應流的背景下,背壓是異步處理模型的一個組成部分,並經過異步消息傳遞來實現。

開始

咱們開始寫一些Akka Stream的相關代碼來了解它。先創建一個sbt工程,在build中加入:異步

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
複製代碼

爲了可以運行全部的Stream,咱們先加入兩個ActorSystemActorMaterializer的隱式變量:oop

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object MyFirstStream {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    
  }
}
複製代碼

構建Stream基礎構件

Akka Stream包含了三大基礎構件Source、Sink、Flow。ui

Source

Source
Source即響應流的源頭,源頭具備一個數據出口,如上圖比較形象的描述了Source。咱們能夠經過各類數據來建立一個Source:

val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")
複製代碼

Sink

Sink
Sink就是流的最終目的地,包含一個數據入口,咱們能夠以下來建立Sink:

val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore
複製代碼

Flow

Flow
Flow就是流的中間組件,包含一個數據入口和數據出口。咱們能夠這樣來建立Flow:

val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // 當buffer滿了後進行背壓
複製代碼

定義流

流能夠經過基礎組件構成的圖和網絡來表示,咱們先從最簡單的方式來定義,將一個Source和Sink連起來就能夠造成一個流: spa

Source Sink

val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
複製代碼

這裏面的Keep.right說明咱們只關心Sink最後獲得的值。

咱們能夠在者中間加上Flow,造成一個稍微複製的流:

Source Flow Sink

val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
複製代碼

執行流

咱們如今可使用run方法執行前面建立的流,結果會放到Future中。

val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // 打印出6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // 打印出12
複製代碼

咱們可使用更簡單的方式來定義並執行流,不須要中間量:

// 使用指定的sink來執行流
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// 使用Fold全部元素的sink來執行流
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)
複製代碼
相關文章
相關標籤/搜索