在 Java 世界裏,有一種技術能夠實現「跨虛擬機」的調用,它就是 RMI
(Remote Method Invocation,遠程方法調用)。例如,服務A 在 JVM1 中運行,服務B 在 JVM2 中運行,服務A 與 服務B 可相互進行遠程調用,就像調用本地方法同樣,這就是 RMI。在分佈式系統中,咱們使用 RMI 技術可輕鬆將 服務提供者
(Service Provider)與 服務消費者
(Service Consumer)進行分離,充分體現組件之間的弱耦合,系統架構更易於擴展。java
本文先從經過一個最簡單的 RMI 服務與調用示例,讓讀者快速掌握 RMI 的使用方法,而後指出 RMI 的侷限性,最後筆者對此問題提供了一種簡單的解決方案,即便用 ZooKeeper 輕鬆解決 RMI 調用過程當中所涉及的問題。node
下面咱們就從一個最簡單的 RMI 示例開始吧!apache
發佈一個 RMI 服務,咱們只需作三件事情:服務器
RMI 接口實際上仍是一個普通的 Java 接口,只是 RMI 接口必須繼承 java.rmi.Remote
,此外,每一個 RMI 接口的方法必須聲明拋出一個 java.rmi.RemoteException
異常,就像下面這樣:架構
<!-- lang: java --> package demo.zookeeper.rmi.common; import java.rmi.Remote; import java.rmi.RemoteException; public interface HelloService extends Remote { String sayHello(String name) throws RemoteException; }
繼承了 Remote
接口,其實是讓 JVM 得知該接口是須要用於遠程調用的,拋出了 RemoteException
是爲了讓調用 RMI 服務的程序捕獲這個異常。畢竟遠程調用過程當中,什麼奇怪的事情都會發生(好比:斷網)。須要說明的是,RemoteException 是一個「受檢異常」,在調用的時候必須使用 try...catch...
自行處理。負載均衡
實現以上的 HelloService
是一件很是簡單的事情,但須要注意的是,咱們必須讓實現類繼承 java.rmi.server.UnicastRemoteObject
類,此外,必須提供一個構造器,而且構造器必須拋出 java.rmi.RemoteException
異常。咱們既然使用 JVM 提供的這套 RMI 框架,那麼就必須按照這個要求來實現,不然是沒法成功發佈 RMI 服務的,一句話:咱們得按規矩出牌!框架
<!-- lang: java --> package demo.zookeeper.rmi.server; import demo.zookeeper.rmi.common.HelloService; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; public class HelloServiceImpl extends UnicastRemoteObject implements HelloService { protected HelloServiceImpl() throws RemoteException { } @Override public String sayHello(String name) throws RemoteException { return String.format("Hello %s", name); } }
爲了知足 RMI 框架的要求,咱們確實作了不少額外的工做(繼承了 UnicastRemoteObject
類,拋出了 RemoteException
異常),但這些工做阻止不了咱們發佈 RMI 服務的決心!咱們能夠經過 JVM 提供的 JNDI(Java Naming and Directory Interface,Java 命名與目錄接口)這個 API 輕鬆發佈 RMI 服務。dom
發佈 RMI 服務,咱們須要告訴 JNDI 三個基本信息:1. 域名或 IP 地址(host)、2. 端口號(port)、3. 服務名(service),它們構成了 RMI 協議的 URL(或稱爲「RMI 地址」):分佈式
rmi://<host>:<port>/<service>
若是咱們是在本地發佈 RMI 服務,那麼 host
就是「localhost」。此外,RMI 默認的 port
是「1099」,咱們也能夠自行設置 port 的值(只要不與其它端口衝突便可)。service
其實是一個基於同一 host 與 port 下惟一的服務名,咱們不妨使用 Java 徹底類名來表示吧,這樣也比較容易保證 RMI 地址的惟一性。ide
對於咱們的示例而言,RMI 地址爲:
rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl
咱們只需簡單提供一個 main() 方法就能發佈 RMI 服務,就像下面這樣:
<!-- lang: java --> package demo.zookeeper.rmi.server; import java.rmi.Naming; import java.rmi.registry.LocateRegistry; public class RmiServer { public static void main(String[] args) throws Exception { int port = 1099; String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl"; LocateRegistry.createRegistry(port); Naming.rebind(url, new HelloServiceImpl()); } }
須要注意的是,咱們經過 LocateRegistry.createRegistry()
方法在 JNDI 中建立一個註冊表,只需提供一個 RMI 端口號便可。此外,經過 Naming.rebind()
方法綁定 RMI 地址與 RMI 服務實現類,這裏使用了 rebind()
方法,它至關於前後調用 Naming 的 unbind()
與 bind()
方法,只是使用 rebind() 方法來得更加痛快而已,因此咱們選擇了它。
運行這個 main() 方法,RMI 服務就會自動發佈,剩下要作的就是寫一個 RMI 客戶端來調用已發佈的 RMI 服務。
一樣咱們也使用一個 main() 方法來調用 RMI 服務,相比發佈而言,調用會更加簡單,咱們只須要知道兩個東西:1. RMI 請求路徑、2. RMI 接口(必定不須要 RMI 實現類,不然就是本地調用了)。數行代碼就能調用剛纔發佈的 RMI 服務,就像下面這樣:
<!-- lang: java --> package demo.zookeeper.rmi.client; import demo.zookeeper.rmi.common.HelloService; import java.rmi.Naming; public class RmiClient { public static void main(String[] args) throws Exception { String url = "rmi://localhost:1099/demo.zookeeper.rmi.server.HelloServiceImpl"; HelloService helloService = (HelloService) Naming.lookup(url); String result = helloService.sayHello("Jack"); System.out.println(result); } }
當咱們運行以上 main() 方法,在控制檯中看到「Hello Jack」輸出,就代表 RMI 調用成功。
可見,藉助 JNDI 這個所謂的命名與目錄服務,咱們成功地發佈並調用了 RMI 服務。實際上,JNDI 就是一個註冊表,服務端將服務對象放入到註冊表中,客戶端從註冊表中獲取服務對象。在服務端咱們發佈了 RMI 服務,並在 JNDI 中進行了註冊,此時就在服務端建立了一個 Skeleton
(骨架),當客戶端第一次成功鏈接 JNDI 並獲取遠程服務對象後,立馬就在本地建立了一個 Stub
(存根),遠程通訊其實是經過 Skeleton 與 Stub 來完成的,數據是基於 TCP/IP 協議,在「傳輸層」上發送的。毋庸置疑,理論上 RMI 必定比 WebService 要快,畢竟 WebService 是基於 HTTP 的,而 HTTP 所攜帶的數據是經過「應用層」來傳輸的,傳輸層較應用層更爲底層,越底層越快。
既然 RMI 比 WebService 快,使用起來也方便,那麼爲何咱們有時候還要用 WebService 呢?
其實緣由很簡單,WebService 能夠實現跨語言系統之間的調用,而 RMI 只能實現 Java 系統之間的調用。也就是說,RMI 的跨平臺性不如 WebService 好,假如咱們的系統都是用 Java 開發的,那麼固然首選就是 RMI 服務了。
貌似 RMI 確實挺優秀的,除了不能跨平臺之外,還有那些問題呢?
筆者認爲有兩點侷限性:
在通常的狀況下,Java 默認的序列化方式確實已經足以知足咱們的要求了,若是性能方面若是不是問題的話,咱們須要解決的其實是第二點,也就是說,讓使系統具有 HA(High Availability,高可用性)。
ZooKeeper 是 Hadoop 的一個子項目,用於解決分佈式系統之間的數據一致性問題。若是讀者尚不瞭解 ZooKeeper 的工做原理與使用方法,能夠經過如下連接來了解:
本文假設讀者已經對 ZooKeeper 有必定了解的前提下,對 RMI 的高可用性問題提供一個簡單的解決方案。
要想解決 RMI 服務的高可用性問題,咱們須要利用 ZooKeeper 充當一個 服務註冊表
(Service Registry),讓多個 服務提供者
(Service Provider)造成一個集羣,讓 服務消費者
(Service Consumer)經過服務註冊表獲取具體的服務訪問地址(也就是 RMI 服務地址)去訪問具體的服務提供者。以下圖所示:
須要注意的是,服務註冊表並非 Load Balancer(負載均衡器),提供的不是「反向代理」服務,而是「服務註冊」與「心跳檢測」功能。
利用服務註冊表來註冊 RMI 地址,這個很好理解,那麼「心跳檢測」又如何理解呢?說白了就是經過服務中心定時向各個服務提供者發送一個請求(實際上創建的是一個 Socket 長鏈接),若是長期沒有響應,服務中心就認爲該服務提供者已經「掛了」,只會從還「活着」的服務提供者中選出一個作爲當前的服務提供者。
也許讀者會考慮到,服務中心可能會出現單點故障,若是服務註冊表都壞掉了,整個系統也就癱瘓了。看來要想實現這個架構,必須保證服務中心也具有高可用性。
ZooKeeper 正好可以知足咱們上面提到的全部需求。
須要編寫一個 ServiceProvider
類,來發布 RMI 服務,並將 RMI 地址註冊到 ZooKeeper 中(實際存放在 ZNode 上)。
<!-- lang: java --> package demo.zookeeper.rmi.server; import demo.zookeeper.rmi.common.Constant; import java.io.IOException; import java.net.MalformedURLException; import java.rmi.Naming; import java.rmi.Remote; import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServiceProvider { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class); // 用於等待 SyncConnected 事件觸發後繼續執行當前線程 private CountDownLatch latch = new CountDownLatch(1); // 發佈 RMI 服務並註冊 RMI 地址到 ZooKeeper 中 public void publish(Remote remote, String host, int port) { String url = publishService(remote, host, port); // 發佈 RMI 服務並返回 RMI 地址 if (url != null) { ZooKeeper zk = connectServer(); // 鏈接 ZooKeeper 服務器並獲取 ZooKeeper 對象 if (zk != null) { createNode(zk, url); // 建立 ZNode 並將 RMI 地址放入 ZNode 上 } } } // 發佈 RMI 服務 private String publishService(Remote remote, String host, int port) { String url = null; try { url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName()); LocateRegistry.createRegistry(port); Naming.rebind(url, remote); LOGGER.debug("publish rmi service (url: {})", url); } catch (RemoteException | MalformedURLException e) { LOGGER.error("", e); } return url; } // 鏈接 ZooKeeper 服務器 private ZooKeeper connectServer() { ZooKeeper zk = null; try { zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); // 喚醒當前正在執行的線程 } } }); latch.await(); // 使當前線程處於等待狀態 } catch (IOException | InterruptedException e) { LOGGER.error("", e); } return zk; } // 建立 ZNode private void createNode(ZooKeeper zk, String url) { try { byte[] data = url.getBytes(); String path = zk.create(Constant.ZK_PROVIDER_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 建立一個臨時性且有序的 ZNode LOGGER.debug("create zookeeper node ({} => {})", path, url); } catch (KeeperException | InterruptedException e) { LOGGER.error("", e); } } }
涉及到的 Constant
常量,見以下代碼:
<!-- lang: java --> package demo.zookeeper.rmi.common; public interface Constant { String ZK_CONNECTION_STRING = "localhost:2181"; int ZK_SESSION_TIMEOUT = 5000; String ZK_REGISTRY_PATH = "/registry"; String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider"; }
注意:咱們首先須要使用 ZooKeeper 的客戶端工具建立一個持久性 ZNode,名爲「/registry」,該節點是不存聽任何數據的,可以使用以下命令:
create /registry null
服務消費者須要在建立的時候鏈接 ZooKeeper,同時監聽 /registry
節點的 NodeChildrenChanged
事件,也就是說,一旦該節點的子節點有變化,就須要從新獲取最新的子節點。這裏提到的子節點,就是存放服務提供者發佈的 RMI 地址。須要強調的是,這些子節點都是臨時性的,當服務提供者與 ZooKeeper 服務註冊表的 Session 中斷後,該臨時性節會被自動刪除。
<!-- lang: java --> package demo.zookeeper.rmi.client; import demo.zookeeper.rmi.common.Constant; import java.io.IOException; import java.net.MalformedURLException; import java.rmi.ConnectException; import java.rmi.Naming; import java.rmi.NotBoundException; import java.rmi.Remote; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServiceConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class); // 用於等待 SyncConnected 事件觸發後繼續執行當前線程 private CountDownLatch latch = new CountDownLatch(1); // 定義一個 volatile 成員變量,用於保存最新的 RMI 地址(考慮到該變量或許會被其它線程所修改,一旦修改後,該變量的值會影響到全部線程) private volatile List<String> urlList = new ArrayList<>(); // 構造器 public ServiceConsumer() { ZooKeeper zk = connectServer(); // 鏈接 ZooKeeper 服務器並獲取 ZooKeeper 對象 if (zk != null) { watchNode(zk); // 觀察 /registry 節點的全部子節點並更新 urlList 成員變量 } } // 查找 RMI 服務 public <T extends Remote> T lookup() { T service = null; int size = urlList.size(); if (size > 0) { String url; if (size == 1) { url = urlList.get(0); // 若 urlList 中只有一個元素,則直接獲取該元素 LOGGER.debug("using only url: {}", url); } else { url = urlList.get(ThreadLocalRandom.current().nextInt(size)); // 若 urlList 中存在多個元素,則隨機獲取一個元素 LOGGER.debug("using random url: {}", url); } service = lookupService(url); // 從 JNDI 中查找 RMI 服務 } return service; } // 鏈接 ZooKeeper 服務器 private ZooKeeper connectServer() { ZooKeeper zk = null; try { zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); // 喚醒當前正在執行的線程 } } }); latch.await(); // 使當前線程處於等待狀態 } catch (IOException | InterruptedException e) { LOGGER.error("", e); } return zk; } // 觀察 /registry 節點下全部子節點是否有變化 private void watchNode(final ZooKeeper zk) { try { List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { watchNode(zk); // 若子節點有變化,則從新調用該方法(爲了獲取最新子節點中的數據) } } }); List<String> dataList = new ArrayList<>(); // 用於存放 /registry 全部子節點中的數據 for (String node : nodeList) { byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); // 獲取 /registry 的子節點中的數據 dataList.add(new String(data)); } LOGGER.debug("node data: {}", dataList); urlList = dataList; // 更新最新的 RMI 地址 } catch (KeeperException | InterruptedException e) { LOGGER.error("", e); } } // 在 JNDI 中查找 RMI 遠程服務對象 @SuppressWarnings("unchecked") private <T> T lookupService(String url) { T remote = null; try { remote = (T) Naming.lookup(url); } catch (NotBoundException | MalformedURLException | RemoteException e) { if (e instanceof ConnectException) { // 若鏈接中斷,則使用 urlList 中第一個 RMI 地址來查找(這是一種簡單的重試方式,確保不會拋出異常) LOGGER.error("ConnectException -> url: {}", url); if (urlList.size() != 0) { url = urlList.get(0); return lookupService(url); } } LOGGER.error("", e); } return remote; } }
咱們須要調用 ServiceProvider 的 publish() 方法來發布 RMI 服務,發佈成功後也會自動在 ZooKeeper 中註冊 RMI 地址。
<!-- lang: java --> package demo.zookeeper.rmi.server; import demo.zookeeper.rmi.common.HelloService; public class Server { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("please using command: java Server <rmi_host> <rmi_port>"); System.exit(-1); } String host = args[0]; int port = Integer.parseInt(args[1]); ServiceProvider provider = new ServiceProvider(); HelloService helloService = new HelloServiceImpl(); provider.publish(helloService, host, port); Thread.sleep(Long.MAX_VALUE); } }
注意:在運行 Server 類的 main() 方法時,必定要使用命令行參數來指定 host 與 port,例如:
java Server localhost 1099 java Server localhost 2099
以上兩條 Java 命令可在本地運行兩個 Server 程序,固然也能夠同時運行更多的 Server 程序,只要 port 不一樣就行。
經過調用 ServiceConsumer 的 lookup() 方法來查找 RMI 遠程服務對象。咱們使用一個「死循環」來模擬每隔 3 秒鐘調用一次遠程方法。
<!-- lang: java --> package demo.zookeeper.rmi.client; import demo.zookeeper.rmi.common.HelloService; public class Client { public static void main(String[] args) throws Exception { ServiceConsumer consumer = new ServiceConsumer(); while (true) { HelloService helloService = consumer.lookup(); String result = helloService.sayHello("Jack"); System.out.println(result); Thread.sleep(3000); } } }
根據如下步驟驗證 RMI 服務的高可用性:
經過本文,咱們嘗試使用 ZooKeeper 實現了一個簡單的 RMI 服務高可用性解決方案,經過 ZooKeeper 註冊全部服務提供者發佈的 RMI 服務,讓服務消費者監聽 ZooKeeper 的 Znode,從而獲取當前可用的 RMI 服務。此方案侷限於 RMI 服務,對於任何形式的服務(好比:WebService),也提供了必定參考。
若是再配合 ZooKeeper 自身的集羣,那纔是一個相對完美的解決方案,對於 ZooKeeper 的集羣,請讀者自行實踐。
因爲筆者水平有限,對於描述有誤之處,還請各位讀者提出建議,並期待更加優秀的解決方案。