設計模式 | 中介者模式及典型應用

本文的主要內容:java

  • 介紹中介者模式
  • 數據同步示例
  • 中介者模式總結
  • 源碼分析中介者模式的典型應用
    • Java Timer 中的中介者模式

更多內容可訪問個人我的博客:laijianfeng.org
關注【小旋鋒】微信公衆號,及時接收博文推送mysql

長按關注【小旋鋒】微信公衆號

中介者模式

世界上存在着各類各樣的數據庫,不一樣數據庫有各自的應用場景,對於同一份數據,最開始可能使用關係型數據庫(如MySQL)進行存儲查詢,使用Redis做爲緩存數據庫,當數據量較大時使用MySQL進行查詢可能較慢,因此須要將數據同步到Elasticsearch或者列式數據庫如Hbase中進行大數據查詢。redis

如何設計數據同步方案是一個重要的問題。數據源衆多,目標端也衆多,設計得很差可能 "牽一髮而動全身"。sql

若是咱們這樣設計:每一個數據源直接同步數據到目標端數據庫的,若是數據庫有 N 個,那麼最多可能的同步做業將達到 N * N 個,當修改了其中一個數據庫的某些配置,可能須要修改另外的 N - 1 個數據庫的同步做業。數據庫

如今介紹另外一種方案,DataX 是阿里巴巴集團內被普遍使用的離線數據同步工具/平臺,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各類異構數據源之間高效的數據同步功能。設計模式

DataX

DataX 其實至關於一箇中介,從數據源讀取數據,寫入到目標端,數據源再也不須要維護到目標端的同步做業,只須要與 DataX 通訊便可。DataX 體現了中介者模式的思想。緩存

中介者模式(Mediator Pattern):用一箇中介對象(中介者)來封裝一系列的對象交互,中介者使各對象不須要顯式地相互引用,從而使其耦合鬆散,並且能夠獨立地改變它們之間的交互。中介者模式又稱爲調停者模式,它是一種對象行爲型模式。bash

角色

Mediator(抽象中介者):它定義一個接口,該接口用於與各同事對象之間進行通訊。微信

ConcreteMediator(具體中介者):它是抽象中介者的子類,經過協調各個同事對象來實現協做行爲,它維持了對各個同事對象的引用。多線程

Colleague(抽象同事類):它定義各個同事類公有的方法,並聲明瞭一些抽象方法來供子類實現,同時它維持了一個對抽象中介者類的引用,其子類能夠經過該引用來與中介者通訊。

ConcreteColleague(具體同事類):它是抽象同事類的子類;每個同事對象在須要和其餘同事對象通訊時,先與中介者通訊,經過中介者來間接完成與其餘同事類的通訊;在具體同事類中實現了在抽象同事類中聲明的抽象方法。

中介者模式的核心在於中介者類的引入,在中介者模式中,中介者類承擔了兩方面的職責

  • 中轉做用(結構性):經過中介者提供的中轉做用,各個同事對象就再也不須要顯式引用其餘同事,當須要和其餘同事進行通訊時,可經過中介者來實現間接調用。該中轉做用屬於中介者在結構上的支持。
  • 協調做用(行爲性):中介者能夠更進一步的對同事之間的關係進行封裝,同事能夠一致的和中介者進行交互,而不須要指明中介者須要具體怎麼作,中介者根據封裝在自身內部的協調邏輯,對同事的請求進行進一步處理,將同事成員之間的關係行爲進行分離和封裝。

示例

咱們來實現一個簡化版的數據同步方案,有三種數據庫 Mysql、Redis、Elasticsearch,其中的 Mysql 做爲主數據庫,當增長一條數據時須要同步到另外兩個數據庫中;Redis 做爲緩存數據庫,當增長一條數據時不須要同步到另外另個數據庫;而 Elasticsearch 做爲大數據查詢數據庫,有一個統計功能,當增長一條數據時只須要同步到 Mysql,因此它們之間的關係圖以下所示。

簡化的數據同步需求

首先咱們來實現第一種不使用中介者模式的數據同步方案,各數據源維護各自的同步做業。

抽象數據庫

public abstract class AbstractDatabase {
    public abstract void add(String data);

    public abstract void addData(String data);
}
複製代碼

具體數據庫 Mysql,維護同步到 Redis和Elasticsearch 的同步做業

public class MysqlDatabase extends AbstractDatabase {
    private List<String> dataset = new ArrayList<String>();
    @Setter
    private RedisDatabase redisDatabase;
    @Setter
    private EsDatabase esDatabase;

    @Override
    public void addData(String data) {
        System.out.println("Mysql 添加數據:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.redisDatabase.addData(data);   // 維護同步到Redis的同步做業
        this.esDatabase.addData(data);  // 維護同步到Elasticsearch的同步做業
    }

    public void select() {
        System.out.println("- Mysql 查詢,數據:" + this.dataset.toString());
    }
}
複製代碼

具體數據庫 Redis,不須要同步到其它數據庫

public class RedisDatabase extends AbstractDatabase {
    private List<String> dataset = new LinkedList<String>();

    @Override
    public void addData(String data) {
        System.out.println("Redis 添加數據:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data); // 不一樣步到其它數據庫
    }

    public void cache() {
        System.out.println("- Redis 緩存的數據:" + this.dataset.toString());
    }
}
複製代碼

Elasticsearch ,只須要同步到Mysql

public class EsDatabase extends AbstractDatabase {
    private List<String> dataset = new CopyOnWriteArrayList<String>();
    @Setter
    private MysqlDatabase mysqlDatabase;
    @Override
    public void addData(String data) {
        System.out.println("ES 添加數據:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mysqlDatabase.addData(data);   // 維護同步到MySQL的同步做業
    }

    public void count() {
        int count = this.dataset.size();
        System.out.println("- Elasticsearch 統計,目前有 " + count + " 條數據,數據:" + this.dataset.toString());
    }
}
複製代碼

測試客戶端,分別往三個數據庫中加入一些數據查看同步效果

public class Client {
    public static void main(String[] args) {
        MysqlDatabase mysqlDatabase = new MysqlDatabase();
        RedisDatabase redisDatabase = new RedisDatabase();
        EsDatabase esDatabase = new EsDatabase();

        mysqlDatabase.setRedisDatabase(redisDatabase);
        mysqlDatabase.setEsDatabase(esDatabase);
        esDatabase.setMysqlDatabase(mysqlDatabase);

        System.out.println("\n---------mysql 添加數據 1,將同步到Redis和ES中-----------");
        mysqlDatabase.add("1");

        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------Redis添加數據 2,將不一樣步到其它數據庫-----------");
        redisDatabase.add("2");

        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------ES 添加數據 3,只同步到 Mysql-----------");
        esDatabase.add("3");

        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();
    }
}
複製代碼

輸出結果

---------mysql 添加數據 1,將同步到Redis和ES中-----------
Mysql 添加數據:1
Redis 添加數據:1
ES 添加數據:1
- Mysql 查詢,數據:[1]
- Redis 緩存的數據:[1]
- Elasticsearch 統計,目前有 1 條數據,數據:[1]

---------Redis添加數據 2,將不一樣步到其它數據庫-----------
Redis 添加數據:2
- Mysql 查詢,數據:[1]
- Redis 緩存的數據:[1, 2]
- Elasticsearch 統計,目前有 1 條數據,數據:[1]

---------ES 添加數據 3,只同步到 Mysql-----------
ES 添加數據:3
Mysql 添加數據:3
- Mysql 查詢,數據:[1, 3]
- Redis 緩存的數據:[1, 2]
- Elasticsearch 統計,目前有 2 條數據,數據:[1, 3]
複製代碼

其實這樣已經實現了咱們的需求,可是存在一些問題

  • 系統結構複雜且耦合度高。數據源須要維護目標端數據庫的引用,以便完成數據同步
  • 組件的可重用性差。因爲每個數據源和目標端之間具備很強的關聯,若沒有目標端的支持,這個組件很難被另外一個系統或模塊重用
  • 系統的可擴展性差:若是須要增長、修改或刪除其中一個數據庫、將致使多個類的源代碼須要修改,這違反了 "開閉原則",可擴展性和靈活性欠佳。

咱們使用中介者模式來重構,將數據同步的功能遷移到中介者中,由中介者來管理數據同步做業

首先仍是抽象數據庫類(抽象同事類),維護了一箇中介者

public abstract class AbstractDatabase {
    public static final String MYSQL = "mysql";
    public static final String REDIS = "redis";
    public static final String ELASTICSEARCH = "elasticsearch";

    protected AbstractMediator mediator;    // 中介者

    public AbstractDatabase(AbstractMediator mediator) {
        this.mediator = mediator;
    }

    public abstract void addData(String data);

    public abstract void add(String data);
}
複製代碼

Mysql 數據庫(具體同事類)

public class MysqlDatabase extends AbstractDatabase {
    private List<String> dataset = new ArrayList<String>();

    public MysqlDatabase(AbstractMediator mediator) {
        super(mediator);
    }

    @Override
    public void addData(String data) {
        System.out.println("Mysql 添加數據:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mediator.sync(AbstractDatabase.MYSQL, data); // 數據同步做業交給中介者管理
    }

    public void select() {
        System.out.println("Mysql 查詢,數據:" + this.dataset.toString());
    }
}
複製代碼

Redis 數據庫(具體同事類)

public class RedisDatabase extends AbstractDatabase {
    private List<String> dataset = new LinkedList<String>();

    public RedisDatabase(AbstractMediator mediator) {
        super(mediator);
    }

    @Override
    public void addData(String data) {
        System.out.println("Redis 添加數據:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mediator.sync(AbstractDatabase.REDIS, data);    // 數據同步做業交給中介者管理
    }

    public void cache() {
        System.out.println("Redis 緩存的數據:" + this.dataset.toString());
    }
}
複製代碼

Elasticsearch(具體同事類)

public class EsDatabase extends AbstractDatabase {
    private List<String> dataset = new CopyOnWriteArrayList<String>();

    public EsDatabase(AbstractMediator mediator) {
        super(mediator);
    }

    @Override
    public void addData(String data) {
        System.out.println("ES 添加數據:" + data);
        this.dataset.add(data);
    }

    @Override
    public void add(String data) {
        addData(data);
        this.mediator.sync(AbstractDatabase.ELASTICSEARCH, data);    // 數據同步做業交給中介者管理
    }

    public void count() {
        int count = this.dataset.size();
        System.out.println("Elasticsearch 統計,目前有 " + count + " 條數據,數據:" + this.dataset.toString());
    }
}
複製代碼

抽象中介者

@Data
public abstract class AbstractMediator {
    protected MysqlDatabase mysqlDatabase;
    protected RedisDatabase redisDatabase;
    protected EsDatabase esDatabase;

    public abstract void sync(String databaseName, String data);
}
複製代碼

具體中介者

public class SyncMediator extends AbstractMediator {
    @Override
    public void sync(String databaseName, String data) {
        if (AbstractDatabase.MYSQL.equals(databaseName)) {
            // mysql 同步到 redis 和 Elasticsearch
            this.redisDatabase.addData(data);
            this.esDatabase.addData(data);
        } else if (AbstractDatabase.REDIS.equals(databaseName)) {
            // redis 緩存同步,不須要同步到其餘數據庫
        } else if (AbstractDatabase.ELASTICSEARCH.equals(databaseName)) {
            // Elasticsearch 同步到 Mysql
            this.mysqlDatabase.addData(data);
        }
    }
}
複製代碼

測試客戶端

public class Client {
    public static void main(String[] args) {
        AbstractMediator syncMediator = new SyncMediator();
        MysqlDatabase mysqlDatabase = new MysqlDatabase(syncMediator);
        RedisDatabase redisDatabase = new RedisDatabase(syncMediator);
        EsDatabase esDatabase = new EsDatabase(syncMediator);

        syncMediator.setMysqlDatabase(mysqlDatabase);
        syncMediator.setRedisDatabase(redisDatabase);
        syncMediator.setEsDatabase(esDatabase);

        System.out.println("\n---------mysql 添加數據 1,將同步到Redis和ES中-----------");
        mysqlDatabase.add("1");
        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------Redis添加數據 2,將不一樣步到其它數據庫-----------");
        redisDatabase.add("2");
        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();

        System.out.println("\n---------ES 添加數據 3,只同步到 Mysql-----------");
        esDatabase.add("3");
        mysqlDatabase.select();
        redisDatabase.cache();
        esDatabase.count();
    }
}
複製代碼

輸出結果,與預期一致

---------mysql 添加數據 1,將同步到Redis和ES中-----------
Mysql 添加數據:1
Redis 添加數據:1
ES 添加數據:1
- Mysql 查詢,數據:[1]
- Redis 緩存的數據:[1]
- Elasticsearch 統計,目前有 1 條數據,數據:[1]

---------Redis添加數據 2,將不一樣步到其它數據庫-----------
Redis 添加數據:2
- Mysql 查詢,數據:[1]
- Redis 緩存的數據:[1, 2]
- Elasticsearch 統計,目前有 1 條數據,數據:[1]

---------ES 添加數據 3,只同步到 Mysql-----------
ES 添加數據:3
Mysql 添加數據:3
- Mysql 查詢,數據:[1, 3]
- Redis 緩存的數據:[1, 2]
- Elasticsearch 統計,目前有 2 條數據,數據:[1, 3]
複製代碼

畫出類圖以下

示例.中介者模式

中介者模式總結

中介者模式的主要優勢

  • 中介者模式簡化了對象之間的交互,它用中介者和同事的一對多交互代替了原來同事之間的多對多交互,一對多關係更容易理解、維護和擴展,將本來難以理解的網狀結構轉換成相對簡單的星型結構。

  • 中介者模式可將各同事對象解耦。中介者有利於各同事之間的鬆耦合,咱們能夠獨立的改變和複用每個同事和中介者,增長新的中介者和新的同事類都比較方便,更好地符合 "開閉原則"。

  • 能夠減小子類生成,中介者將本來分佈於多個對象間的行爲集中在一塊兒,改變這些行爲只需生成新的中介者子類便可,這使各個同事類可被重用,無須對同事類進行擴展。

中介者模式的主要缺點

  • 具體中介者類中包含了大量同事之間的交互細節,可能會致使具體中介者類很是複雜,使得系統難以維護。(也就是把具體同事類之間的交互複雜性集中到了中介者類中,結果中介者成了最複雜的類)

適用場景

  • 系統中對象之間存在複雜的引用關係,系統結構混亂且難以理解。

  • 一個對象因爲引用了其餘不少對象而且直接和這些對象通訊,致使難以複用該對象。

  • 想經過一箇中間類來封裝多個類中的行爲,而又不想生成太多的子類。能夠經過引入中介者類來實現,在中介者中定義對象交互的公共行爲,若是須要改變行爲則能夠增長新的具體中介者類。

中介者模式的典型應用

Java Timer 中的中介者模式

敲一個 java.util.Timer 的Demo

兩個任務類

public class MyOneTask extends TimerTask {
    private static int num = 0;
    @Override
    public void run() {
        System.out.println("I'm MyOneTask " + ++num);
    }
}

public class MyTwoTask extends TimerTask {
    private static int num = 1000;
    @Override
    public void run() {
        System.out.println("I'm MyTwoTask " + num--);
    }
}
複製代碼

客戶端測試,3秒後開始執行,循環週期爲 1秒

public class TimerTest {
    public static void main(String[] args) {
        // 注意:多線程並行處理定時任務時,Timer運行多個TimeTask時,只要其中之一沒有捕獲拋出的異常,
        // 其它任務便會自動終止運行,使用ScheduledExecutorService則沒有這個問題
        Timer timer = new Timer();
        timer.schedule(new MyOneTask(), 3000, 1000); // 3秒後開始運行,循環週期爲 1秒
        timer.schedule(new MyTwoTask(), 3000, 1000);
    }
}
複製代碼

輸出

I'm MyOneTask 1 I'm MyTwoTask 1000
I'm MyTwoTask 999 I'm MyOneTask 2
I'm MyOneTask 3 I'm MyTwoTask 998
I'm MyTwoTask 997 I'm MyOneTask 4
I'm MyOneTask 5 I'm MyTwoTask 996
I'm MyTwoTask 995 I'm MyOneTask 6
...
複製代碼

Timer 的部分關鍵源碼以下

public class Timer {

    private final TaskQueue queue = new TaskQueue();
    private final TimerThread thread = new TimerThread(queue);
    
    public void schedule(TimerTask task, long delay) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        sched(task, System.currentTimeMillis()+delay, 0);
    }
    
    public void schedule(TimerTask task, Date time) {
        sched(task, time.getTime(), 0);
    }
    
    private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");

        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;

        // 獲取任務隊列的鎖(同一個線程屢次獲取這個鎖並不會被阻塞,不一樣線程獲取時纔可能被阻塞)
        synchronized(queue) {
            // 若是定時調度線程已經終止了,則拋出異常結束
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");

            // 再獲取定時任務對象的鎖(爲何還要再加這個鎖呢?想不清)
            synchronized(task.lock) {
                // 判斷線程的狀態,防止多線程同時調度到一個任務時屢次被加入任務隊列
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                // 初始化定時任務的下次執行時間
                task.nextExecutionTime = time;
                // 重複執行的間隔時間
                task.period = period;
                // 將定時任務的狀態由TimerTask.VIRGIN(一個定時任務的初始化狀態)設置爲TimerTask.SCHEDULED
                task.state = TimerTask.SCHEDULED;
            }
            
            // 將任務加入任務隊列
            queue.add(task);
            // 若是當前加入的任務是須要第一個被執行的(也就是他的下一次執行時間離如今最近)
            // 則喚醒等待queue的線程(對應到上面提到的queue.wait())
            if (queue.getMin() == task)
                queue.notify();
        }
    }
    
    // cancel會等到全部定時任務執行完後馬上終止定時線程
    public void cancel() {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.clear();
            queue.notify();  // In case queue was already empty.
        }
    }
    // ...
}
複製代碼

Timer 中在 schedulexxx 方法中經過 TaskQueue 協調各類 TimerTask 定時任務,Timer 是中介者,TimerTask 是抽象同事類,而咱們本身寫的任務則是具體同事類

TimerThreadTimer 中定時調度線程類的定義,這個類會作爲一個線程一直運行來執行 Timer 中任務隊列中的任務。

Timer 這個中介者的功能就是定時調度咱們寫的各類任務,將任務添加到 TaskQueue 任務隊列中,給 TimerThread 執行,讓任務與執行線程解耦

其餘的中介者模式應用

  • java.util.concurrent.Executor#executejava.util.concurrent.ExecutorService#submitTimer#schedule 相似

  • MVC模式中,Controller 是中介者,根據 View 層的請求來操做 Model 層

參考:
劉偉:設計模式Java版
慕課網java設計模式精講 Debug 方式+內存分析
java.util系列源碼解讀之Timer定時器

後記

歡迎評論、轉發、分享,您的支持是我最大的動力

推薦閱讀

設計模式 | 簡單工廠模式及典型應用
設計模式 | 工廠方法模式及典型應用
設計模式 | 抽象工廠模式及典型應用
設計模式 | 建造者模式及典型應用
設計模式 | 原型模式及典型應用
設計模式 | 外觀模式及典型應用
設計模式 | 裝飾者模式及典型應用
設計模式 | 適配器模式及典型應用
設計模式 | 享元模式及典型應用
設計模式 | 組合模式及典型應用
設計模式 | 模板方法模式及典型應用
設計模式 | 迭代器模式及典型應用
設計模式 | 策略模式及典型應用
設計模式 | 觀察者模式及典型應用
設計模式 | 備忘錄模式及典型應用

相關文章
相關標籤/搜索