使用 RMI + ZooKeeper 實現遠程調用框架

在 Java 世界裏,有一種技術能夠實現「跨虛擬機」的調用,它就是 RMI(Remote Method Invocation,遠程方法調用)。例如,服務A 在 JVM1 中運行,服務B 在 JVM2 中運行,服務A 與 服務B 可相互進行遠程調用,就像調用本地方法同樣,這就是 RMI。在分佈式系統中,咱們使用 RMI 技術可輕鬆將 服務提供者(Service Provider)與 服務消費者(Service Consumer)進行分離,充分體現組件之間的弱耦合,系統架構更易於擴展。java

本文先從經過一個最簡單的 RMI 服務與調用示例,讓讀者快速掌握 RMI 的使用方法,而後指出 RMI 的侷限性,最後筆者對此問題提供了一種簡單的解決方案,即便用 ZooKeeper 輕鬆解決 RMI 調用過程當中所涉及的問題。node

下面咱們就從一個最簡單的 RMI 示例開始吧!apache

1 發佈 RMI 服務

發佈一個 RMI 服務,咱們只需作三件事情:服務器

  1. 定義一個 RMI 接口
  2. 編寫 RMI 接口的實現類
  3. 經過 JNDI 發佈 RMI 服務

1.1 定義一個 RMI 接口

RMI 接口實際上仍是一個普通的 Java 接口,只是 RMI 接口必須繼承 java.rmi.Remote,此外,每一個 RMI 接口的方法必須聲明拋出一個 java.rmi.RemoteException 異常,就像下面這樣:架構

<!-- lang: java -->   
package demo.zookeeper.rmi.common;

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface HelloService extends Remote {

    String sayHello(String name) throws RemoteException;
}

繼承了 Remote 接口,其實是讓 JVM 得知該接口是須要用於遠程調用的,拋出了 RemoteException 是爲了讓調用 RMI 服務的程序捕獲這個異常。畢竟遠程調用過程當中,什麼奇怪的事情都會發生(好比:斷網)。須要說明的是,RemoteException 是一個「受檢異常」,在調用的時候必須使用 try...catch... 自行處理。負載均衡

1.2 編寫 RMI 接口的實現類

實現以上的 HelloService 是一件很是簡單的事情,但須要注意的是,咱們必須讓實現類繼承 java.rmi.server.UnicastRemoteObject 類,此外,必須提供一個構造器,而且構造器必須拋出 java.rmi.RemoteException 異常。咱們既然使用 JVM 提供的這套 RMI 框架,那麼就必須按照這個要求來實現,不然是沒法成功發佈 RMI 服務的,一句話:咱們得按規矩出牌!框架

<!-- lang: java -->
package demo.zookeeper.rmi.server;

import demo.zookeeper.rmi.common.HelloService;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {

    protected HelloServiceImpl() throws RemoteException {
    }

    @Override
    public String sayHello(String name) throws RemoteException {
        return String.format("Hello %s", name);
    }
}

爲了知足 RMI 框架的要求,咱們確實作了不少額外的工做(繼承了 UnicastRemoteObject 類,拋出了 RemoteException 異常),但這些工做阻止不了咱們發佈 RMI 服務的決心!咱們能夠經過 JVM 提供的 JNDI(Java Naming and Directory Interface,Java 命名與目錄接口)這個 API 輕鬆發佈 RMI 服務。dom

1.3 經過 JNDI 發佈 RMI 服務

發佈 RMI 服務,咱們須要告訴 JNDI 三個基本信息:1. 域名或 IP 地址(host)、2. 端口號(port)、3. 服務名(service),它們構成了 RMI 協議的 URL(或稱爲「RMI 地址」):分佈式

rmi://<host>:<port>/<service>

若是咱們是在本地發佈 RMI 服務,那麼 host 就是「localhost」。此外,RMI 默認的 port 是「1099」,咱們也能夠自行設置 port 的值(只要不與其它端口衝突便可)。service 其實是一個基於同一 host 與 port 下惟一的服務名,咱們不妨使用 Java 徹底類名來表示吧,這樣也比較容易保證 RMI 地址的惟一性。ide

對於咱們的示例而言,RMI 地址爲:

rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl

咱們只需簡單提供一個 main() 方法就能發佈 RMI 服務,就像下面這樣:

<!-- lang: java -->
package demo.zookeeper.rmi.server;

import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;

public class RmiServer {

    public static void main(String[] args) throws Exception {
        int port = 1099;
        String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl";
        LocateRegistry.createRegistry(port);
        Naming.rebind(url, new HelloServiceImpl());
    }
}

須要注意的是,咱們經過 LocateRegistry.createRegistry() 方法在 JNDI 中建立一個註冊表,只需提供一個 RMI 端口號便可。此外,經過 Naming.rebind() 方法綁定 RMI 地址與 RMI 服務實現類,這裏使用了 rebind() 方法,它至關於前後調用 Naming 的 unbind()bind() 方法,只是使用 rebind() 方法來得更加痛快而已,因此咱們選擇了它。

運行這個 main() 方法,RMI 服務就會自動發佈,剩下要作的就是寫一個 RMI 客戶端來調用已發佈的 RMI 服務。

2 調用 RMI 服務

一樣咱們也使用一個 main() 方法來調用 RMI 服務,相比發佈而言,調用會更加簡單,咱們只須要知道兩個東西:1. RMI 請求路徑、2. RMI 接口(必定不須要 RMI 實現類,不然就是本地調用了)。數行代碼就能調用剛纔發佈的 RMI 服務,就像下面這樣:

<!-- lang: java -->
package demo.zookeeper.rmi.client;

import demo.zookeeper.rmi.common.HelloService;
import java.rmi.Naming;

public class RmiClient {

    public static void main(String[] args) throws Exception {
        String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl";
        HelloService helloService = (HelloService) Naming.lookup(url);
        String result = helloService.sayHello("Jack");
        System.out.println(result);
    }
}

當咱們運行以上 main() 方法,在控制檯中看到「Hello Jack」輸出,就代表 RMI 調用成功。

3 RMI 服務的侷限性

可見,藉助 JNDI 這個所謂的命名與目錄服務,咱們成功地發佈並調用了 RMI 服務。實際上,JNDI 就是一個註冊表,服務端將服務對象放入到註冊表中,客戶端從註冊表中獲取服務對象。在服務端咱們發佈了 RMI 服務,並在 JNDI 中進行了註冊,此時就在服務端建立了一個 Skeleton(骨架),當客戶端第一次成功鏈接 JNDI 並獲取遠程服務對象後,立馬就在本地建立了一個 Stub(存根),遠程通訊其實是經過 Skeleton 與 Stub 來完成的,數據是基於 TCP/IP 協議,在「傳輸層」上發送的。毋庸置疑,理論上 RMI 必定比 WebService 要快,畢竟 WebService 是基於 HTTP 的,而 HTTP 所攜帶的數據是經過「應用層」來傳輸的,傳輸層較應用層更爲底層,越底層越快。

既然 RMI 比 WebService 快,使用起來也方便,那麼爲何咱們有時候還要用 WebService 呢?

其實緣由很簡單,WebService 能夠實現跨語言系統之間的調用,而 RMI 只能實現 Java 系統之間的調用。也就是說,RMI 的跨平臺性不如 WebService 好,假如咱們的系統都是用 Java 開發的,那麼固然首選就是 RMI 服務了。

貌似 RMI 確實挺優秀的,除了不能跨平臺之外,還有那些問題呢?

筆者認爲有兩點侷限性:

  1. RMI 使用了 Java 默認的序列化方式,對於性能要求比較高的系統,可能須要使用其它序列化方案來解決(例如:Protobuf)。
  2. RMI 服務在運行時不免會存在出故障,例如,若是 RMI 服務沒法鏈接了,就會致使客戶端沒法響應的現象。

在通常的狀況下,Java 默認的序列化方式確實已經足以知足咱們的要求了,若是性能方面若是不是問題的話,咱們須要解決的其實是第二點,也就是說,讓使系統具有 HA(High Availability,高可用性)。

4 使用 ZooKeeper 提供高可用的 RMI 服務

ZooKeeper 是 Hadoop 的一個子項目,用於解決分佈式系統之間的數據一致性問題。若是讀者尚不瞭解 ZooKeeper 的工做原理與使用方法,能夠經過如下連接來了解:

本文假設讀者已經對 ZooKeeper 有必定了解的前提下,對 RMI 的高可用性問題提供一個簡單的解決方案。

要想解決 RMI 服務的高可用性問題,咱們須要利用 ZooKeeper 充當一個 服務註冊表(Service Registry),讓多個 服務提供者(Service Provider)造成一個集羣,讓 服務消費者(Service Consumer)經過服務註冊表獲取具體的服務訪問地址(也就是 RMI 服務地址)去訪問具體的服務提供者。以下圖所示:

服務註冊表

須要注意的是,服務註冊表並非 Load Balancer(負載均衡器),提供的不是「反向代理」服務,而是「服務註冊」與「心跳檢測」功能。

利用服務註冊表來註冊 RMI 地址,這個很好理解,那麼「心跳檢測」又如何理解呢?說白了就是經過服務中心定時向各個服務提供者發送一個請求(實際上創建的是一個 Socket 長鏈接),若是長期沒有響應,服務中心就認爲該服務提供者已經「掛了」,只會從還「活着」的服務提供者中選出一個作爲當前的服務提供者。

也許讀者會考慮到,服務中心可能會出現單點故障,若是服務註冊表都壞掉了,整個系統也就癱瘓了。看來要想實現這個架構,必須保證服務中心也具有高可用性。

ZooKeeper 正好可以知足咱們上面提到的全部需求。

  1. 使用 ZooKeeper 的臨時性 ZNode 來存放服務提供者的 RMI 地址,一旦與服務提供者的 Session 中斷,會自動清除相應的 ZNode。
  2. 讓服務消費者去監聽這些 ZNode,一旦發現 ZNode 的數據(RMI 地址)有變化,就會從新獲取一份有效數據的拷貝。
  3. ZooKeeper 與生俱來的集羣能力(例如:數據同步與領導選舉特性),能夠確保服務註冊表的高可用性。

4.1 服務提供者

須要編寫一個 ServiceProvider 類,來發布 RMI 服務,並將 RMI 地址註冊到 ZooKeeper 中(實際存放在 ZNode 上)。

<!-- lang: java -->
package demo.zookeeper.rmi.server;

import demo.zookeeper.rmi.common.Constant;
import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceProvider {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);

    // 用於等待 SyncConnected 事件觸發後繼續執行當前線程
    private CountDownLatch latch = new CountDownLatch(1);

    // 發佈 RMI 服務並註冊 RMI 地址到 ZooKeeper 中
    public void publish(Remote remote, String host, int port) {
        String url = publishService(remote, host, port); // 發佈 RMI 服務並返回 RMI 地址
        if (url != null) {
            ZooKeeper zk = connectServer(); // 鏈接 ZooKeeper 服務器並獲取 ZooKeeper 對象
            if (zk != null) {
                createNode(zk, url); // 建立 ZNode 並將 RMI 地址放入 ZNode 上
            }
        }
    }

    // 發佈 RMI 服務
    private String publishService(Remote remote, String host, int port) {
        String url = null;
        try {
            url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName());
            LocateRegistry.createRegistry(port);
            Naming.rebind(url, remote);
            LOGGER.debug("publish rmi service (url: {})", url);
        } catch (RemoteException | MalformedURLException e) {
            LOGGER.error("", e);
        }
        return url;
    }

    // 鏈接 ZooKeeper 服務器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown(); // 喚醒當前正在執行的線程
                    }
                }
            });
            latch.await(); // 使當前線程處於等待狀態
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    // 建立 ZNode
    private void createNode(ZooKeeper zk, String url) {
        try {
            byte[] data = url.getBytes();
            String path = zk.create(Constant.ZK_PROVIDER_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 建立一個臨時性且有序的 ZNode
            LOGGER.debug("create zookeeper node ({} => {})", path, url);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}

涉及到的 Constant 常量,見以下代碼:

<!-- lang: java -->
package demo.zookeeper.rmi.common;

public interface Constant {

    String ZK_CONNECTION_STRING = "localhost:2181";
    int ZK_SESSION_TIMEOUT = 5000;
    String ZK_REGISTRY_PATH = "/registry";
    String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";
}

注意:咱們首先須要使用 ZooKeeper 的客戶端工具建立一個持久性 ZNode,名爲「/registry」,該節點是不存聽任何數據的,可以使用以下命令:

create /registry null

4.2 服務消費者

服務消費者須要在建立的時候鏈接 ZooKeeper,同時監聽 /registry 節點的 NodeChildrenChanged 事件,也就是說,一旦該節點的子節點有變化,就須要從新獲取最新的子節點。這裏提到的子節點,就是存放服務提供者發佈的 RMI 地址。須要強調的是,這些子節點都是臨時性的,當服務提供者與 ZooKeeper 服務註冊表的 Session 中斷後,該臨時性節會被自動刪除。

<!-- lang: java -->
package demo.zookeeper.rmi.client;

import demo.zookeeper.rmi.common.Constant;
import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.ConnectException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);

    // 用於等待 SyncConnected 事件觸發後繼續執行當前線程
    private CountDownLatch latch = new CountDownLatch(1);

    // 定義一個 volatile 成員變量,用於保存最新的 RMI 地址(考慮到該變量或許會被其它線程所修改,一旦修改後,該變量的值會影響到全部線程)
    private volatile List<String> urlList = new ArrayList<>(); 

    // 構造器
    public ServiceConsumer() {
        ZooKeeper zk = connectServer(); // 鏈接 ZooKeeper 服務器並獲取 ZooKeeper 對象
        if (zk != null) {
            watchNode(zk); // 觀察 /registry 節點的全部子節點並更新 urlList 成員變量
        }
    }

    // 查找 RMI 服務
    public <T extends Remote> T lookup() {
        T service = null;
        int size = urlList.size();
        if (size > 0) {
            String url;
            if (size == 1) {
                url = urlList.get(0); // 若 urlList 中只有一個元素,則直接獲取該元素
                LOGGER.debug("using only url: {}", url);
            } else {
                url = urlList.get(ThreadLocalRandom.current().nextInt(size)); // 若 urlList 中存在多個元素,則隨機獲取一個元素
                LOGGER.debug("using random url: {}", url);
            }
            service = lookupService(url); // 從 JNDI 中查找 RMI 服務
        }
        return service;
    }

    // 鏈接 ZooKeeper 服務器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown(); // 喚醒當前正在執行的線程
                    }
                }
            });
            latch.await(); // 使當前線程處於等待狀態
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    // 觀察 /registry 節點下全部子節點是否有變化
    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk); // 若子節點有變化,則從新調用該方法(爲了獲取最新子節點中的數據)
                    }
                }
            });
            List<String> dataList = new ArrayList<>(); // 用於存放 /registry 全部子節點中的數據
            for (String node : nodeList) {
                byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); // 獲取 /registry 的子節點中的數據
                dataList.add(new String(data));
            }
            LOGGER.debug("node data: {}", dataList);
            urlList = dataList; // 更新最新的 RMI 地址
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }

    // 在 JNDI 中查找 RMI 遠程服務對象
    @SuppressWarnings("unchecked")
    private <T> T lookupService(String url) {
        T remote = null;
        try {
            remote = (T) Naming.lookup(url);
        } catch (NotBoundException | MalformedURLException | RemoteException e) {
            if (e instanceof ConnectException) {
                // 若鏈接中斷,則使用 urlList 中第一個 RMI 地址來查找(這是一種簡單的重試方式,確保不會拋出異常)
                LOGGER.error("ConnectException -> url: {}", url);
                if (urlList.size() != 0) {
                    url = urlList.get(0);
                    return lookupService(url);
                }
            }
            LOGGER.error("", e);
        }
        return remote;
    }
}

4.3 發佈服務

咱們須要調用 ServiceProvider 的 publish() 方法來發布 RMI 服務,發佈成功後也會自動在 ZooKeeper 中註冊 RMI 地址。

<!-- lang: java -->
package demo.zookeeper.rmi.server;

import demo.zookeeper.rmi.common.HelloService;

public class Server {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("please using command: java Server <rmi_host> <rmi_port>");
            System.exit(-1);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);

        ServiceProvider provider = new ServiceProvider();

        HelloService helloService = new HelloServiceImpl();
        provider.publish(helloService, host, port);

        Thread.sleep(Long.MAX_VALUE);
    }
}

注意:在運行 Server 類的 main() 方法時,必定要使用命令行參數來指定 host 與 port,例如:

java Server localhost 1099
java Server localhost 2099

以上兩條 Java 命令可在本地運行兩個 Server 程序,固然也能夠同時運行更多的 Server 程序,只要 port 不一樣就行。

4.4 調用服務

經過調用 ServiceConsumer 的 lookup() 方法來查找 RMI 遠程服務對象。咱們使用一個「死循環」來模擬每隔 3 秒鐘調用一次遠程方法。

<!-- lang: java -->
package demo.zookeeper.rmi.client;

import demo.zookeeper.rmi.common.HelloService;

public class Client {

    public static void main(String[] args) throws Exception {
        ServiceConsumer consumer = new ServiceConsumer();

        while (true) {
            HelloService helloService = consumer.lookup();
            String result = helloService.sayHello("Jack");
            System.out.println(result);
            Thread.sleep(3000);
        }
    }
}

4.5 使用方法

根據如下步驟驗證 RMI 服務的高可用性:

  1. 運行兩個 Server 程序,必定要確保 port 是不一樣的。
  2. 運行一個 Client 程序。
  3. 中止其中一個 Server 程序,並觀察 Client 控制檯的變化(中止一個 Server 不會致使 Client 端調用失敗)。
  4. 從新啓動剛纔關閉的 Server 程序,繼續觀察 Client 控制檯變化(新啓動的 Server 會加入候選)。
  5. 前後中止全部的 Server 程序,仍是觀察 Client 控制檯變化(Client 會重試鏈接,屢次鏈接失敗後,自動關閉)。

5 總結

經過本文,咱們嘗試使用 ZooKeeper 實現了一個簡單的 RMI 服務高可用性解決方案,經過 ZooKeeper 註冊全部服務提供者發佈的 RMI 服務,讓服務消費者監聽 ZooKeeper 的 Znode,從而獲取當前可用的 RMI 服務。此方案侷限於 RMI 服務,對於任何形式的服務(好比:WebService),也提供了必定參考。

若是再配合 ZooKeeper 自身的集羣,那纔是一個相對完美的解決方案,對於 ZooKeeper 的集羣,請讀者自行實踐。

因爲筆者水平有限,對於描述有誤之處,還請各位讀者提出建議,並期待更加優秀的解決方案。

相關文章
相關標籤/搜索