觀察者模式使用頻率很高,用於創建一種對象之間的依賴關係,當一個對象發生改變時自動通知其餘對象,其餘對象將作出相應反應。在觀察者模式中,發生改變的對象叫作觀察目標,也叫被觀察者,而被通知的對象叫作觀察者。java
一個觀察目標能夠對應多個觀察者,並且這些觀察者之間沒有任何相互關聯,能夠根據須要增長和刪除觀察者,使得系統便於擴展。編程
觀察者模式:定義對象之間的一種一對多依賴關係,使得每個對象狀態發生改變時,其相關依賴對象皆獲得通知並自動更新。api
觀察者模式是一種對象行爲型模式。安全
Subejct
(抽象目標):又叫主題,指被觀察的對象,也就是被觀察者,在目標中定義了一個觀察者集合,同時提供一系列方法來增長或者刪除觀察者對象,也定義了通知方法notify
ConcreteSubject
(具體目標):抽象目標的子類,一般包含有常常改變的數據,當狀態發生改變時,向各個觀察者發出通知,同時還實現了目標類中定義的抽象業務邏輯,若是無須擴展抽象目標類則能夠省略具體目標類Observer
(抽象觀察者):對觀察目標做出響應,通常定義爲接口ConcreteObserver
(具體觀察者):具體觀察者中維護一個指向具體目標的引用,存儲具體觀察者的有關狀態,這些狀態須要與具體目標的狀態保持一致,同時實現了抽象觀察者的update
方法notifyObserver
的通知觀察者的抽象方法interface Observer { void update(String state); }
這裏實現爲一個接口,update
方法供抽象目標,也就是供被觀察者調用。微信
class ConcreteObserver implements Observer { public String state; public ConcreteObserver(String state) { this.state = state; } @Override public void update(String state) { System.out.println("觀察者狀態更新爲"+state); } }
實現其中的update
方法,這裏只是簡單將狀態輸出。異步
abstract class Subject { private List<Observer> list = new ArrayList<>(); public void attach(Observer observer) { list.add(observer); } public void detach(Observer observer) { list.remove(observer); } public void notifyObservers(String state) { list.forEach(t->t.update(state)); } public abstract void change(String newState); }
抽象目標類負責管理觀察者集合,使用List
存儲抽象觀察者,包含添加/刪除觀察者方法。notifyObservers
中通知了全部的觀察者,將狀態做爲具體參數進行傳遞。change
做爲被觀察者的狀態改變函數,將新狀態做爲參數傳入。ide
class ConcreteSubject extends Subject { private String state; public String getState() { return state; } @Override public void change(String newState) { state = newState; System.out.println("被觀察者狀態爲:"+newState); notifyObservers(newState); } }
具體目標類負責實現抽象目標的change
方法,保存新狀態後,經過抽象目標的notifyObservers
通知全部觀察者。函數
public static void main(String[] args) { Observer observer1 = new ConcreteObserver("111"); Observer observer2 = new ConcreteObserver("111"); Observer observer3 = new ConcreteObserver("111"); Subject subject = new ConcreteSubject(); subject.attach(observer1); subject.attach(observer2); subject.attach(observer3); subject.change("2222"); }
客戶端針對抽象觀察者以及抽象目標進行編程,定義好各個觀察者後,添加到抽象目標中進行管理,接着更新被觀察者的狀態。性能
輸出以下:測試
一個多人聯機遊戲中,擁有戰隊機制,當基地受到攻擊時,將通知該戰隊全部成員進入警惕狀態,使用觀察者模式進行設計。
設計以下:
Observer
Player
Subject
Base
抽象觀察者:
interface Observer { void update(String state); }
包含一個供抽象目標調用的update()
方法。
接着是具體觀察者:
class Player implements Observer { public String state; public String name; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void update(String state) { System.out.println("戰隊成員"+name+"狀態更新爲"+state); } }
在update
中輸出更新的狀態。
抽象目標以下:
abstract class Subject { private List<Observer> list = new ArrayList<>(); public void attach(Observer observer) { list.add(observer); } public void detach(Observer observer) { list.remove(observer); } public void notifyObservers(String state) { System.out.println("基地通知全部戰隊成員"); list.forEach(t->t.update(state)); } public abstract void change(String newState); }
使用List
存儲全部戰隊成員,在通知方法中通知全部的觀察者,change
定義爲抽象方法供子類實現。
具體目標(被觀察者)以下:
class Base extends Subject { private String state; public String getState() { return state; } @Override public void change(String newState) { state = newState; System.out.println("基地狀態更新爲:"+newState); notifyObservers(newState); } }
實現抽象目標的change
方法,裏面須要調用notifyObservers
方法通知全部觀察者。
測試:
public static void main(String[] args) { Observer player1 = new Player("A","無警惕狀態"); Observer player2 = new Player("B","無警惕狀態"); Observer player3 = new Player("C","無警惕狀態"); Subject subject = new Base(); subject.attach(player1); subject.attach(player2); subject.attach(player3); subject.change("警惕狀態"); }
輸出以下:
在觀察者模式中,能夠分爲推模型以及拉模型。
推模型是被觀察者向觀察者推送觀察目標的詳細信息,無論觀察者是否須要,推送的信息一般是被觀察者對象的所有或部分數據。像上面的例子就是推模型,被觀察者(基地)主動把狀態數據推送給觀察者(戰隊成員)。
拉模型當被觀察者通知觀察者時,只傳遞少許信息,若是觀察者須要更加詳細的信息,由觀察者主動到觀察目標中獲取,至關於時觀察者從主題對象中拉去數據。這種方式通常把被觀察者自身經過update
傳遞給觀察者,獲取數據時時直接經過這個被觀察者引用獲取。
能夠將上面的基地例子修改從推模型修改成拉模型,首先修改觀察者中的update()
參數:
interface Observer { void update(Subject subject); }
接着修改具體觀察者:
class Player implements Observer { public String state; public String name; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void update(Subject subject) { System.out.println("戰隊成員"+name+"狀態更新爲"+subject.getState()); } }
主要的不一樣是原來的推模型直接把狀態做爲參數傳遞,如今傳遞一個抽象目標對象,須要具體觀察者從中主動獲取數據。
而後是抽象目標:
abstract class Subject { private String state; private List<Observer> list = new ArrayList<>(); public void attach(Observer observer) { list.add(observer); } public void detach(Observer observer) { list.remove(observer); } public void notifyObservers() { System.out.println("基地通知全部戰隊成員"); list.forEach(t->t.update(this)); } public String getState() { return state; } public void setState(String state) { this.state = state; } public abstract void change(String newState); }
主要改變是多了一個state
成員,同時去掉notifyObservers()
中的參數。
最後是具體目標:
class Base extends Subject { @Override public void change(String newState) { setState(newState); System.out.println("基地狀態更新爲:"+newState); notifyObservers(); } }
客戶端代碼無須任何修改,測試輸出結果一致:
update
方法Observer
與Observable
觀察者模式在Java中很是重要,JDK的java.util
提供了Observer
以及Observable
接口做爲對觀察者模式的支持。
Observer
java.util.Observer
接口充當抽象觀察者,只聲明瞭一個方法:
void update(Observable o,Object arg);
當觀察目標的狀態發生變化時,該方法會被調用,在Observer
子類實現update
,不一樣的具體觀察者具備不一樣的更新行爲,當調用Observable
的notifyObservers()
時,將執行update
方法。
update
的接口兩個參數中,一個表示被觀察者,一個表示調用notifyObservers
的參數,換句話說,這樣設計能同時支持推模型與拉模型:
notifyObervers()
中傳入arg
參數,也就是update
中的arg
參數notifyObservers
中傳入參數,可是須要在被觀察者中聲明獲取狀態或數據的方法,方便在update
中經過被觀察者引用o
進行強制類型轉換後調用Observable
java.util.Observable
充當抽象目標類,其中定義了一個Vector
存儲觀察者對象,包含的方法(OpenJDK11.0.2)以下:
public class Observable { private boolean changed = false; private Vector<Observer> obs; public Observable() { //構造函數,初始化 obs } public synchronized void addObserver(Observer o) { //註冊觀察者到obs中 } public synchronized void deleteObserver(Observer o) { //刪除obs中的某個觀察者 } public void notifyObservers() { //通知方法,內部調用每個觀察者的update() } public void notifyObservers(Object arg) { //相似上面的通知方法,帶參數調用update() } public synchronized void deleteObservers() { //刪除全部觀察者 } protected synchronized void setChanged() { //設置changed爲true,表示觀察目標的狀態發生變化 } protected synchronized void clearChanged() { //清除changed的狀態,表示觀察目標狀態再也不發生改變 //或者已經通知了全部的觀察者 } public synchronized boolean hasChanged() { //返回changed,表示觀察對象是否發生改變 } public synchronized int countObservers() { //返回觀察者數量 } }
將上面基地的例子用Observable
以及Observer
實現以下:
public class Test { public static void main(String[] args) { Observer player1 = new Player("A","無警惕狀態"); Observer player2 = new Player("B","無警惕狀態"); Observer player3 = new Player("C","無警惕狀態"); Base base = new Base(); base.addObserver(player1); base.addObserver(player2); base.addObserver(player3); base.change("警惕狀態"); } } class Player implements Observer { private String name; private String state; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void update(Observable o,Object arg) { System.out.println("戰隊成員"+name+"更新狀態爲"+arg); } } class Base extends Observable { public void change(String state) { setChanged(); notifyObservers(state); } }
具體觀察者Player
實現Observer
接口,具體目標Base
(被觀察者)繼承Observable
,注意須要在notifyObservers
以前,使用Observable
的setChanged
表示被觀察者狀態改變,這樣使用notifyObservers
才能生效,不然認爲被觀察者沒有發生狀態改變:
查看源碼發現notifyObservers
中先對changed
內部布爾變量進行了判斷,若是具體目標沒有使用setChanged
方法,將致使沒法通知觀察者。
這裏使用了推模型實現,具體目標在notifyObservers
中傳遞狀態參數:
class Player implements Observer { //... @Override public void update(Observable o,Object arg) { System.out.println("戰隊成員"+name+"更新狀態爲"+arg); } } class Base extends Observable { public void change(String state) { setChanged(); notifyObservers(state); } }
使用拉模型修改以下:
class Player implements Observer { //... public void update(Observable o,Object arg) { System.out.println("戰隊成員"+name+"更新狀態爲"+((Base)o).getState()); } } class Base extends Observable { private String state; public String getState() { return state; } public void change(String state) { this.state = state; setChanged(); notifyObservers(); } }
具體觀察者的update
中由原來的從arg
獲取狀態變爲從Observable
中經過getter獲取狀態,同時具體目標增長了state
成員,在notifyObservers
中不需手動傳入狀態參數。
Flow API
雖然使用JDK的Observable
以及Observer
實現觀察者模式很容易,不須要定義抽象目標以及抽象觀察者,可是很遺憾的是從Java9開始標記爲過期了(看着一條條橫線也挺難受的):
查了一下緣由,標記爲過期主要是由於:
Observable
沒有實現序列化接口Flow API
爲了克服原來的缺點,從JDK9開始出現了Flow API
,位於java.util.concurrent
下。
在講Flow API
以前,先看一下響應式編程。
響應式編程能夠理解爲一種處理數據項的異步流,即在數據產生的時候,接收者就對其進行響應。在響應式編程中,會有一個數據發佈者(Publisher
)以及數據訂閱者(Subscriber
),後者用於異步接收發布者發佈的數據。
在該模式中,還引入了一個更高級的特性:數據處理器(Processor
),用於將數據發佈者發佈的數據進行某些轉換操做,而後再發布給數據訂閱者。響應式編程是異步非阻塞編程,可以提高程序性能,能夠解決傳統編程遇到的困難,基於這個模型實現的有Java 9 Flow API
,RxJava
,Reactor
等。
Flow API
Flow
是一個final
類,裏面定義了四個接口:
Publisher<T>
:數據發佈者接口Subscriber<T>
:數據訂閱者接口Subscription
:發佈者和訂閱者之間的訂閱關係Processor<T,R>
:數據處理器public static int defaultBufferSize()
:返回緩衝區長度,默認256。當發佈者發送速率高於接收速率時,數據接收者緩衝區將會被填滿,當緩衝區填滿後,發佈者會中止發送數據,直到訂閱者有空閒位置時,發佈者纔會繼續發佈數據Publisher<T>
Publisher
源碼以下:
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
這是一個函數式接口,只包含一個subscribe
方法,經過該方法將數據發佈出去。
Subscriber<T>
Subscriber
源碼以下:
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
方法解釋以下:
onSubscribe
:訂閱成功的回調方法,用於初始化Subscription
,代表能夠開始接收訂閱數據了onNext
:接收下一項訂閱數據的回調方法onError
:在Publisher
或Subscriber
遇到不可恢復的錯誤時會調用該方法,Subscriber
再也不接收訂閱信息onComplete
:接收完全部訂閱數據,而且發佈者已經關閉後會回調該方法Subscription
Subscription
源碼以下:
public static interface Subscription { public void request(long n); public void cancel(); }
方法解釋以下:
request
:用於向數據發佈者請求n個數據項cancel
:取消消息訂閱,訂閱者再也不接收數據Processor<T,R>
Processor
源碼以下:
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
這是一個空接口,繼承了Subscriber
以及Publisher
,它既能發佈數據也能訂閱數據,基於這個特性它能夠充當數據轉換的角色,先從數據發佈者接收數據,通過處理後發佈給數據訂閱者。
public class Test { public static void main(String[] args) { //JDK9自帶的數據發佈者,實現了Publisher<T> SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //建立訂閱者,用於接收發布者消息 Subscriber<String> subscriber = new Subscriber<>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { //經過Subscription和發佈者保持訂閱關係 //並用它來給發佈者反饋 this.subscription = subscription; //請求一個數據 this.subscription.request(1); } @Override public void onNext(String item) { //接收發布者發佈的信息 System.out.println("訂閱者接收消息:"+item); //接收後再次請求一個數據 this.subscription.request(1); //若是不想接收直接調用cancel // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { //異常回調 System.out.println("訂閱者接收數據異常:"+throwable); throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { //發佈者發送的數據都被接收了 //而且發佈者關閉後就會回調該方法 System.out.println("訂閱者接收數據完畢"); } }; //創建發佈者與訂閱者的關係 publisher.subscribe(subscriber); //發佈數據 for(int i=0;i<10;++i) { String message = "flow api "+i; System.out.println("發佈者發佈消息:"+message); publisher.submit(message); } //發佈結束後關閉發佈者 publisher.close(); //main延遲關閉,不然訂閱者沒接收完消息線程就被關閉 try { Thread.currentThread().join(2000); } catch(Exception e) { e.printStackTrace(); } } }
步驟:
SubmissionPublisher<String>
做爲消息發佈者Subscriber<String>
做爲消息訂閱者publisher.subscribe(subsciber)
創建submit
發佈數據close()
關閉發佈者,同時會回調訂閱者的onComplete
方法輸出以下:
注意例子中最後須要延遲關閉main
線程,若是沒有這個操做,訂閱者就不能徹底接收全部信息:
能夠從輸出看到,訂閱者接收到第8條消息後,線程就被關閉了。
前面說過Flow
中有一個靜態方法返回緩衝區大小,下面進行模擬填滿,在訂閱者中的訂閱方法中,加入延遲:
@Override public void onNext(String item) { //模擬接收數據緩慢填滿緩衝池 try { TimeUnit.MILLISECONDS.sleep(300); } catch(InterruptedException e) { e.printStackTrace(); } System.out.println("訂閱者接收消息:"+item); //接收後再次請求一個數據 this.subscription.request(1); }
由於默認的緩衝區大小爲256,所以,發佈256條信息後,能夠看到再也不發送,直到等到訂閱者處理才繼續發佈:
Processor
Processor
就是Publisher
+Subscriber
,一般是用做接收發布者發佈的信息,進行相應處理後,再將數據發佈,供消息者訂閱接收,下面是一個簡例:
public class Test { public static void main(String[] args) { //JDK9自帶的數據發佈者,實現了Publisher<T> SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //建立訂閱者,用於接收發布者消息 TestProcessor processor = new TestProcessor(); Subscriber<String> subscriber = new Subscriber<>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(String item) { System.out.println("訂閱者接收消息:"+item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("訂閱者接收異常"); throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { System.out.println("訂閱者接收完畢"); } }; publisher.subscribe(processor); processor.subscribe(subscriber); //發佈數據 for(int i=0;i<10;++i) { String message = "flow api "+i; System.out.println("發佈者發佈消息:"+message); publisher.submit(message); } //發佈結束後關閉發佈者 publisher.close(); //main延遲關閉,不然訂閱者沒接收完消息線程就被關閉 try { Thread.currentThread().join(2000); } catch(Exception e) { e.printStackTrace(); } } } class TestProcessor extends SubmissionPublisher<String> implements Processor<String,String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { //經過Subscription和發佈者保持訂閱關係 //並用它來給發佈者反饋 this.subscription = subscription; //請求一個數據 this.subscription.request(1); } @Override public void onNext(String item) { //模擬接收數據緩慢填滿緩衝池 System.out.println("處理器處理消息:"+item); item = "通過處理器處理的消息:"+item; //接收後再次請求一個數據 this.submit(item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { //異常回調 System.out.println("處理器處理數據異常:"+throwable); throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { System.out.println("處理者處理數據完畢"); this.close(); } }
步驟:
SubmissionPublisher<String>
SubmissionPublisher<String>
並實現Processor<String,String>
的類,在其中的onNext
方法中對消息進行處理並調用submit
發佈給訂閱者,在其中的onComplete
調用close()
關閉處理器Subscriber<String>
輸出:
Flow API
實現例子講了這麼多Flow API
的例子,下面來看看如何使用Flow API
實現基地的例子。
public class Test { public static void main(String[] args) { Base base = new Base(); Player player1 = new Player("A", "非戒備狀態"); Player player2 = new Player("B", "非戒備狀態"); Player player3 = new Player("C", "非戒備狀態"); base.add(player1); base.add(player2); base.add(player3); base.changed("戒備狀態"); base.close(); } } class Base { SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); private List<Player> players = new ArrayList<>(); public void add(Player player) { publisher.subscribe(player); players.add(player); } public void remove(Player player) { player.cancel(); players.remove(player); } public void changed(String state) { System.out.println("基地正在遭受攻擊"); publisher.submit(state); } public void close() { publisher.close(); try { Thread.currentThread().join(2000); } catch(Exception e) { e.printStackTrace(); } } } class Player implements Subscriber<String> { private Subscription subscription; private String name; private String state; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(String item) { System.out.println("戰隊成員"+name+"更新狀態:"+item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("戰隊成員接收異常"); throwable.printStackTrace(); this.subscription.cancel(); } public void cancel() { this.subscription.cancel(); } @Override public void onComplete() { System.out.println("戰隊成員接收完畢"); } }
大部分代碼都與上面的例子相同,就不解釋了,貼一下輸出:
若是以爲文章好看,歡迎點贊。
同時歡迎關注微信公衆號:氷泠之路。