重學設計模式——線程安全的觀察者模式

線程安全的觀察者模式

先來講下觀察者模式,其實在Android開發中,咱們使用觀察者模式的時候仍是很是多的,不管是廣播使用的發佈-訂閱模式,仍是listview的notifyDataSetChanged,或者是RxJava的使用,都是觀察者模式的運用。今天就來從新看一下觀察者模式。html

概念

訂閱模式,又稱觀察者模式。定義對象間一種一對多的依賴關係(註冊),使得當一個對象改變狀態後,則全部依賴它的對象都會獲得通知並被自動更新(通知)。說白了就是個註冊,通知的過程。java

觀察者模式的UML圖

  • Subject主題,也就是被觀察者Observable

簡單使用觀察者模式

先來定下一種場景,我每月都會在微信上,收到房東的房租單(跪舔有房的大佬們),假設房東是被觀察者,我是觀察者,租客我把聯繫方式留給房東這就是一個註冊過程,每月月初,房東大佬都會通知咱們交租(被觀察者變化通知觀察者)。設計模式

JDK爲咱們提供了Observer與Observable內置接口,咱們能夠很方便的使用它們。安全

public class TheOppressed implements Observer {
    public String name;
    public TheOppressed(String name){
        this.name = name;
    }
    @Override
    public void update(Observable o, Object arg) {
        System.out.println(name +" 收到要交租信息");
    }
}
public class Exploiter extends Observable {

    public void postChange(String content){
        setChanged();

        notifyObservers(content);
    }
}

    public static void main(String[] args) {
        Exploiter exploiter = new Exploiter();

        TheOppressed theOppressed1 = new TheOppressed("打工仔1");
        TheOppressed theOppressed2 = new TheOppressed("打工仔2");
        TheOppressed theOppressed3 = new TheOppressed("打工仔3");

        exploiter.addObserver(theOppressed1);
        exploiter.addObserver(theOppressed2);
        exploiter.addObserver(theOppressed3);

        exploiter.postChange("打工仔們快來交房租啦~~");

    }
複製代碼

其實每每觀察者模式咱們本身也能夠來寫,而不用系統提供的方式微信

/** * 觀察者 */
public interface MyObserver {
    /** * 找我 * @param content 找我啥事 */
    void callMe(String content);
}
/** * 被觀察者 */
public interface MySubject {
    /** * 觀察者註冊 */
    void registerObserver(MyObserver observer);

    /** * 刪除觀察者 */
    void removeObserver(MyObserver observer);

    /** * 主題有變化時通知觀察者 */
    void notifyObserver();
}
複製代碼

先定義兩個接口,實現了觀察者模式的基本方法多線程

/** * 房東 */
public class Exploiter implements MySubject {
    private String name;
    //存放觀察者的集合
    private List<MyObserver> observers;
    private String content;

    public Exploiter(String name) {
        this.name = name;
        observers = new ArrayList<>();
        content = "";
    }

    @Override
    public void registerObserver(MyObserver observer) {
        this.observers.add(observer);
    }

    @Override
    public void removeObserver(MyObserver observer) {
        int index = observers.indexOf(observer);
        if (index > 0) {
            observers.remove(observer);
        }
    }

    @Override
    public void notifyObserver() {
        for (int i = 0; i < observers.size(); i++) {
            //遍歷全部房客,並調用他們的通知方法
            MyObserver observer = observers.get(i);
            observer.callMe(content);
        }
    }

   /** * 房東要發信息啦 */
    public void postMessage(String content) {
        if (null != content && !content.trim().equals("")) {
            this.content = content;
            this.notifyObserver();
        }
    }
}

/** * 勞動人民 */
public class TheOppressed implements MyObserver {
    public String name;

    public TheOppressed(String name) {
        this.name = name;
    }

    @Override
    public void callMe(String content) {
        System.out.println(name + " " + content);
    }
}
複製代碼

調用代碼用下併發

public class Test {
    public static void main(String[] args) {
        Exploiter exploiter = new Exploiter("房東");

        TheOppressed theOppressed1 = new TheOppressed("打工仔1");
        TheOppressed theOppressed2 = new TheOppressed("打工仔2");
        TheOppressed theOppressed3 = new TheOppressed("打工仔3");

        exploiter.registerObserver(theOppressed1);
        exploiter.registerObserver(theOppressed2);
        exploiter.registerObserver(theOppressed3);

        exploiter.postMessage("打工仔們快來交房租啦~~");
    }
}
//結果
打工仔1 打工仔們快來交房租啦~~
打工仔2 打工仔們快來交房租啦~~
打工仔3 打工仔們快來交房租啦~~
複製代碼

其實在JDK9以後,Observer 這些都已經被廢棄了,主要由於它異步

  • 不可序列化,Observable是個類,而不是一個接口,沒有實現Serializable,因此,不能序列化和它的子類
  • 沒有線程安全,方法能夠被其子類覆蓋,而且事件通知能夠以不一樣的順序而且可能在不一樣的線程上發生。
  • 可使用PropertyChangeEvent和PropertyChangeListener,它是java.beans包下的類

手寫線程安全的觀察者模式

其實咱們從上面很容易看出來,多觀察者須要串行調用,被觀察者發生動做,觀察者要做出迴應,若是觀察霆太多,並且處理時間長怎麼辦?用異步,也許你會脫口而出,那麼,異步的處理就要考慮到線程安全和隊列的問題。ide

就是剛剛一樣的場景,若是房東先發了一條漲租200,後來又發了一條,收房租(固然要多準備200),假設延遲性是很是的大的狀況下,咱們不可能單線程串行一直等,太費性能了,開了多線程的狀況下,那麼就會出現問題,可能某人會先收到交房租,這樣就亂了。函數

現實中有好多這種併發場景,一個或者多個線程,要等待另外一組線程執行完成後,才能繼續執行的問題,jdk已的com.util.concurrent下爲咱們提供了不少多線程的類,之後有機會再細講,今天先用一個,CountDownLatch

CountDownLatch

CountDownLatch就一個線程同步工具,它至關於一個倒序計數器,用來協調多個線程的執行。多個線程經過調用它們所共享的計數器CountDownLatch的countDown方法來讓計數器減1。經過await方法來阻塞當前線程,直到計數器變成0。達到線程阻塞直至其餘線程執行完成被從新喚醒。主要有三個方法:

  • 構造函數,初始化state的值,state等於同步線程數
  • await(),讓線程阻塞
  • countDown(),計數器(state)減1的方法。

概念抄完,直接下手

先來定義一下Observer與Subject的接口

/** * 被觀察者 */
public interface MySubject {
    /** * 觀察者註冊 */
    void registerObserver(MyObserver observer);

    /** * 刪除觀察者 */
    void removeObserver(MyObserver observer);

    /** * 主題有變化時通知觀察者 */
    void notifyObserver();
}

/** * 觀察者 */
public interface MyObserver {
    /** * 找我 * @param content 找我啥事 */
    void callMe(String content);

    /** * 觀察者名字 */
    String getName();
}
複製代碼

而後能夠實現租客與房東的實現類

/** * 房東 */
public class Exploiter implements MySubject {
    private String name;
    // private List<MyObserver> observers;
    private ConcurrentMap<String, MyObserver> observers;
    private String content;

    public Exploiter(String name) {
        this.name = name;
// observers = new ArrayList<>();
        observers = new ConcurrentHashMap<>();
        content = "";
    }

    @Override
    public void registerObserver(MyObserver observer) {
// this.observers.add(observer);
        this.observers.put(observer.getName(), observer);
    }

    @Override
    public void removeObserver(MyObserver observer) {
        observers.remove(observer.getName());
// int index = observers.indexOf(observer);
// if (index > 0) {
// observers.remove(observer);
// }
    }

    @Override
    public void notifyObserver() {
        try {
            long beginTime = System.currentTimeMillis();
            CountDownLatch latch = new CountDownLatch(observers.size());
            int i = 0;
            for (MyObserver observer : observers.values()) {
                MessageSending messageSending = new MessageSending(latch, observer, content);
                messageSending.start();
                i++;
            }
// for (int i = 0; i < observers.size(); i++) {
// MyObserver observer = observers.get(i);
// observer.callMe(content);
// }
            latch.await();
            long endTime = System.currentTimeMillis();
            System.out.println(name + "消息發送完畢,耗時:" + (endTime - beginTime));
            System.out.println();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** * 房東要發信息啦 */
    public void postMessage(String content) {
        if (null != content && !content.trim().equals("")) {
            this.content = content;
            System.out.println(name + "發佈信息:" + content);
            this.notifyObserver();
        }
    }
}
/** * 勞動人民 */
public class TheOppressed implements MyObserver {
    public String name;

    public TheOppressed(String name) {
        this.name = name;
    }

    @Override
    public void callMe(String content) {
        System.out.println(name + "接收到通知:" + content);
    }

    @Override
    public String getName() {
        return name;
    }
}
複製代碼

最後在main方法調用以下:

public class Test {
    public static void main(String[] args) {
        Exploiter exploiter = new Exploiter("房東");

        TheOppressed theOppressed1 = new TheOppressed("打工仔1");
        TheOppressed theOppressed2 = new TheOppressed("打工仔2");
        TheOppressed theOppressed3 = new TheOppressed("打工仔3");

        exploiter.registerObserver(theOppressed1);
        exploiter.registerObserver(theOppressed2);
        exploiter.registerObserver(theOppressed3);

        exploiter.postMessage("這個月房租加200!!!");
        exploiter.postMessage("打工仔們快來交房租啦~~");
    }
}
複製代碼

能夠看到輸出結果爲:

房東發佈信息:這個月房租加200!!!
打工仔2接收到通知:這個月房租加200!!!
打工仔1接收到通知:這個月房租加200!!!
往打工仔1消息發送完畢
往打工仔2消息發送完畢
打工仔3接收到通知:這個月房租加200!!!
往打工仔3消息發送完畢
房東消息發送完畢,耗時:2004

房東發佈信息:打工仔們快來交房租啦~~
打工仔1接收到通知:打工仔們快來交房租啦~~
往打工仔1消息發送完畢
打工仔2接收到通知:打工仔們快來交房租啦~~
往打工仔2消息發送完畢
打工仔3接收到通知:打工仔們快來交房租啦~~
往打工仔3消息發送完畢
房東消息發送完畢,耗時:2001
複製代碼

能夠看到,使用多線程往觀察者發送信息,觀察者均可以很迅速的接收到信息,這是並行的,可是又保證了被觀察者的多個消息之間是有前後順序的。

總結

觀察者模式主要是對象的解耦,將觀察者與被觀察者之間徹底隔離。jdk提供的默認觀察者Observer/Observable在多線程下有安全性問題,須要本身手寫,JDK9以後已經廢棄了。

PS:今天看的比較簡單,卻也挺重要吧,之前歷來沒注意過關於觀察者模式的 多線程安全性問題,也算是查漏補缺吧。


參考

《Android源碼設計模式》

深刻理解設計模式(八):觀察者模式

觀察者模式(二)——多線程與CountDownLatch淺析


個人CSDN

下面是個人公衆號,歡迎你們關注我

相關文章
相關標籤/搜索