我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?(一)

就在前不久作了一個關於RXJava的相關教學視頻,事後整理了關於RxJava的預習資料和相關內容以及圖文和相關源碼,須要借鑑的能夠和我聯繫~java

一丶 面試輔助路線(所有內容在完整的PDF裏都有講解)

我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?(一)
順手留下GitHub連接,須要獲取相關面試等內容的能夠本身去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)
我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?(一)react

二丶 RXJava預習:

JAVA設計模式之觀察者模式
一、初步認識

觀察者模式的定義:
在對象之間定義了一對多的依賴,這樣一來,當一個對象改變狀態,依賴它的對象會收到通知並自動更新。git

大白話:
其實就是發佈訂閱模式,發佈者發佈信息,訂閱者獲取信息,訂閱了就能收到信息,沒訂閱就收不到信息。github

2丶這個模式的結構圖
我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?(一)面試

三、能夠看到,該模式包含四個角色

  • 抽象被觀察者角色: 也就是一個抽象主題,它把全部對觀察者對象的引用保存在一個集合中,每一個主題均可以有任意數量的觀察者。抽象主題提供一個接口,能夠增長和刪除觀察者角色。通常用一個抽象類和接口來實現。
  • 抽象觀察者角色: 爲全部的具體觀察者定義一個接口,在獲得主題通知時更新本身。
  • 具體被觀察者角色: 也就是一個具體的主題,在集體主題的內部狀態改變時,全部登記過的觀察者發出通知。
  • 具體觀察者角色: 實現抽象觀察者角色所須要的更新接口,一邊使自己的狀態與製圖的狀態相協調。

四、使用場景例子

有一個微信公衆號服務,不定時發佈一些消息,關注公衆號就能夠收到推送消息,取消關注就收不到推送消息。編程

三丶Rxjava介紹

ReactiveX的歷史

ReactiveX是Reactive Extensions的縮寫,通常簡寫爲Rx,最初是LINQ的一個擴展,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流,Rx庫支持.NET、JavaScript和C++,Rx近幾年愈來愈流行了,如今已經支持幾乎所有的流行編程語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社區網站是 reactivex.io。設計模式

什麼是ReactiveX

微軟給的定義是,Rx是一個函數庫,讓開發者能夠利用可觀察序列和LINQ風格查詢操做符來編寫異步和基於事件的程序,使用Rx,開發者能夠用Observables表示異步數據流,用LINQ操做符查詢異步數據流, 用Schedulers參數化異步數據流的併發處理安全

Rx能夠這樣定義: Rx=Observables + LINQ + Schedulers。ReactiveX.io給的定義是,Rx是一個使用可觀察數據流進行異步編程的編程接口,ReactiveX結合了觀察者模式、迭代器模式和函數式編程的精華。微信

ReactiveX的應用

不少公司都在使用ReactiveX,例如Microsoft、Netflix、Github、Trello、SoundCloud。網絡

ReactiveX宣言

ReactiveX不只僅是一個編程接口,它是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言

Rx模式

使用觀察者模式
  • 建立: Rx能夠方便的建立事件流和數據流
  • 組合: Rx使用查詢式的操做符組合和變換數據流
  • 監聽: Rx能夠訂閱任何可觀察的數據流並執行操做
簡化代碼
  • 函數式風格: 對可觀察數據流使用無反作用的輸入輸出函數,避免了程序裏錯綜複雜的狀態
  • 簡化代碼: Rx的操做符統統常能夠將複雜的難題簡化爲不多的幾行代碼
  • 異步錯誤處理: 傳統的try/catch沒辦法處理異步計算,Rx提供了合適的錯誤處理機制
  • 輕鬆使用併發: Rx的ObservablesSchedulers讓開發者能夠擺脫底層的線程同步和各類併發問題
使用Observable的優點

Rx擴展了觀察者模式用於支持數據和事件序列,添加了一些操做符,它讓你能夠聲明式的組合這些序列,而無需關注底層的實現:如線程、同步、線程安全、併發數據結構和非阻塞IO。

Observable經過使用最佳的方式訪問異步數據序列填補了這個間隙
我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?(一)
Rx的Observable模型讓你能夠像使用集合數據同樣操做異步事件流,對異步事件流使用各類簡單、可組合的操做。

Observable可組合

對於單層的異步操做來講,Java中Future對象的處理方式是很是簡單有效的,可是一旦涉及到嵌套,它們就開始變得異常繁瑣和複雜。使用Future很難很好的組合帶條件的異步執行流程(考慮到運行時各類潛在的問題,甚至能夠說是不可能的),固然,要想實現仍是能夠作到的,可是很是困難,或許你能夠用 Future.get() ,但這樣作,異步執行的優點就徹底沒有了。從另外一方面說,Rx的Observable一開始就是爲組合異步數據流準備的。

Observable更靈活

Rx的Observable不只支持處理單獨的標量值(就像Future能夠作的),也支持數據序列,甚至是無窮的數據流。Observable 是一個抽象概念,適用於任何場景。Observable擁有它的近親Iterable的所有優雅與靈活。Observable是異步的雙向push,Iterable是同步的單向pull,對比:
我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?(一)

Observable無偏見

Rx對於對於併發性或異步性沒有任何特殊的偏好,Observable能夠用任何方式實現,線程池、事件循環、非阻塞IO、Actor模式,任何知足你的需求的,你擅長或偏好的方式均可以。不管你選擇怎樣實現它,不管底層實現是阻塞的仍是非阻塞的,客戶端代碼將全部與Observable的交互都當作是異步的。

Observable是如何實現的?
public observable<data>getData();
  • 它能與調用者在同一線程同步執行嗎?
  • 它能異步地在單獨的線程執行嗎?
  • 它會將工做分發到多個線程,返回數據的順序是任意的嗎?
  • 它使用Actor模式而不是線程池嗎?
  • 它使用NIO和事件循環執行異步網絡訪問嗎?
  • 它使用事件循環將工做線程從回調線程分離出來嗎?

從Observer的視角看,這些都無所謂,重要的是:使用Rx,你能夠改變你的觀念,你能夠在徹底不影響Observable程序庫使用者的狀況下,完全的改變Observable的底層實現。

使用回調存在不少問題

回調在不阻塞任何事情的狀況下,解決了 Future.get() 過早阻塞的問題。因爲響應結果一旦就緒Callback就會被調用,它們天生就是高效率的。不過,就像使用Future同樣,對於單層的異步執行來講,回調很容易使用,對於嵌套的異步組合,它們顯得很是笨拙。

Rx是一個多語言的實現

Rx在大量的編程語言中都有實現,並尊重實現語言的風格,並且更多的實現正在飛速增長。

響應式編程

Rx提供了一系列的操做符,你可使用它們來過濾(filter)、選擇(select)、變換(transform)、結合(combine)和組合(compose)多個Observable,這些操做符讓執行和複合變得很是高效。你能夠把Observable當作Iterable的推送方式的等價物,使用Iterable,消費者從生產者那拉取數據,線程阻塞直至數據準備好。使用Observable,在數據準備好時,生產者將數據推送給消費者。數據能夠同步或異步的到達,這種
方式更靈活。

下面的例子展現了類似的高階函數在IterableObservable上的應用

//Iterable
  getDataFromLoca1Memory()
     .skip(10)
     .take(5)
     .map({ s -> return s + " transformed" })
     .forEach({ print1n "netx =>" + it })

  // observable
  getDataFromNetwork()
     .skip(10)
     .take(5)
     .map({ s -> return s + " transformed" })
     .subscribe({ print1n "onNetx =>" + it })

Observable類型給GOF的觀察者模式添加了兩種缺乏的語義,這樣就和Iterable類型中可用的操做一致了:

  1. 生產者能夠發信號給消費者,通知它沒有更多數據可用了(對於Iterable,一個for循環正常完成表示沒有數據了;對於Observable,就是調用觀察者的 onCompleted 方法)
  2. 生產者能夠發信號給消費者,通知它遇到了一個錯誤(對於Iterable,迭代過程當中發生錯誤會拋出異常;對於Observable,就是調用觀察者(Observer)的 onError 方法

有了這兩種功能,Rx就能使Observable與Iterable保持一致了,惟一的不一樣是數據流的方向。任何對Iterable的操做,你均可以對Observable使用。

四丶名詞定義

這裏給出一些名詞的翻譯
  • Reactive 直譯爲反應性的,有活性的,根據上下文通常翻譯爲反應式、響應式
  • Iterable 可迭代對象,支持以迭代器的形式遍歷,許多語言中都存在這個概念
  • Observable 可觀察對象,在Rx中定義爲更強大的Iterable,在觀察者模式中是被觀察的對象,一旦數據產生或發生變化,會經過某種方式通知觀察者或訂閱者
  • Observer 觀察者對象,監聽Observable發射的數據並作出響應,Subscriber是它的一個特殊實現
  • emit 直譯爲發射,發佈,發出,含義是Observable在數據產生或變化時發送通知給Observer,調用Observer對應的方法,文章裏一概譯爲發射
  • items 直譯爲項目,條目,在Rx裏是指Observable發射的數據項,文章裏一概譯爲數據,數據項
Observable
概述

在ReactiveX中,一個觀察者(Observer)訂閱一個可觀察對象(Observable)。觀察者對Observable發射的數據或數據序列做出響應。這種模式能夠極大地簡化併發操做,由於它建立了一個處於待命狀態的觀察者哨兵,在將來某個時刻響應Observable的通知,不須要阻塞等待Observable發射數據。

這篇文章會解釋什麼是響應式編程模式(reactive pattern),以及什麼是可觀察對象(Observables)和觀察者(observers),其它幾篇文章會展現如何用操做符組合和改變Observable的行爲。

相關參考:

Single - 一個特殊的Observable,只發射單個數據。

背景知識

在不少軟件編程任務中,或多或少你都會指望你寫的代碼能按照編寫的順序,一次一個的順序執行和完成。可是在ReactiveX中,不少指令多是並行執行的,以後他們的執行結果纔會被觀察者捕獲,順序是不肯定的。爲達到這個目的,你定義一種獲取和變換數據的機制,而不是調用一個方法。在這種機制下,存在一個可觀察對象(Observable),觀察者(Observer)訂閱(Subscribe)它,當數據就緒時,以前定義的機制就會分發數據給一直處於等待狀態的觀察者哨兵。

這種方法的優勢是,若是你有大量的任務要處理,它們互相之間沒有依賴關係。你能夠同時開始執行它們,不用等待一個完成再開始下一個(用這種方式,你的整個任務隊列能耗費的最長時間,不會超過任務裏最耗時的那個)。

有不少術語可用於描述這種異步編程和設計模式,在在本文裏咱們使用這些術語:一個觀察者訂閱一個可觀察對象(An observer subscribes to an Observable)。經過調用觀察者的方法,Observable發射數據或通知給它的觀察者。

在其它的文檔和場景裏,有時咱們也將Observer叫作Subscriber、Watcher、Reactor。這個模型一般被稱做Reactor模式。

建立觀察者

本文使用相似於Groovy的僞代碼舉例,可是ReactiveX有多種語言的實現。普通的方法調用(不是某種異步方法,也不是Rx中的並行調用),流程一般是這樣的:

  1. 調用某一個方法
  2. 用一個變量保存方法返回的結果
  3. 使用這個變量和它的新值作些有用的事

用代碼描述就是:

// make the call, assign its return value to `returnVal`
  returnVal = someMethod(itsParameters);
  // do something useful with returnVal

在異步模型中流程更像這樣的:

  1. 定義一個方法,這個方法拿着某個異步調用的返回值作一些有用的事情。這個方法是觀察者的一部分。
  2. 將這個異步調用自己定義爲一個Observable
  3. 觀察者經過訂閱(Subscribe)操做關聯到那個Observable
  4. 繼續你的業務邏輯,等方法返回時,Observable會發射結果,觀察者的方法會開始處理結果或結果集

用代碼描述就是:

// defines, but does not invoke, the Subscriber's onNext handler
  // (in this example, the observer is very simple and has only an onNext handler)
  def myOnNext = { it -> do something useful with it };
  // defines, but does not invoke, the Observable
  def myObservable = someObservable(itsParameters);
  // subscribes the Subscriber to the Observable, and invokes the Observable
  myObservable.subscribe(myOnNext);
  // go on about my business
回調方法 (onNext, onCompleted, onError)

Subscribe方法用於將觀察者鏈接到Observable,你的觀察者須要實現如下方法的一個子集:

  • onNext(T item)
    Observable調用這個方法發射數據,方法的參數就是Observable發射的數據,這個方法可能會被調用屢次,取決於你的實現。
  • onError(Exception ex)
    當Observable遇到錯誤或者沒法返回指望的數據時會調用這個方法,這個調用會終止Observable,後續不會再調用onNext和onCompleted,onError方法的參數是拋出的異常。
  • onComplete
    正常終止,若是沒有遇到錯誤,Observable在最後一次調用onNext以後調用此方法。

根據Observable協議的定義,onNext可能會被調用零次或者不少次,最後會有一次onCompleted或onError調用(不會同時),傳遞數據給onNext一般被稱做發射,onCompleted和onError被稱做通知。

下面是一個更完整的例子:

def myOnNext   = { item -> /* do something useful with item */ };
  def myError    = { throwable -> /* react sensibly to a failed call */ };
  def myComplete  = { /* clean up after the final response */ };
  def myObservable = someMethod(itsParameters);
  myObservable.subscribe(myOnNext, myError, myComplete);
  // go on about my business
取消訂閱 (Unsubscribing)

在一些ReactiveX實現中,有一個特殊的觀察者接口Subscriber,它有一個unsubscribe方法。調用這個方法表示你不關心當前訂閱的Observable了,所以Observable能夠選擇中止發射新的數據項(若是沒有其它觀察者訂閱)。

取消訂閱的結果會傳遞給這個Observable的操做符鏈,並且會致使這個鏈條上的每一個環節都中止發射數據項。這些並不保證會當即發生,然而,對一個Observable來講,即便沒有觀察者了,它也能夠在一個while循環中繼續生成並嘗試發射數據項。

關於命名約定

ReactiveX的每種特定語言的實現都有本身的命名偏好,雖然不一樣的實現之間有不少共同點,但並不存在一個統一的命名標準。

並且,在某些場景中,一些名字有不一樣的隱含意義,或者在某些語言看來比較怪異。

例如,有一個onEvent命名模式(onNext, onCompleted, onError),在一些場景中,這些名字可能意味着事件處理器已經註冊。然而在ReactiveX裏,他們是事件處理器的名字。

Observables的"熱"和"冷"

Observable何時開始發射數據序列?這取決於Observable的實現,一個"熱"的Observable可能一建立完就開始發射數據,所以全部後續訂閱它的觀察者可能從序列中間的某個位置開始接受數據(有一些數據錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它纔開始發射數據,所以這個觀察者能夠確保會收到整個數據序列。

在一些ReactiveX實現裏,還存在一種被稱做Connectable的Observable,無論有沒有觀察者訂閱它,這種Observable都不會開始發射數據,除非Connect方法被調用。

用操做符組合Observable

對於ReactiveX來講,Observable和Observer僅僅是個開始,它們自己不過是標準觀察者模式的一些輕量級擴展,目的是爲了更好的處理事件序列。

ReactiveX真正強大的地方在於它的操做符,操做符讓你能夠變換、組合、操縱和處理Observable發射的數據。

Rx的操做符讓你能夠用聲明式的風格組合異步操做序列,它擁有回調的全部效率優點,同時又避免了典型的異步系統中嵌套回調的缺點。

下面是經常使用的操做符列表:

  1. 建立操做 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 變換操做 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 過濾操做 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take,TakeLast
  4. 組合操做 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 錯誤處理 CatchRetry
  6. 輔助操做 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn,TimeInterval, Timeout, Timestamp, Using
  7. 條件和布爾操做All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil,TakeWhile
  8. 算術和集合操做 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 轉換操做 To
  10. 鏈接操做 Connect, Publish, RefCount, Replay
  11. 反壓操做,用於增長特殊的流程控制策略的操做符

這些操做符並不全都是ReactiveX的核心組成部分,有一些是語言特定的實現或可選的模塊。

順手留下GitHub連接,須要獲取相關面試等內容的能夠本身去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)

相關文章
相關標籤/搜索