響應式編程入門(RxJava)

做者:不洗碗工做室 - Markluxgit

出處:Marklux's Pub程序員

版權歸做者全部,轉載請註明出處github

響應式編程入門(RxJava)

背景

隨着時間的發展,編程領域不斷地推出新的技術來嘗試解決已有的問題,**響應式編程(流式編程)**正是近幾年來很是流行的一個解決方案,若是咱們關注一下業界動態,就會發現絕大多數的語言和框架都紛紛開始支持這一編程模型:編程

  • Java 8 => 引入Stream流,Observable 和 Observer 模式
  • Spring 5 => 引入WebFlux,底層全面採用了響應式模型
  • RxJava => 去年長期霸佔github最受歡迎的項目第一名

能夠預見,響應式編程將來必將大規模的應用於開發領域,阿里內部也已經開始了相關的改造,目前淘寶應用架構已經走上了流式架構升級之路,做爲開發,響應式的編程範式仍是頗有必要掌握的,下文將基於RxJava 2.0給出相關概念的簡單介紹和基本編程思路的講解(基於《Learning Rx Java》一書前三章總結)設計模式

基本思路

關於響應式編程(Reactive Programming, 下文簡稱RP)的定義,衆說紛紜。維基百科將其定義爲一種編程範式,ReactiveX將其定義爲一種設計模式的強化(觀察者模式),也有大牛認爲RP只不過是已有各類的輪子的一個組裝...有關RP的本質,咱們能夠在文章的最後進行簡單的討論,但從學習的角度而言,我認爲最好的方式是將RP看作是一種面向事件和流的編程思想,以下:緩存

Java推崇OOP的編程思想,所以對於Java程序員而言,程序就是各類對象的組合,編程就是控制對象狀態和行爲的一個過程。bash

RP推崇面向流的編程的思想,所以對於開發人員而言,不管是事件仍是數據,所有都將以流的方式呈現,這些流時而並行,時而交叉,編程就是觀察和調控這些流的一個過程。多線程

從現實世界出發

在現實世界中有一個顯而易見的物理現象,那就是一切都在運動(變化),不管是交通、天氣、人,即使是一塊岩石,也會隨着地球的自轉而運動。而不一樣物體的運動之間可能互不干擾,好比運動在不一樣道路上的車輛和行人,也可能出現交叉,好比在同一個十字路口相遇的行人和車輛。架構

迴歸到咱們的編程當中,咱們每每將程序抽象成多個過程,和現實世界同樣,這些過程也在變化和運動,它們之間有時能夠並行,有時會產生依賴(前置、後置條件),傳統編程模型中,咱們每每會採用多線程或者異步的方式來處理這些過程,但這樣的方式並不天然併發

所以RP從現實世界進行抽象和採樣,將程序分爲了如下三種組成:

  1. Observable:可被觀察的事物,也就是事件和數據流
  2. Observer:觀察流的事物
  3. Operator:操做符,對流進行鏈接和過濾等等操做的事物

讓咱們用一個最簡單的例子來引入這三個概念:

Observable<String> myStrings =
	Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
    myStrings.map(s -> s.length())
        .subscribe(s -> System.out.println(s));
複製代碼

在上面的例子中,myStrings就是Observable,map(s -> s.length())就是Operator,subscribe(s -> System.out.println(s))就是Observer。

這個例子將會把幾個字符串先取長度,而後再逐個輸出。

Observable

Observable簡單來講,就是流,在RxJava中只有三個最核心的API:

  • onNext():傳遞一個對象
  • onComplete():傳遞完成的「信號」
  • onError():傳遞一個錯誤

基本上全部的流都是這三種方法的一個包裝。

建立

  1. 使用Observable.create()

    Observable<String> myStrings = Observable.create(emitter -> {
        emitter.onNext("apple");
        emitter.onNext("bear");
        emitter.onNext("change");
        emitter.onComplete();
    });
    複製代碼
  2. 使用Observable.just()

    Observable<String> myStrings =
    	Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
    複製代碼

    注意這種方法建立的元素數量必須是有限的

  3. 從其餘數據源建立,例如Observable.fromIterable()Observable.range()

Hot & Cold Observables

Cold的流生產的數據是靜態的,比如一張CD,不管何時,不管什麼人來聽,均可以聽到完整的內容。

Observable<String> source =
      Observable.just("Alpha","Beta","Gamma","Delta","Epsilon");
//first observer
source.subscribe(s -> System.out.println("Observer 1 Received: " + s));
//second observer
source.subscribe(s -> System.out.println("Observer 2 Received: " + s));
複製代碼

上面兩個subscribe註冊的observer將會獲得徹底相同的一串流

Hot的流產生的數據是動態的,比如收音機電臺,錯過了播放的時段,過去的數據就取不到了。直接建立Hot流須要用到Listener,官方給出了一個JavaFx的例子,但並不合適放在這裏。

事實上,一般經過ConnectableObservable的方式來將一個Cold的流轉換成一個Hot的流:

ConnectableObservable<String> source =
   Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
   .publish();
   //Set up observer 1
   source.subscribe(s -> System.out.println("Observer 1: " + s));
  //Set up observer 2
  source.map(String::length)
    .subscribe(i -> System.out.println("Observer 2: " + i));
//Fire!
  source.connect();
複製代碼

經過publish()方法能夠建立一個ConnectableObservable,而後經過connect()方法啓動流的傳輸,這時source將會把全部的數據都傳遞給兩個observer。此後若是再給source註冊新的Observer,將不會獲得任何數據,由於Hot流不容許數據被重複消費。

Observers

觀察者自己是比較簡單的結構,主要功能由調用方本身去實現。

能夠經過實現Observer接口的方式去建立一個觀察者,固然更常見的case是經過lambda表達式來建立一個觀察者,就像以前用到的例子同樣,這裏不詳細展開了。

註冊的方式是經過調用Observerbale的subscribe方法。

Operators

只建立流和觀察者並不能有什麼太大的做用,大多時候咱們須要經過操做符(Operator)來對流進行各類各樣的操做才能使RP變得有實際意義。

RxJava中提供了很是豐富的操做符,大體分爲如下幾類:

  1. 建立操做符,用於建立流,剛纔咱們已經用到了其中的幾個,好比Create, Just, Range和Interval
  2. 變換操做符,用於將一個流變換成另外一種形式,好比Buffer(將流中的元素打包轉換成集合)、Map(對流中的每一個元素執行一個函數),Window(將流中的元素拆分紅不一樣的窗口再發射)
  3. 過濾操做符,過濾掉流中的部分數據以獲取指定的數據元素,好比Filter、First、Distinct
  4. 組合操做符,將多個流融合成一個流,好比And、Then、Merge、Zip
  5. 條件/算術/聚合/轉換操做符 ... 起到各類運算輔助做用
  6. 自定義操做符,由用戶本身建立

若是把每一個操做符都展開講一遍,差很少就能出一本書了,可見操做符提供的功能之豐富。下文只展開一些和背壓有關的操做符。

背壓

所謂背壓,是指異步環境中,生產者的速度大於消費者速度時,消費者反過來控制生產者速度的策略(也稱爲回壓),這是一種流控的方式,能夠更有效的利用資源、同時防止錯誤的雪崩。

爲了實現背壓策略,可使用如下幾種操做符

  1. Throttling 節流類

    經過操做符來調節Observable發射消息的速度,好比使用sample()來按期採樣,併發出最後一個數據,或者使用throttleWithTimeout()來丟棄超時的數據。但這樣作會丟棄一部分數據,最終消費者拿不到完整的消息。

  2. Buffer & Window 緩衝和窗口類

    使用緩衝buffer()和窗口window()暫存那些發送速度過快的消息,等到消息發送速度降低的時候再釋放它們。這主要應用於Observable發送速率不均勻的場景。

除了使用操做符以外,還有一種方式能夠實現背壓,那就是響應式拉取(Reactive pull)。

經過在Observer中調用request(n)方法,能夠實現由消費者來反控生產者的生產,也就是說只有當消費者請求的時候,生產者纔去產生數據,當消費者消費完了數據再去請求新的數據,這樣就不會出現生產速度過快的狀況了,以下圖

可是這樣作須要上游的被觀察者可以對request請求作出響應,這時候又能夠用到幾個操做符來控制Observable的行爲:

  1. onBackPressureBuffer

    爲Observable發出來的數據製做緩存,產生的數據先放在緩存中,每當有request請求過來時,就從緩存裏取出對應數量的事件返回。

  2. onBackPressureDrop

    命令Observable丟棄後來的時間,直到Subscriber再次調用request(n)方法的時候,就發送給該subscriber調用時間之後的n個時間。

背壓策略是一個值得深刻研究和探討的領域,基於消費者消息的回壓讓動態限流、斷路成爲可能,也由於有了背壓的感知,應用有機會作到動態的縮擴容

思考:爲何須要RP

RP的基本概念介紹的差很少了,如今須要思考一下爲何須要RP,在服務端怎麼應用RP,以及它所可以帶來的優點。

首先,有關RP的本質,我以爲它就是一個異步編程的輪子,用觀察者模式的API把異步編程的過程變得更加清晰和簡單,這就比如go使用CSP來簡化併發操做同樣,底層其實仍是對已有技術的封裝。

那麼問題在於,爲何要封裝異步編程,使用異步編程能帶來什麼好處,爲了解決這個問題咱們又須要迴歸原點。

若是要問服務端最大的性能瓶頸是什麼,那答案必定是IO,由於處理一個請求的過程當中最耗時的部分就是等待IO,而等待就會形成阻塞,因此若是要提高性能,就不能寫出阻塞的代碼來。

如何才能讓代碼不阻塞?以Java服務端來講,傳統的處理方式無外乎如下兩種:

  1. 使用Thread,把業務代碼和IO代碼放到不一樣的線程裏跑。但你須要面對併發問題(資源競爭),同時根據以前對Java線程調度的分析咱們知道這樣對CPU的資源利用率並不高效(上下文切換消耗比較大)。
  2. 使用異步回調,你能夠用Callback或者Future來實現,但須要本身去實現調度邏輯,同時Callback這樣的模式寫出來的代碼是很差理解的,有可能出現Callbcak Hell。

因此最終爲了解決性能瓶頸,RP給出的辦法就是:

提供一個優秀的異步處理框架,同時簡化編寫異步代碼的流程,最終實現減小阻塞,提高性能的大目標。

相關文章
相關標籤/搜索