dubbo源碼解析(五)註冊中心——multicast

註冊中心——multicast

目標:解釋覺得multicast實現的註冊中心原理,理解單播、廣播、多播區別,解讀duubo-registry-multicast的源碼

這是dubbo實現註冊中心的第二種方式,也是dubbo的demo模塊中用的註冊中心實現方式。multicast實際上是用到了MulticastSocket來實現的。java

我這邊稍微補充一點關於多點廣播,也就是MulticastSocket的介紹。MulticastSocket類是繼承了DatagramSocket類,DatagramSocket只容許把數據報發送給一個指定的目標地址,而MulticastSocket能夠將數據報以廣播的形式發送給多個客戶端。它的思想是MulticastSocket會把一個數據報發送給一個特定的多點廣播地址,這個多點廣播地址是一組特殊的網絡地址,當客戶端須要發送或者接收廣播信息時,只要加入該組就好。IP協議爲多點廣播提供了一批特殊的IP地址,地址範圍是224.0.0.0至239.255.255.255。MulticastSocket類既能夠將數據報發送到多點廣播地址,也能夠接收其餘主機的廣播信息。git

以上是對multicast背景的簡略介紹,接下來讓咱們具體的來看dubbo怎麼把MulticastSocket運用到註冊中心的實現中。github

咱們先來看看包下面有哪些類:segmentfault

multicast目錄

能夠看到跟默認的註冊中心的包結構很是相似。接下來咱們就來解讀一下這兩個類。緩存

(一)MulticastRegistry

該類繼承了FailbackRegistry類,該類就是針對註冊中心核心的功能註冊、訂閱、取消註冊、取消訂閱,查詢註冊列表進行展開,利用廣播的方式去實現。微信

1.屬性

// logging output
// 日誌記錄輸出
private static final Logger logger = LoggerFactory.getLogger(MulticastRegistry.class);

// 默認的多點廣播端口
private static final int DEFAULT_MULTICAST_PORT = 1234;

// 多點廣播的地址
private final InetAddress mutilcastAddress;

// 多點廣播
private final MulticastSocket mutilcastSocket;

// 多點廣播端口
private final int mutilcastPort;

//收到的URL
private final ConcurrentMap<URL, Set<URL>> received = new ConcurrentHashMap<URL, Set<URL>>();

// 任務調度器
private final ScheduledExecutorService cleanExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboMulticastRegistryCleanTimer", true));

// 定時清理執行器,必定時間清理過時的url
private final ScheduledFuture<?> cleanFuture;

// 清理的間隔時間
private final int cleanPeriod;

// 管理員權限
private volatile boolean admin = false;

看上面的屬性,須要關注如下幾個點:網絡

  1. mutilcastSocket,該類是muticast註冊中心實現的關鍵,這裏補充一下單播、廣播、以及多播的區別,由於下面會涉及到。單播是每次只有兩個實體相互通訊,發送端和接收端都是惟一肯定的;廣播目的地址爲網絡中的全體目標,而多播的目的地址是一組目標,加入該組的成員均是數據包的目的地。
  2. 關注任務調度器和清理計時器,該類封裝了定時清理過時的服務的策略。

2.構造方法

public MulticastRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    if (!isMulticastAddress(url.getHost())) {
        throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255");
    }
    try {
        mutilcastAddress = InetAddress.getByName(url.getHost());
        // 若是url攜帶的配置中沒有端口號,則使用默認端口號
        mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
        mutilcastSocket = new MulticastSocket(mutilcastPort);
        // 禁用多播數據報的本地環回
        mutilcastSocket.setLoopbackMode(false);
        // 加入同一組廣播
        mutilcastSocket.joinGroup(mutilcastAddress);
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buf = new byte[2048];
                // 實例化數據報
                DatagramPacket recv = new DatagramPacket(buf, buf.length);
                while (!mutilcastSocket.isClosed()) {
                    try {
                        // 接收數據包
                        mutilcastSocket.receive(recv);
                        String msg = new String(recv.getData()).trim();
                        int i = msg.indexOf('\n');
                        if (i > 0) {
                            msg = msg.substring(0, i).trim();
                        }
                        // 接收消息請求,根據消息並相應操做,好比註冊,訂閱等
                        MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                        Arrays.fill(buf, (byte) 0);
                    } catch (Throwable e) {
                        if (!mutilcastSocket.isClosed()) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }, "DubboMulticastRegistryReceiver");
        // 設置爲守護進程
        thread.setDaemon(true);
        // 開啓線程
        thread.start();
    } catch (IOException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    // 優先從url中獲取清理延遲配置,若沒有,則默認爲60s
    this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    // 若是配置了須要清理
    if (url.getParameter("clean", true)) {
        // 開啓計時器
        this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    // 清理過時的服務
                    clean(); // Remove the expired
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
                }
            }
        }, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
    } else {
        this.cleanFuture = null;
    }
}

這個構造器最關鍵的就是一個線程和一個定時清理任務。socket

  1. 線程中作的工做是根據接收到的消息來斷定是什麼請求,做出對應的操做,只要mutilcastSocket沒有斷開,就一直接收消息,內部的實現體如今receive方法中,下文會展開講述。
  2. 定時清理任務是清理過時的註冊的服務。經過兩次socket的嘗試來斷定是否過時。clean方法下文會展開講述

3.isMulticastAddress

private static boolean isMulticastAddress(String ip) {
    int i = ip.indexOf('.');
    if (i > 0) {
        String prefix = ip.substring(0, i);
        if (StringUtils.isInteger(prefix)) {
            int p = Integer.parseInt(prefix);
            return p >= 224 && p <= 239;
        }
    }
    return false;
}

該方法很簡單,爲也沒寫註釋,就是判斷是否爲多點廣播地址,地址範圍是224.0.0.0至239.255.255.255。ide

4.clean

private void clean() {
    // 當url中攜帶的服務接口配置爲是*時候,才能夠執行清理
    if (admin) {
        for (Set<URL> providers : new HashSet<Set<URL>>(received.values())) {
            for (URL url : new HashSet<URL>(providers)) {
                // 判斷是否過時
                if (isExpired(url)) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Clean expired provider " + url);
                    }
                    //取消註冊
                    doUnregister(url);
                }
            }
        }
    }
}

該方法也比較簡單,關機的是如何判斷過時以及作的取消註冊的操做。下面會展開講解這幾個方法。oop

5.isExpired

private boolean isExpired(URL url) {
    // 若是爲非動態管理模式或者協議是consumer、route或者override,則沒有過時
    if (!url.getParameter(Constants.DYNAMIC_KEY, true)
            || url.getPort() <= 0
            || Constants.CONSUMER_PROTOCOL.equals(url.getProtocol())
            || Constants.ROUTE_PROTOCOL.equals(url.getProtocol())
            || Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
        return false;
    }
    Socket socket = null;
    try {
        // 利用url攜帶的主機地址和端口號實例化socket
        socket = new Socket(url.getHost(), url.getPort());
    } catch (Throwable e) {
        // 若是實例化失敗,等待100ms重試第二次,若是還失敗,則斷定已過時
        try {
            // 等待100ms
            Thread.sleep(100);
        } catch (Throwable e2) {
        }
        Socket socket2 = null;
        try {
            socket2 = new Socket(url.getHost(), url.getPort());
        } catch (Throwable e2) {
            return true;
        } finally {
            if (socket2 != null) {
                try {
                    socket2.close();
                } catch (Throwable e2) {
                }
            }
        }
    } finally {
        if (socket != null) {
            try {
                socket.close();
            } catch (Throwable e) {
            }
        }
    }
    return false;
}

這個方法就是判斷服務是否過時,有兩次嘗試socket的操做,若是嘗試失敗,則判斷爲過時。

6.receive

private void receive(String msg, InetSocketAddress remoteAddress) {
    if (logger.isInfoEnabled()) {
        logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
    }
    // 若是這個消息是以register、unregister、subscribe開頭的,則進行相應的操做
    if (msg.startsWith(Constants.REGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
        // 註冊服務
        registered(url);
    } else if (msg.startsWith(Constants.UNREGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
        // 取消註冊服務
        unregistered(url);
    } else if (msg.startsWith(Constants.SUBSCRIBE)) {
        URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
        // 得到以及註冊的url集合
        Set<URL> urls = getRegistered();
        if (urls != null && !urls.isEmpty()) {
            for (URL u : urls) {
                // 判斷是否合法
                if (UrlUtils.isMatch(url, u)) {
                    String host = remoteAddress != null && remoteAddress.getAddress() != null
                            ? remoteAddress.getAddress().getHostAddress() : url.getIp();
                    // 建議服務提供者和服務消費者在不一樣機器上運行,若是在同一機器上,需設置unicast=false
                    // 同一臺機器中的多個進程不能單播單播,或者只有一個進程接收信息,發給消費者的單播消息可能被提供者搶佔,兩個消費者在同一臺機器也同樣,
                    // 只有multicast註冊中心有此問題
                    if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
                            && !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
                        unicast(Constants.REGISTER + " " + u.toFullString(), host);
                    } else {
                        broadcast(Constants.REGISTER + " " + u.toFullString());
                    }
                }
            }
        }
    }/* else if (msg.startsWith(UNSUBSCRIBE)) {
    }*/
}

能夠很清楚的看到,根據接收到的消息開頭的數據來判斷須要作什麼類型的操做,重點在於訂閱,能夠選擇單播訂閱仍是廣播訂閱,這個取決於url攜帶的配置是什麼。

7.broadcast

private void broadcast(String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort);
    }
    try {
        byte[] data = (msg + "\n").getBytes();
        // 實例化數據報,重點是目的地址是mutilcastAddress
        DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort);
        // 發送數據報
        mutilcastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

這是廣播的實現方法,重點是數據報的目的地址是mutilcastAddress。表明着一組地址

8.unicast

private void unicast(String msg, String host) {
    if (logger.isInfoEnabled()) {
        logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort);
    }
    try {
        byte[] data = (msg + "\n").getBytes();
        // 實例化數據報,重點是目的地址是隻是單個地址
        DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort);
        // 發送數據報
        mutilcastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

這是單播的實現,跟廣播的區別就只是目的地址不同,單播的目的地址就只是一個地址,而廣播的是一組地址。

9.doRegister && doUnregister && doSubscribe && doUnsubscribe

@Override
protected void doRegister(URL url) {
    broadcast(Constants.REGISTER + " " + url.toFullString());
}
@Override
protected void doUnregister(URL url) {
    broadcast(Constants.UNREGISTER + " " + url.toFullString());
}
@Override
protected void doSubscribe(URL url, NotifyListener listener) {
    // 當url中攜帶的服務接口配置爲是*時候,才能夠執行清理,相似管理員權限
    if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        admin = true;
    }
    broadcast(Constants.SUBSCRIBE + " " + url.toFullString());
    // 對監聽器進行同步鎖
    synchronized (listener) {
        try {
            listener.wait(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        } catch (InterruptedException e) {
        }
    }
}
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        unregister(url);
    }
    broadcast(Constants.UNSUBSCRIBE + " " + url.toFullString());
}

這幾個方法就是實現了父類FailbackRegistry的抽象方法。都是調用了broadcast方法。

10.destroy

@Override
public void destroy() {
    super.destroy();
    try {
        // 取消清理任務
        if (cleanFuture != null) {
            cleanFuture.cancel(true);
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    try {
        // 把該地址從組內移除
        mutilcastSocket.leaveGroup(mutilcastAddress);
        // 關閉mutilcastSocket
        mutilcastSocket.close();
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    // 關閉線程池
    ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}

該方法的邏輯跟dubbo註冊中心的destroy方法相似,就多了把該地址從組內移除的操做。gracefulShutdown方法我在《dubbo源碼解析(四)註冊中心——dubbo》中已經講到。

11.register

@Override
public void register(URL url) {
    super.register(url);
    registered(url);
}
protected void registered(URL url) {
    // 遍歷訂閱的監聽器集合
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        // 判斷是否合法
        if (UrlUtils.isMatch(key, url)) {
            // 經過消費者url得到接收到的服務url集合
            Set<URL> urls = received.get(key);
            if (urls == null) {
                received.putIfAbsent(key, new ConcurrentHashSet<URL>());
                urls = received.get(key);
            }
            // 加入服務url
            urls.add(url);
            List<URL> list = toList(urls);
            for (NotifyListener listener : entry.getValue()) {
                // 把服務url的變化通知監聽器
                notify(key, listener, list);
                synchronized (listener) {
                    listener.notify();
                }
            }
        }
    }
}

能夠看到該類重寫了父類的register方法,不過邏輯沒有過多的變化,就是把須要註冊的url放入緩存中,若是通知監聽器url的變化。

12.unregister

@Override
public void unregister(URL url) {
    super.unregister(url);
    unregistered(url);
}
protected void unregistered(URL url) {
    // 遍歷訂閱的監聽器集合
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            // 緩存中移除
            if (urls != null) {
                urls.remove(url);
            }
            if (urls == null || urls.isEmpty()){
                if (urls == null){
                    urls = new ConcurrentHashSet<URL>();
                }
                // 設置攜帶empty協議的url
                URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
                urls.add(empty);
            }
            List<URL> list = toList(urls);
            // 通知監聽器 服務url變化
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
            }
        }
    }
}

這個邏輯也比較清晰,把須要取消註冊的服務url從緩存中移除,而後若是沒有接收的服務url了,就加入一個攜帶empty協議的url,而後通知監聽器服務變化。

13.lookup

@Override
public List<URL> lookup(URL url) {
    List<URL> urls = new ArrayList<URL>();
    // 經過消費者url得到訂閱的服務的監聽器
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    // 得到註冊的服務url集合
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        for (List<URL> values : notifiedUrls.values()) {
            urls.addAll(values);
        }
    }
    // 若是爲空,則從內存緩存properties得到相關value,而且返回爲註冊的服務
    if (urls.isEmpty()) {
        List<URL> cacheUrls = getCacheUrls(url);
        if (cacheUrls != null && !cacheUrls.isEmpty()) {
            urls.addAll(cacheUrls);
        }
    }
    // 若是仍是爲空則從緩存registered中得到已註冊 服務URL 集合
    if (urls.isEmpty()) {
        for (URL u : getRegistered()) {
            if (UrlUtils.isMatch(url, u)) {
                urls.add(u);
            }
        }
    }
    // 若是url攜帶的配置服務接口爲*,也就是全部服務,則從緩存subscribed得到已註冊 服務URL 集合
    if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        for (URL u : getSubscribed().keySet()) {
            if (UrlUtils.isMatch(url, u)) {
                urls.add(u);
            }
        }
    }
    return urls;
}

該方法是返回註冊的服務url列表,能夠看到有不少種得到的方法這些緩存都保存在AbstractRegistry類中,相關的介紹能夠查看《dubbo源碼解析(三)註冊中心——開篇》

14.subscribe && unsubscribe

@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    subscribed(url, listener);
}

@Override
public void unsubscribe(URL url, NotifyListener listener) {
    super.unsubscribe(url, listener);
    received.remove(url);
}
protected void subscribed(URL url, NotifyListener listener) {
    // 查詢註冊列表
    List<URL> urls = lookup(url);
    // 通知url
    notify(url, listener, urls);
}

這兩個重寫了父類的方法,分別是訂閱和取消訂閱。邏輯很簡單。

(二)MulticastRegistryFactory

該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原代碼:

public class MulticastRegistryFactory extends AbstractRegistryFactory {

    @Override
    public Registry createRegistry(URL url) {
        return new MulticastRegistry(url);
    }

}

能夠看到就是實例化了MulticastRegistry而已,全部這裏就不解釋了。

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了dubbo利用multicast來實現註冊中心,其中關鍵的是須要弄明白MulticastSocket以及單播、廣播、多播的概念,其餘的邏輯並不複雜。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。

相關文章
相關標籤/搜索