[從源碼學設計]螞蟻金服SOFARegistry之消息總線

[從源碼學設計]螞蟻金服SOFARegistry之消息總線

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。javascript

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓你們藉以學習阿里如何設計。html

本文爲第四篇,介紹SOFARegistry之消息總線。java

0x01 相關概念

1.1 事件驅動模型

事件驅動模型,也便是咱們一般說的觀察者。基於發佈-訂閱模式的編程模型。spring

1.1.1 概念

定義對象間的一種一對多的依賴關係,當一個對象的狀態發生變化時,全部依賴它的對象都獲得通知並自動更新。編程

從程序設計的角度來看,事件驅動模型的核心構件一般包含如下幾個:數組

  • 事件源:負責產生事件的對象。好比咱們常見的按鈕,按鈕就是一個事件源,可以產生「點擊」這個事件
  • 事件監聽器(事件處理器):負責處理事件的對象
  • 事件:或者稱爲事件對象,是事件源和事件監聽器之間的信息橋樑。是整個事件模型驅動的核心

1.1.2 應用環境

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:網絡

  • 程序中有許多任務;
  • 任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此);
  • 在等待事件到來時,某些任務會阻塞;

1.2 消息總線

總線(Bus)通常指計算機各類功能部件之間傳送信息的公共通訊幹線,而EventBus則是事件源(publisher)向訂閱方(subscriber)發送訂閱事件的總線,它解耦了觀察者模式中訂閱方和事件源之間的強依賴關係session

消息總線扮演着一種消息路由的角色,擁有一套完備的路由機制來決定消息傳輸方向。發送端只須要向消息總線發出消息而不用管消息被如何轉發,爲了避免消息丟失,部分消息總線提供了必定的持久化存儲和災備的機制架構

消息總線簡單理解就是一個消息中心,衆多微服務實例能夠鏈接到總線上,實例能夠往消息中心發送或接收信息(經過監聽)。app

通常的應用的場景就是在用觀察者模式的地方就能夠用EventBus進行替代。

img

0x02 業務領域

2.1 業務範疇

DataServer 本質上是一個網絡應用程序,因此有以下特色:

  • 須要處理各個方面發送來的消息;
  • 程序中任務繁多,任務之間獨立,大多數任務不存在互斥通信等操做;在等待事件到來時,某些任務會阻塞;
  • 某一個消息每每有多個投遞源;

所以自然適合用事件驅動機制來實現。

2.2 問題點

可以想到的問題點以下:

  • 由於一個事件每每會有多個投遞源,如何解耦事件投遞和事件處理之間的邏輯?
  • 怎樣實現Listener一次註冊,就可以知道Listener對那些事件感興趣的,進而在有某類事件發生時通知到Listener的呢?
  • 如何使得一個Listener能夠處理多個事件?
  • 如何使得一個事件被多個Listener處理?
  • 能否簡化註冊流程?
  • 是否須要維護消息順序?
  • 處理消息方式是異步仍是同步?
  • 多個一樣消息是否要歸併?

具體咱們在後文會詳述阿里的思路。

2.3 解決方案

DataServer 內部邏輯主要是經過事件驅動機制來實現的,下圖列舉了部分事件在事件中心的交互流程,從圖中能夠看到,一個事件每每會有多個投遞源,很是適合用 EventCenter 來解耦事件投遞和事件處理之間的邏輯;

0x03 EventCenter

業界消息總線有不少,好比 Android EventBus是一個發佈/訂閱事件總線框架,基於觀察者模式,將事件的接收者和發送者分開,簡化了組件之間的通訊。

而SOFARegistry EventCenter 的做用也相似:從邏輯上解耦,將事件的接收者和發送者分開,簡化組件之間通訊。阿里的實現有本身的特色,開發者能夠借鑑這裏的使用技巧和思路。

3.1 目錄結構

├── event
│   ├── AfterWorkingProcess.java
│   ├── DataServerChangeEvent.java
│   ├── Event.java
│   ├── EventCenter.java
│   ├── LocalDataServerChangeEvent.java
│   ├── MetaServerChangeEvent.java
│   ├── RemoteDataServerChangeEvent.java
│   ├── StartTaskEvent.java
│   ├── StartTaskTypeEnum.java
│   └── handler
│       ├── AbstractEventHandler.java
│       ├── AfterWorkingProcessHandler.java
│       ├── DataServerChangeEventHandler.java
│       ├── LocalDataServerChangeEventHandler.java
│       ├── MetaServerChangeEventHandler.java
│       └── StartTaskEventHandler.java

3.2 類定義

類定義以下:

public class EventCenter {

    private Multimap<Class<? extends Event>, AbstractEventHandler> MAP = ArrayListMultimap.create();

    /**
     * eventHandler register
     * @param handler
     */
    public void register(AbstractEventHandler handler) {
        List<Class<? extends Event>> interests = handler.interest();
        for (Class<? extends Event> interest : interests) {
            MAP.put(interest, handler);
        }
    }

    /**
     * event handler handle process
     * @param event
     */
    public void post(Event event) {
        Class clazz = event.getClass();
        if (MAP.containsKey(clazz)) {
            Collection<AbstractEventHandler> handlers = MAP.get(clazz);
            if (handlers != null) {
                for (AbstractEventHandler handler : handlers) {
                    handler.handle(event);
                }
            }
        } else {
            throw new RuntimeException("no suitable handler was found:" + clazz);
        }
    }
}

3.2.1 操做

普通 EventBus 大多有三個操做:

  • 註冊 Listener--register (Object Listener);
  • 註銷 Listener--unregister (Object Listener);
  • 發佈 Event--post (Object event);

可是阿里的EventCenter並無註銷操做,由於業務上不須要,因此只有以下接口。

  • register(AbstractEventHandler handler) 的工做就是找出這個Listener對哪些事件感興趣,而後把這種事件類型和對應的Listener註冊到 EventCenter;
  • post一個event時候,會遍歷這個消息的處理函數列表,逐一調用處理函數,其實就是同步執行了,固然也許 EventHandler 內部本身實現了異步;由於是同步執行,因此不須要維持消息的有序性,不然須要使用queue來實現每一個線程post的Event是有序的;

具體使用舉例以下:在MetaServerChangeEventHandler中有以下代碼投放消息。

eventCenter.post(new StartTaskEvent(set));

eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
        DataServerChangeEvent.FromType.REGISTER_META));

3.2.2 執行 & 解耦

handler中聲明瞭本身支持什麼種類的event,當register時候,會以event爲key,把本身註冊到eventCenter的map中,在 post 函數中,根據event的class,取出了handler,從而執行,也作到了解耦。

3.2.3 Listener列表

在觀察者模式中,事件源中會維護一個Listener的列表,並且向這個事件源註冊的Listener通常只會收到一類事件的通知,若是Listener對多個不一樣類的事件感興趣,則須要向多個事件源註冊。

EventCenter 是怎樣實現Listener一次註冊,可以知道Listener對那些事件感興趣的,進而在有某類事件發生時通知到Listener的呢

答案在ArrayListMultimap,其key是Event,其 Value 就是 AbstractEventHandler。這個 map 就是 Event 事件類型 和對其感興趣的處理函數的列表,一個 Event 可能有多個處理函數。

3.2.4 ArrayListMultimap

顧名思義,com.google.common.collect.ArrayListMultimap 能夠在key對應的value中設置一個ArrayList。這樣就保證了一個事件能夠有多個處理函數

具體能夠見下例子。

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

import java.util.Collection;

public class testArrayListMultimap {
     static void main() {
        Multimap<String, String> multimap = ArrayListMultimap.create();
        multimap.put("fruit", "banana");
        multimap.put("fruit", "apple");
        multimap.put("fruit", "apple");
        multimap.put("fruit", "peach");
        multimap.put("fish","crucian");
        multimap.put("fish","carp");

        System.err.println(multimap.size());//6
        Collection<String> fruits = multimap.get("fruit");
        System.err.println(fruits);//[bannana, apple, apple, peach]
    }
}

3.3 Listener

Listener 是由 AbstractEventHandler 的派生類實現的。

3.3.1 基類

EventHandler基類AbstractEventHandler定義具體以下:

public abstract class AbstractEventHandler<Event> implements InitializingBean {

    @Autowired
    private EventCenter         eventCenter;

    @Override
    public void afterPropertiesSet() throws Exception {
        eventCenter.register(this);
    }

    /**
     * event handle func
     * @param event
     */
    public void handle(Event event) {
            doHandle(event);
    }

    public abstract List<Class<? extends Event>> interest();

    public abstract void doHandle(Event event);
}

其主要做用爲三點:

  • 派生類必須實現interest來聲明本身想處理什麼Event,並且Event是配置在一個數組中,這樣就使得一個函數能夠處理多個事件
@Override
public List<Class<? extends LocalDataServerChangeEvent>> interest() {
    return Lists.newArrayList(LocalDataServerChangeEvent.class);
}
  • 派生類實現doHandle來處理消息;

  • 由於afterPropertiesSet中作了設定,因此每個繼承此類的Handler都會自動註冊到EventCenter之中

3.3.2 派生類

以MetaServerChangeEventHandler爲例,只要在interest函數中聲明本身對哪些消息感興趣,在doHandle函數中實現業務便可。

public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {
  
    @Override
    public List<Class<? extends MetaServerChangeEvent>> interest() {
        return Lists.newArrayList(MetaServerChangeEvent.class);
    }
  
		@Override
    public void doHandle(MetaServerChangeEvent event) {
       ......
    }
}

3.3.2 自動註冊

這裏須要專門說一下自動註冊,由於初接觸者很容易疏漏從而感到奇怪。

自動註冊使用的是Spring的afterPropertiesSet方法完成

afterPropertiesSet方法能夠針對某個具體的bean進行配置,其將在Bean全部的屬性被初始化後調用,可是會在init前調用。afterPropertiesSet 必須實現 InitializingBean接口。

package org.springframework.beans.factory;

public interface InitializingBean {
    void afterPropertiesSet() throws Exception;
}

基類AbstractEventHandler實現InitializingBean接口。

public abstract class AbstractEventHandler<Event> implements InitializingBean

而每個派生類就註冊了派生類自己到eventCenter。

@Override
public void afterPropertiesSet() throws Exception {
    eventCenter.register(this);
}

3.4 核心消息

具體涉及到業務,EventCenter主要處理三種消息:

  • DataServerChangeEvent,是其餘Data Server的節點變化消息;
  • MetaServerChangeEvent,是Meta Sever的變化消息;
  • StartTaskEvent:;

分別對應三個消息處理handler:

  • public class DataServerChangeEventHandler extends AbstractEventHandler

  • public class MetaServerChangeEventHandler extends AbstractEventHandler

  • public class StartTaskEventHandler extends AbstractEventHandler

咱們用 StartTaskEvent 舉例,具體消息內容根據具體業務設置。

public class StartTaskEvent implements Event {
    private final Set<StartTaskTypeEnum> suitableTypes;

    public StartTaskEvent(Set<StartTaskTypeEnum> suitableTypes) {
        this.suitableTypes = suitableTypes;
    }

    public Set<StartTaskTypeEnum> getSuitableTypes() {
        return suitableTypes;
    }
}

3.5 主要邏輯

EventCenter主要邏輯以下圖所示:

+------------------------------+
          | MetaServerChangeEventHandler |
          +-----------+------------------+
                      |
                      |  post(new StartTaskEvent)
                      |
                      |
                      |                                      +------------------------+
                      v                                      |  StartTaskEventHandler |
+---------------------+-----------------------+              |                        |
|                EventCenter                  |              | +--------------------+ |
|                                             |              | |                    | |
| +-----------------------------------------+ +---------------------> doHandle      | |
| |Multimap< <Event>, AbstractEventHandler> | |              | |                    | |
| +-----------------------------------------+ | <--------------+ afterPropertiesSet | |
|                                             |  register    | |                    | |
+---------------------------------------------+              | |      interest      | |
                                                             | |                    | |
                                                             | +--------------------+ |
                                                             +------------------------+

手機以下圖:

0x04 總結

SOFARegistry EventCenter 的做用與業界大多總線相似:從邏輯上解耦,將事件的接收者和發送者分開,簡化組件之間通訊。可是阿里的實現有本身的特色,開發者能夠借鑑這裏的使用技巧和思路。

針對咱們前面提出的問題,如今回答以下:

  • 由於一個事件每每會有多個投遞源,如何解耦事件投遞和事件處理之間的邏輯?
    • 答案:handler中聲明瞭本身支持什麼種類的event,當register時候會以event爲key,把本身註冊到eventCenter的map中;在 post 函數中,根據event的class,取出了handler從而執行,也作到了解耦。
  • 怎樣實現Listener一次註冊,就可以知道Listener對那些事件感興趣的,進而在有某類事件發生時通知到Listener的呢?
    • 答案:派生類必須實現interest來聲明本身想處理什麼Event;
  • 如何使得一個Listener能夠處理多個事件?
    • 答案:接上問題,Event是配置在一個數組中,這樣就使得一個函數能夠處理多個事件
  • 如何使得一個事件被多個Listener處理?
    • 答案:採用ArrayListMultimap實現listener列表;
  • 能否簡化註冊流程?
    • 答案:自動註冊,派生類不須要操心。afterPropertiesSet中作了設定,因此每個繼承此類的Handler都會自動註冊到EventCenter之中
  • 是否須要維護消息順序?
    • 答案:不須要,由於是同步處理;
  • 處理消息方式是異步仍是同步?
    • 答案:這裏是同步;
  • 多個一樣消息是否要歸併?
    • 答案:這裏不須要歸併,沒有業務需求;

0xFF 參考

Guava中EventBus分析

螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容

螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路

服務註冊中心 Session 存儲策略 | SOFARegistry 解析

海量數據下的註冊中心 - SOFARegistry 架構介紹

相關文章
相關標籤/搜索