前段時間本身搞了個 RPC 的輪子,不過相對來講比較簡單,最近在原來的基礎上加以改造,使用 Zookeeper 實現了 provider 自動尋址以及消費者的簡單負載均衡,對以前的感興趣的請轉 造個輪子---RPC動手實現。java
在原來使用 TCP 直連的基礎上實現基於 Zookeeper 的服務的註冊與發現,改造後的依賴關係是這樣的。git
話很少說,咱們來看下如何發佈和引用服務。 服務端咱們將服務的 IP 和端口號基礎信息註冊到 Zookeeper 上。github
/** * @author wuhaifei 2019-08-02 */
public class ZookeeperServerMainTest {
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer)
.setHost("172.16.30.114")
.setPort(5201)
.setRef(HelloServiceImpl.class.getName())
.setRegister(true)
.setInterfaceId(HelloService.class.getName());
RegistryConfig registryConfig = new RegistryConfig().setAddress("127.0.0.1:2181")
.setSubscribe(true)
.setRegister(true)
.setProtocol(RpcConstants.ZOOKEEPER);
ServerProxy serverProxy = new ServerProxy(new NettyServerAbstract())
.setServerConfig(serverConfig)
.setRegistryConfig(registryConfig);
try {
serverProxy.export();
while (true){
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
經過 Zookeeper 引用註冊在其上的服務。算法
/** * @author wuhaifei 2019-08-02 */
public class ZookeeperClientMainTest {
public static void main(String[] args) {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProtocol(RpcConstants.ZOOKEEPER)
.setTimeoutMillis(100000)
.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer);
RegistryConfig registryConfig = new RegistryConfig()
.setAddress("127.0.0.1:2181")
.setProtocol(RpcConstants.ZOOKEEPER)
.setRegister(true)
.setSubscribe(true);
ClientProxy<HelloService> clientProxy = new ClientProxy(clientConfig, new NettyClientAbstract(), HelloService.class)
.setRegistryConfig(registryConfig);
for (int i = 0; i < 10; i++) {
HelloService helloService = clientProxy.refer();
System.out.println(helloService.sayHi());
}
}
}
複製代碼
運行結果就不一一貼出了,感興趣的小夥伴能夠查看樓主傳到 github 上的源碼這是一個rpc的輪子。app
樓主在原來代碼的基礎上添加了 Zookeeper 的註冊的邏輯,原來的代碼相關介紹請轉 造個輪子---RPC動手實現。負載均衡
/** * 發佈服務 */
public void export() {
try {
Object serviceBean = Class.forName((String) serverConfig.getRef()).newInstance();
RpcInvokerHandler.serviceMap.put(serverConfig.getInterfaceId(), serviceBean);
this.childServer.start(this.getServerConfig());
if (serverConfig.isRegister()) {
// 將服務註冊到zookeeper
register();
}
} catch (Exception e) {
// 取消服務註冊
unregister();
if (e instanceof ChildRpcRuntimeException) {
throw (ChildRpcRuntimeException) e;
} else {
throw new ChildRpcRuntimeException("Build provider proxy error!", e);
}
}
exported = true;
}
/** * 註冊服務 */
protected void register() {
if (serverConfig.isRegister()) {
Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
registry.init();
registry.start();
try {
registry.register(this.serverConfig);
} catch (ChildRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
String appName = serverConfig.getInterfaceId();
LOGGER.info(appName, "Catch exception when register to registry: "
+ registryConfig.getId(), e);
}
}
}
複製代碼
/** * 服務的引用. */
public T refer() {
try {
if (config.isSubscribe()) {
subscribe();
}
childClient.init(this.clientConfig);
return invoke();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/** * 訂閱zk的服務列表. */
private void subscribe() {
Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
registry.init();
registry.start();
this.clientConfig = (ClientConfig) config;
List<String> providerList = registry.subscribe(this.clientConfig);
if (null == providerList) {
throw new ChildRpcRuntimeException("無可用服務供訂閱!");
}
// 使用隨機算法,隨機選擇一個provider
int index = ThreadLocalRandom.current().nextInt(providerList.size());
String providerInfo = providerList.get(index);
String[] providerArr = providerInfo.split(":");
clientConfig = (ClientConfig) this.config;
clientConfig.setHost(providerArr[0]);
clientConfig.setPort(Integer.parseInt(providerArr[1]));
}
複製代碼
上面代碼比較簡單,就是在原來直連的基礎上添加 zk 的操做,在發佈服務的時候將 provider 的 IP 和端口號基礎信息註冊到 zk 上,在引用服務的時候使用隨機算法從 zk 上選取可用的 provider 信息,而後進行 invoke 調用。框架
RPC(Remote procedure call)底層邏輯相對來講比較簡單,樓主在實現的過程當中參考了其餘 RPC 框架的部分代碼,受益不淺~dom