1、帶版本控制的註冊中心 RPC 框架(Java)
server端
//註冊中心接口 public interface IRegisterCenter { public void register(String serviceName,String serviceAddress); }
//實現類 package zoorpc.zk; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class RegisterCenter implements IRegisterCenter { private CuratorFramework curatorFramework; public RegisterCenter() { curatorFramework = CuratorFrameworkFactory.builder().connectString(ZooConfig.CONNECTION_STR) .connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); curatorFramework.start(); } @Override public void register(String serviceName, String serviceAddress) { // 註冊相應服務 String Servicepath = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName; try { //判斷服務/registrys/product-service/是否存在,不然建立 if (curatorFramework.checkExists().forPath(Servicepath) == null) { curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath(Servicepath,"0".getBytes()); } //建立服務iP節點 String adressPath = Servicepath+"/"+serviceAddress; String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .forPath(adressPath,"0".getBytes()); System.out.println("服務節點建立成功:"+rsNode); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
//常量類 package zoorpc.zk; public class ZooConfig { final static String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181"; final static String ZK_REGISTER_PATH = "/registrys"; }
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface RpcAnnotation { /** * 對外發布的接口地址 * @return */ Class<?> value(); //多版本功能擴展 String version() default ""; }
//服務接口 public interface IHelloWorld { public String sayHello(String msg); }
//服務接口實現類1,不帶版本控制 package zoorpc; import anno.RpcAnnotation; @RpcAnnotation(IHelloWorld.class) public class HelloWorldServiceImpl implements IHelloWorld { @Override public String sayHello(String msg) { // TODO Auto-generated method stub return "HelloWorld,8080"+msg; } }
//服務接口實現類2,帶版本控制 import anno.RpcAnnotation; @RpcAnnotation(value = IHelloWorld.class,version = "2.0") public class HelloWorldServiceImpl2 implements IHelloWorld { @Override public String sayHello(String msg) { // TODO Auto-generated method stub return "HelloWorld2,8081"+msg; } }
//服務發佈類 package zoorpc; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import anno.RpcAnnotation; import zoorpc.zk.IRegisterCenter; public class RpcServer { private static final ExecutorService executorService = Executors.newCachedThreadPool(); private IRegisterCenter registerCenter;//註冊中心 private String serviceAddress;//服務發佈地址 //存放服務名稱和服務對象之間的關係 Map<String,Object> handlerMap = new HashMap<String,Object>(); public RpcServer(IRegisterCenter registerCenter, String serviceAddress) { this.registerCenter = registerCenter; this.serviceAddress = serviceAddress; } //綁定服務名稱和服務對象 public void bind(Object...services){ for(Object service :services ){ RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class); String serviceName = rpcAnnotation.value().getName(); //添加版本號控制 String version = rpcAnnotation.version(); if(version!=null && !version.equals("")){ serviceName = serviceName+"-"+version; } //添加版本號控制 handlerMap.put(serviceName, service);//綁定接口服務名稱及對應的服務 } } //發佈服務 public void publisher(){ ServerSocket serverSocket = null; try { String[] split = serviceAddress.split(":"); serverSocket = new ServerSocket(Integer.parseInt(split[1]));//啓動一個服務監聽 for(String interfaceName : handlerMap.keySet()){ registerCenter.register(interfaceName, serviceAddress); System.out.println("服務註冊成功:"+interfaceName+"->"+serviceAddress); } while(true){ Socket socket = serverSocket.accept(); executorService.execute(new ProcessorHandler(socket,handlerMap)); } } catch (Exception e) { e.printStackTrace(); }finally { if(serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
package zoorpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; import java.util.Map; public class ProcessorHandler implements Runnable { private Socket socket; private Map<String,Object> handlerMap; public ProcessorHandler(Socket socket, Map<String,Object> handlerMap) { this.socket = socket; this.handlerMap = handlerMap; } @Override public void run() { // TODO 處理請求 ObjectInputStream objectInputStream =null; ObjectOutputStream objectOutputStream =null; try { objectInputStream = new ObjectInputStream(socket.getInputStream()); RpcRequest request = (RpcRequest) objectInputStream.readObject(); Object result = invoke(request); objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(result); objectOutputStream.flush(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(objectInputStream!=null){ try { objectInputStream.close(); objectOutputStream.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } private Object invoke(RpcRequest request) throws Exception, IllegalArgumentException, InvocationTargetException{ Object[] args = request.getParameters(); Class<?> [] types = new Class[args.length]; for (int i = 0; i < types.length; i++) { types[i] = args[i].getClass(); } //添加版本號控制 String version = request.getVersion(); String serviceName =request.getClassName(); if(version!=null && !version.equals("")){ serviceName =request.getClassName()+"-"+version; } //添加版本號控制 //從handlerMap中,根據客戶端額請求地址,去拿到響應的服務,經過反射發起調用 //Object service = handlerMap.get(request.getClassName()); Object service = handlerMap.get(serviceName);//添加版本號控制 Method method = service.getClass().getMethod(request.getMethodName(), types); return method.invoke(service, args); } }
//傳輸類 package zoorpc; import java.io.Serializable; /** * 傳輸對象 * @author admin * */ public class RpcRequest implements Serializable{ private static final long serialVersionUID = 6351477854838485391L; private String className; private String methodName; private Object[] parameters; private String version; public String getVersion() { return version; } public RpcRequest(String className, String methodName, Object[] parameters, String version) { super(); this.className = className; this.methodName = methodName; this.parameters = parameters; this.version = version; } public void setVersion(String version) { this.version = version; } public RpcRequest(String className, String methodName, Object[] parameters) { super(); this.className = className; this.methodName = methodName; this.parameters = parameters; } public RpcRequest() { super(); // TODO Auto-generated constructor stub } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } }
//發佈服務 package zoorpc; import java.io.IOException; import zoorpc.zk.IRegisterCenter; import zoorpc.zk.RegisterCenter; public class ServerDemo { public static void main(String[] args) throws IOException { IHelloWorld service = new HelloWorldServiceImpl(); IHelloWorld service2 = new HelloWorldServiceImpl2(); IRegisterCenter registerCenter = new RegisterCenter(); RpcServer server = new RpcServer(registerCenter,"127.0.0.1:8080"); server.bind(service,service2); server.publisher(); System.in.read(); } }
客戶端
package zoorpc.zk; public interface IDiscovery { /** * 根據請求的服務地址,獲取到服務的調用地址 * @param serviceName * @return */ public String Discovery(String serviceName); }
package zoorpc.zk; import java.util.ArrayList; import java.util.List; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import zoorpc.loadbalance.ILoadBalance; import zoorpc.loadbalance.RandomLoadBalance; public class Discovery implements IDiscovery { private CuratorFramework curatorFramework; List<String> repos = new ArrayList<>(); private String adresses; public Discovery(String adresses) { this.adresses = adresses; curatorFramework = CuratorFrameworkFactory.builder().connectString(adresses) .connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); curatorFramework.start(); } @Override public String Discovery(String serviceName) { String path = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName; ILoadBalance randomLoadBalance = null; try { repos = curatorFramework.getChildren().forPath(path); //動態發現節點的變化 registerWatcher(path); //發現多個服務,作負載均衡 randomLoadBalance = new RandomLoadBalance(); } catch (Exception e) { e.printStackTrace(); } return randomLoadBalance.selectHost(repos);//返回調用的服務地址 } private void registerWatcher(final String path) throws Exception{ PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true); PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { repos = curatorFramework.getChildren().forPath(path); } }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); pathChildrenCache.start(); } }
package zoorpc.zk;

/**
 * ZooKeeper settings used by the client side.
 */
public class ZooConfig {

    /** Comma-separated host:port list used as the ZooKeeper connect string. */
    public static final String CONNECTION_STR =
            "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";

    /** Root node below which services are registered. */
    public static final String ZK_REGISTER_PATH = "/registrys";
}
package zoorpc; public interface IHelloWorld { public String sayHello(String msg); }
package zoorpc; import java.io.Serializable; /** * 傳輸對象 * @author admin * */ public class RpcRequest implements Serializable{ private static final long serialVersionUID = 6351477854838485391L; private String className; private String methodName; private Object[] parameters; private String version; public String getVersion() { return version; } public void setVersion(String version) { this.version = version; } public RpcRequest(String className, String methodName, Object[] parameters) { super(); this.className = className; this.methodName = methodName; this.parameters = parameters; } public RpcRequest() { super(); // TODO Auto-generated constructor stub } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } }
package zoorpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import zoorpc.zk.IDiscovery;

/**
 * Factory for client-side proxies whose calls are forwarded by a
 * {@link RemoteInvocationHandler}.
 */
public class RpcClientProxy {

    private IDiscovery discovery;

    public RpcClientProxy(IDiscovery discovery) {
        this.discovery = discovery;
    }

    /**
     * Creates a dynamic proxy implementing the given interface.
     *
     * @param interfaceCls service interface to implement
     * @param version      version handed to the invocation handler
     * @param <T>          type of the service interface
     * @return a proxy instance of {@code interfaceCls}
     */
    public <T> T clientProxy(final Class<T> interfaceCls, String version) {
        InvocationHandler handler = new RemoteInvocationHandler(version, discovery);
        Class<?>[] proxiedInterfaces = new Class[] {interfaceCls};
        Object proxy = Proxy.newProxyInstance(
                interfaceCls.getClassLoader(), proxiedInterfaces, handler);
        return interfaceCls.cast(proxy);
    }
}
package zoorpc; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.net.Socket; import zoorpc.zk.IDiscovery; public class RemoteInvocationHandler implements InvocationHandler { private String version;//添加版本號控制 private IDiscovery discovery; public RemoteInvocationHandler(String version,IDiscovery discovery) { this.discovery = discovery; this.version = version; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TODO Auto-generated method stub RpcRequest request = new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setVersion(version); String serviceAddress = discovery.Discovery(request.getClassName()); TcpTransport trans = new TcpTransport(serviceAddress); return trans.send(request); } }
package zoorpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; public class TcpTransport { private String serviceAddress; public TcpTransport(String serviceAddress) { super(); this.serviceAddress = serviceAddress; } Socket newSocket(){ System.out.println("建立一個鏈接"); Socket socket = null; try { String[] split = serviceAddress.split(":"); socket = new Socket(split[0],Integer.parseInt(split[1])); return socket; } catch (Exception e) { // TODO: handle exception throw new RuntimeException("鏈接創建失敗!"); } } public Object send(RpcRequest request){ Socket socket = null; ObjectOutputStream objectOutputStream = null; ObjectInputStream objectInputStream = null; try { socket = newSocket(); objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(request); objectOutputStream.flush(); objectInputStream = new ObjectInputStream(socket.getInputStream()); Object readObject = objectInputStream.readObject(); return readObject; } catch (Exception e) { // TODO: handle exception e.printStackTrace(); throw new RuntimeException("鏈接創建失敗!"); }finally { if(socket!=null){ try { socket.close(); objectOutputStream.close(); objectInputStream.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
package zoorpc.loadbalance; import java.util.List; public interface ILoadBalance { public String selectHost(List<String> repos); }
package zoorpc.loadbalance;

import java.util.List;

/**
 * Base class for load balancers: handles the trivial cases and delegates
 * the real choice to {@link #doSelect(List)}.
 */
public abstract class LoadBalance implements ILoadBalance {

    /**
     * Chooses one address.
     *
     * @param repos candidate provider addresses; may be {@code null}
     * @return {@code null} when there is no candidate, the only candidate when there
     *         is one, otherwise the result of {@link #doSelect(List)}
     */
    @Override
    public String selectHost(List<String> repos) {
        // A missing list is treated like an empty one instead of failing with an NPE.
        if (repos == null || repos.isEmpty()) {
            return null;
        }
        if (repos.size() == 1) {
            return repos.get(0);
        }
        return doSelect(repos);
    }

    /**
     * Chooses among at least two candidates.
     *
     * @param repos candidate provider addresses, holding two or more entries
     * @return the chosen address
     */
    protected abstract String doSelect(List<String> repos);
}
package zoorpc.loadbalance;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Load balancer that picks a provider uniformly at random.
 */
public class RandomLoadBalance extends LoadBalance {

    /**
     * Picks a random entry of the list.
     *
     * @param repos candidate provider addresses
     * @return a randomly chosen address
     */
    @Override
    protected String doSelect(List<String> repos) {
        // ThreadLocalRandom avoids allocating and seeding a new generator on every call.
        int index = ThreadLocalRandom.current().nextInt(repos.size());
        return repos.get(index);
    }
}
package zoorpc; import zoorpc.zk.Discovery; import zoorpc.zk.IDiscovery; import zoorpc.zk.ZooConfig; public class ClientDemo { public static void main(String[] args) { IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR); RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
//IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"");結果:HelloWorld,8080lf IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"2.0");結果:HelloWorld2,8081lf System.out.println(hello.sayHello("lf")); } }
2、模擬集羣負載均衡
新增發佈類:
package zoorpc; import java.io.IOException; import zoorpc.zk.IRegisterCenter; import zoorpc.zk.RegisterCenter; public class LBServerDemo1 { //模擬集羣 public static void main(String[] args) throws IOException { IHelloWorld service = new HelloWorldServiceImpl(); IRegisterCenter registerCenter = new RegisterCenter(); RpcServer server = new RpcServer(registerCenter,"127.0.0.1:8080"); server.bind(service); server.publisher(); System.in.read(); } }
package zoorpc; import java.io.IOException; import zoorpc.zk.IRegisterCenter; import zoorpc.zk.RegisterCenter; public class LBServerDemo2 { //模擬集羣 public static void main(String[] args) throws IOException { IHelloWorld service = new HelloWorldServiceImpl2(); IRegisterCenter registerCenter = new RegisterCenter(); RpcServer server = new RpcServer(registerCenter,"127.0.0.1:8081"); server.bind(service); server.publisher(); System.in.read(); } }
修改示例2類的註解:
package zoorpc;
import anno.RpcAnnotation;
// Previous annotation: @RpcAnnotation(value = IHelloWorld.class, version = "2.0")
// The version attribute is dropped for the cluster demo.
@RpcAnnotation(value = IHelloWorld.class)
public class HelloWorldServiceImpl2 implements IHelloWorld {

    /**
     * Prefixes the message with this implementation's fixed marker.
     *
     * @param msg text supplied by the caller
     * @return the marker followed by {@code msg}
     */
    @Override
    public String sayHello(String msg) {
        final String prefix = "HelloWorld2,8081";
        return prefix + msg;
    }
}
運行發佈類1、2
Linux 下查看節點顯示:
[zk: localhost:2181(CONNECTED) 13] ls /registrys/zoorpc.IHelloWorld
[127.0.0.1:8081, 127.0.0.1:8080]
[zk: localhost:2181(CONNECTED) 14]
客戶端
package zoorpc;

import zoorpc.zk.Discovery;
import zoorpc.zk.IDiscovery;
import zoorpc.zk.ZooConfig;

/**
 * Load-balancing demo client: calls the unversioned hello-world service
 * ten times, one second apart, and prints each reply.
 */
public class LBClientDemo {

    public static void main(String[] args) throws InterruptedException {
        IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
        RpcClientProxy proxyFactory = new RpcClientProxy(discovery);

        int remainingCalls = 10;
        while (remainingCalls > 0) {
            IHelloWorld hello = proxyFactory.clientProxy(IHelloWorld.class, null);
            String reply = hello.sayHello("lf");
            System.out.println(reply);
            Thread.sleep(1000);
            remainingCalls--;
        }
    }
}
運行結果:
建立一個鏈接 HelloWorld,8080lf 建立一個鏈接 HelloWorld,8080lf 建立一個鏈接 HelloWorld2,8081lf 建立一個鏈接 HelloWorld2,8081lf 建立一個鏈接 HelloWorld2,8081lf 建立一個鏈接 HelloWorld2,8081lf 建立一個鏈接 HelloWorld2,8081lf 建立一個鏈接 HelloWorld,8080lf 建立一個鏈接 HelloWorld,8080lf 建立一個鏈接 HelloWorld2,8081lf
實現原理圖:
4、集羣擴容
1、停機擴容,修改配置
2、逐臺擴容,一臺臺重啓