zookeeper(5)--基於watcher原理實現帶註冊中心的RPC框架

1、帶版本控制的註冊中心RPC框架

  server端

  

/**
 * Registry-center contract: publishes a service name together with the
 * address it can be reached at.
 */
public interface IRegisterCenter {

    /**
     * Registers {@code serviceAddress} as a provider of {@code serviceName}.
     *
     * @param serviceName    name the service is published under
     * @param serviceAddress address of the provider being registered
     */
    void register(String serviceName, String serviceAddress);
}

 

//實現類
package zoorpc.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class RegisterCenter implements IRegisterCenter {
	
	private CuratorFramework curatorFramework;
	
	
	public RegisterCenter() {
		curatorFramework = CuratorFrameworkFactory.builder().connectString(ZooConfig.CONNECTION_STR)
				.connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
		curatorFramework.start();
	}
	@Override
	public void register(String serviceName, String serviceAddress) {
		
		// 註冊相應服務
		String Servicepath = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
		try {
			
			//判斷服務/registrys/product-service/是否存在,不然建立
			if (curatorFramework.checkExists().forPath(Servicepath) == null) {
				curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
				.forPath(Servicepath,"0".getBytes());
			}
			//建立服務iP節點
			String adressPath = Servicepath+"/"+serviceAddress;
			String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
			.forPath(adressPath,"0".getBytes());
			System.out.println("服務節點建立成功:"+rsNode);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

  

//常量類
package zoorpc.zk;

/**
 * Constants shared by the ZooKeeper-based registry code in this package.
 */
public class ZooConfig {

    /** Connection string handed to the Curator client (comma-separated host:port entries). */
    static final String CONNECTION_STR =
            "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";

    /** Root path under which service nodes are created. */
    static final String ZK_REGISTER_PATH = "/registrys";
}

  

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Type-level, runtime-retained annotation that names the interface a service
 * class is published under, plus an optional version label.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RpcAnnotation {

    /**
     * The interface the annotated class is published as.
     *
     * @return the published service interface
     */
    Class<?> value();

    /**
     * Version label used to tell several implementations of the same
     * interface apart.
     *
     * @return the version, or the empty string when none is set
     */
    String version() default "";
}

  

/**
 * Sample service interface exposed through the RPC framework.
 */
public interface IHelloWorld {

    /**
     * Produces a greeting for the given message.
     *
     * @param msg text supplied by the caller
     * @return the greeting built by the implementation
     */
    String sayHello(String msg);
}

  

//服務接口實現類1,不帶版本控制
package zoorpc;

import anno.RpcAnnotation;

/**
 * Unversioned {@link IHelloWorld} implementation, published under the
 * {@code IHelloWorld} interface.
 */
@RpcAnnotation(IHelloWorld.class)
public class HelloWorldServiceImpl implements IHelloWorld {

    /**
     * Prefixes the message with this implementation's marker text.
     *
     * @param msg text supplied by the caller
     * @return {@code "HelloWorld,8080"} followed by {@code msg}
     */
    @Override
    public String sayHello(String msg) {
        StringBuilder greeting = new StringBuilder("HelloWorld,8080");
        greeting.append(msg);
        return greeting.toString();
    }

}

  

//服務接口實現類2,帶版本控制
import anno.RpcAnnotation;

/**
 * {@link IHelloWorld} implementation published as version {@code "2.0"}.
 */
@RpcAnnotation(value = IHelloWorld.class, version = "2.0")
public class HelloWorldServiceImpl2 implements IHelloWorld {

    /**
     * Prefixes the message with this implementation's marker text.
     *
     * @param msg text supplied by the caller
     * @return {@code "HelloWorld2,8081"} followed by {@code msg}
     */
    @Override
    public String sayHello(String msg) {
        StringBuilder greeting = new StringBuilder("HelloWorld2,8081");
        greeting.append(msg);
        return greeting.toString();
    }
}
//服務發佈類
package zoorpc;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import anno.RpcAnnotation;
import zoorpc.zk.IRegisterCenter;

/**
 * Socket-based RPC server: binds annotated service objects to service names,
 * registers those names with the registry center and then serves incoming
 * connections, one {@link ProcessorHandler} task per accepted socket.
 */
public class RpcServer {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /** Registry the bound service names are published to. */
    private IRegisterCenter registerCenter;
    /** Address this server is published under, in {@code host:port} form. */
    private String serviceAddress;
    /** Service name (interface name, optionally suffixed with "-version") to service object. */
    Map<String, Object> handlerMap = new HashMap<String, Object>();

    /**
     * @param registerCenter registry the services are published to
     * @param serviceAddress publish address in {@code host:port} form
     */
    public RpcServer(IRegisterCenter registerCenter, String serviceAddress) {
        this.registerCenter = registerCenter;
        this.serviceAddress = serviceAddress;
    }

    /**
     * Binds each service object to the name derived from its
     * {@link RpcAnnotation}: the annotated interface's class name, followed by
     * {@code "-" + version} when a non-empty version is declared.
     *
     * @param services service objects whose classes carry {@link RpcAnnotation}
     * @throws IllegalArgumentException if a service is {@code null} or its class
     *         is not annotated with {@link RpcAnnotation}
     */
    public void bind(Object... services) {
        for (Object service : services) {
            if (service == null) {
                throw new IllegalArgumentException("service must not be null");
            }
            RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class);
            if (rpcAnnotation == null) {
                // Fail with a message naming the class instead of a bare NullPointerException.
                throw new IllegalArgumentException(
                        service.getClass().getName() + " is not annotated with @RpcAnnotation");
            }
            String serviceName = rpcAnnotation.value().getName();
            String version = rpcAnnotation.version();
            if (version != null && !version.equals("")) {
                serviceName = serviceName + "-" + version;
            }
            handlerMap.put(serviceName, service);
        }
    }

    /**
     * Opens the listening socket on the port taken from the publish address,
     * registers every bound service name with the registry, then accepts
     * connections until an error occurs. Errors are printed, not propagated.
     */
    public void publisher() {
        try (ServerSocket serverSocket = new ServerSocket(parsePort(serviceAddress))) {
            for (String interfaceName : handlerMap.keySet()) {
                registerCenter.register(interfaceName, serviceAddress);
                System.out.println("服務註冊成功:" + interfaceName + "->" + serviceAddress);
            }
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.execute(new ProcessorHandler(socket, handlerMap));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Extracts the port from a {@code host:port} address, rejecting malformed input clearly. */
    private static int parsePort(String address) {
        if (address == null) {
            throw new IllegalArgumentException("serviceAddress must not be null");
        }
        String[] split = address.split(":");
        if (split.length < 2) {
            throw new IllegalArgumentException(
                    "serviceAddress must be in host:port form but was: " + address);
        }
        return Integer.parseInt(split[1]);
    }
}

  

package zoorpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

/**
 * Handles one client connection: reads a single {@link RpcRequest}, invokes the
 * matching bound service by reflection and writes the result back.
 */
public class ProcessorHandler implements Runnable {

    private Socket socket;
    private Map<String, Object> handlerMap;

    /**
     * @param socket     accepted client connection; closed when {@link #run()} finishes
     * @param handlerMap service name (optionally suffixed with "-version") to service object
     */
    public ProcessorHandler(Socket socket, Map<String, Object> handlerMap) {
        this.socket = socket;
        this.handlerMap = handlerMap;
    }

    /**
     * Processes the request on the socket. Failures are printed and the
     * connection is closed in every case, including when the input stream
     * cannot be opened.
     */
    @Override
    public void run() {
        // NOTE(security): this uses Java native deserialization on data read from the
        // network. If clients are not fully trusted, restrict the accepted classes
        // (e.g. with an ObjectInputFilter) or switch to a data-only wire format.
        try (Socket connection = socket;
             ObjectInputStream objectInputStream = new ObjectInputStream(connection.getInputStream())) {
            RpcRequest request = (RpcRequest) objectInputStream.readObject();
            Object result = invoke(request);
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(connection.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Looks up the service for the request (class name, plus {@code "-" + version}
     * when a non-empty version is set) and invokes the requested method on it.
     *
     * @throws IllegalStateException if no service is bound under the resolved name
     * @throws NoSuchMethodException if no public method matches the name and arguments
     */
    private Object invoke(RpcRequest request) throws Exception {
        Object[] args = request.getParameters();
        if (args == null) {
            // A java.lang.reflect.Proxy passes null for methods without parameters.
            args = new Object[0];
        }

        String serviceName = request.getClassName();
        String version = request.getVersion();
        if (version != null && !version.equals("")) {
            serviceName = serviceName + "-" + version;
        }

        Object service = handlerMap.get(serviceName);
        if (service == null) {
            throw new IllegalStateException("No service bound for " + serviceName);
        }
        Method method = findMethod(service.getClass(), request.getMethodName(), args);
        return method.invoke(service, args);
    }

    /**
     * Finds a public method by name whose parameters accept the given arguments.
     * An exact match on the arguments' runtime classes is tried first; otherwise
     * the first public method with assignable parameter types is used, so
     * parameters declared as a supertype, an interface or a primitive also resolve.
     */
    private static Method findMethod(Class<?> serviceClass, String methodName, Object[] args)
            throws NoSuchMethodException {
        Class<?>[] types = new Class<?>[args.length];
        boolean allNonNull = true;
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                allNonNull = false;
                break;
            }
            types[i] = args[i].getClass();
        }
        if (allNonNull) {
            try {
                return serviceClass.getMethod(methodName, types);
            } catch (NoSuchMethodException ignored) {
                // No exact match; fall through to the assignability-based search.
            }
        }
        for (Method candidate : serviceClass.getMethods()) {
            if (candidate.getName().equals(methodName)
                    && accepts(candidate.getParameterTypes(), args)) {
                return candidate;
            }
        }
        throw new NoSuchMethodException(serviceClass.getName() + "." + methodName);
    }

    /** Returns whether every argument can be passed to the corresponding parameter type. */
    private static boolean accepts(Class<?>[] parameterTypes, Object[] args) {
        if (parameterTypes.length != args.length) {
            return false;
        }
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                if (parameterTypes[i].isPrimitive()) {
                    return false;
                }
            } else if (!boxed(parameterTypes[i]).isInstance(args[i])) {
                return false;
            }
        }
        return true;
    }

    /** Maps a primitive type to its wrapper class; other types are returned unchanged. */
    private static Class<?> boxed(Class<?> type) {
        if (!type.isPrimitive()) {
            return type;
        }
        if (type == int.class) {
            return Integer.class;
        }
        if (type == long.class) {
            return Long.class;
        }
        if (type == boolean.class) {
            return Boolean.class;
        }
        if (type == double.class) {
            return Double.class;
        }
        if (type == float.class) {
            return Float.class;
        }
        if (type == char.class) {
            return Character.class;
        }
        if (type == byte.class) {
            return Byte.class;
        }
        if (type == short.class) {
            return Short.class;
        }
        return Void.class;
    }
}

  

//傳輸類
package zoorpc;

import java.io.Serializable;
/**
 * 傳輸對象
 * @author admin
 *
 */
/**
 * Serializable request object sent from client to server: the target class
 * name, the method name, the call arguments and an optional version.
 */
public class RpcRequest implements Serializable {

    private static final long serialVersionUID = 6351477854838485391L;

    private String className;
    private String methodName;
    private Object[] parameters;
    private String version;

    /** Creates an empty request; all fields start as {@code null}. */
    public RpcRequest() {
    }

    /**
     * Creates a request without a version.
     *
     * @param className  name of the target class
     * @param methodName name of the method to call
     * @param parameters call arguments
     */
    public RpcRequest(String className, String methodName, Object[] parameters) {
        this(className, methodName, parameters, null);
    }

    /**
     * Creates a fully populated request.
     *
     * @param className  name of the target class
     * @param methodName name of the method to call
     * @param parameters call arguments
     * @param version    requested service version
     */
    public RpcRequest(String className, String methodName, Object[] parameters, String version) {
        this.className = className;
        this.methodName = methodName;
        this.parameters = parameters;
        this.version = version;
    }

    /** @return name of the target class */
    public String getClassName() {
        return this.className;
    }

    /** @param className name of the target class */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return name of the method to call */
    public String getMethodName() {
        return this.methodName;
    }

    /** @param methodName name of the method to call */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the call arguments (the stored array itself, not a copy) */
    public Object[] getParameters() {
        return this.parameters;
    }

    /** @param parameters the call arguments */
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    /** @return requested service version */
    public String getVersion() {
        return this.version;
    }

    /** @param version requested service version */
    public void setVersion(String version) {
        this.version = version;
    }

}

  

//發佈服務
package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

/**
 * Demo entry point that publishes both hello-world implementations through
 * one {@link RpcServer} at {@code 127.0.0.1:8080}.
 */
public class ServerDemo {

    /**
     * Binds the two services, starts publishing and then waits on standard input.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        IHelloWorld plainService = new HelloWorldServiceImpl();
        IHelloWorld versionedService = new HelloWorldServiceImpl2();
        IRegisterCenter registry = new RegisterCenter();

        RpcServer rpcServer = new RpcServer(registry, "127.0.0.1:8080");
        rpcServer.bind(plainService, versionedService);
        rpcServer.publisher();

        System.in.read();
    }
}

  客戶端

package zoorpc.zk;

/**
 * Service-discovery contract used by the client side.
 */
public interface IDiscovery {

    /**
     * Resolves the address to call for the requested service.
     *
     * @param serviceName name of the requested service
     * @return the address the service should be called at
     */
    String Discovery(String serviceName);
}
package zoorpc.zk;

import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import zoorpc.loadbalance.ILoadBalance;
import zoorpc.loadbalance.RandomLoadBalance;

/**
 * ZooKeeper-backed {@link IDiscovery}: lists the provider addresses registered
 * under a service node and picks one through an {@link ILoadBalance}.
 */
public class Discovery implements IDiscovery {

    private CuratorFramework curatorFramework;

    /**
     * Provider list most recently obtained, either by a lookup or by a watcher
     * event. Kept for compatibility; host selection uses the list of the
     * current lookup rather than this shared field.
     */
    volatile List<String> repos = new ArrayList<>();
    private String adresses;

    /** Paths that already have a started child watcher, so each path is watched once. */
    private final java.util.Set<String> watchedPaths = java.util.Collections.newSetFromMap(
            new java.util.concurrent.ConcurrentHashMap<String, Boolean>());

    /** Strategy used to choose among several providers. */
    private final ILoadBalance loadBalance = new RandomLoadBalance();

    /**
     * Builds and starts a Curator client for the given connection string
     * (4000 ms connection timeout, {@code ExponentialBackoffRetry(1000, 3)}).
     *
     * @param adresses ZooKeeper connection string
     */
    public Discovery(String adresses) {
        this.adresses = adresses;
        curatorFramework = CuratorFrameworkFactory.builder().connectString(adresses)
                .connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        curatorFramework.start();
    }

    /**
     * Fetches the children of the service node and selects one of them.
     *
     * @param serviceName name of the service node under {@code ZooConfig.ZK_REGISTER_PATH}
     * @return the selected provider address, or {@code null} when the load
     *         balancer finds none
     * @throws IllegalStateException if the provider list cannot be read
     */
    @Override
    public String Discovery(String serviceName) {
        String path = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
        List<String> hosts;
        try {
            hosts = curatorFramework.getChildren().forPath(path);
            watchOnce(path);
        } catch (Exception e) {
            // Surface the real cause instead of failing later with a NullPointerException.
            throw new IllegalStateException("Failed to discover providers for " + serviceName, e);
        }
        repos = hosts;
        // Select from the local list: the shared field may be replaced at any
        // time by a watcher event belonging to a different service path.
        return loadBalance.selectHost(hosts);
    }

    /** Starts a child watcher for the path unless one was already started. */
    private void watchOnce(String path) throws Exception {
        if (!watchedPaths.add(path)) {
            return;
        }
        try {
            registerWatcher(path);
        } catch (Exception e) {
            // Allow a later lookup to retry the registration.
            watchedPaths.remove(path);
            throw e;
        }
    }

    /**
     * Creates and starts a {@link PathChildrenCache} on the path whose listener
     * re-reads the children whenever a child event arrives.
     */
    private void registerWatcher(final String path) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {

            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                repos = curatorFramework.getChildren().forPath(path);
            }
        };
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start();
    }
}

  

package zoorpc.zk;

/**
 * Constants shared by the ZooKeeper-based discovery code.
 */
public class ZooConfig {

    /** Connection string handed to the Curator client (comma-separated host:port entries). */
    public static final String CONNECTION_STR =
            "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";

    /** Root path under which service nodes are looked up. */
    public static final String ZK_REGISTER_PATH = "/registrys";
}
package zoorpc;

/**
 * Client-side copy of the sample service interface called through the RPC proxy.
 */
public interface IHelloWorld {

    /**
     * Requests a greeting for the given message.
     *
     * @param msg text supplied by the caller
     * @return the greeting produced by the implementation
     */
    String sayHello(String msg);
}

  

package zoorpc; import java.io.Serializable; /** * 傳輸對象 * @author admin * */
public class RpcRequest implements Serializable {

    private static final long serialVersionUID = 6351477854838485391L;

    private String className;
    private String methodName;
    private Object[] parameters;
    private String version;

    /** Creates an empty request; all fields start as {@code null}. */
    public RpcRequest() {
    }

    /**
     * Creates a request without a version.
     *
     * @param className  name of the target class
     * @param methodName name of the method to call
     * @param parameters call arguments
     */
    public RpcRequest(String className, String methodName, Object[] parameters) {
        this.className = className;
        this.methodName = methodName;
        this.parameters = parameters;
    }

    /** @return name of the target class */
    public String getClassName() {
        return this.className;
    }

    /** @param className name of the target class */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return name of the method to call */
    public String getMethodName() {
        return this.methodName;
    }

    /** @param methodName name of the method to call */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the call arguments (the stored array itself, not a copy) */
    public Object[] getParameters() {
        return this.parameters;
    }

    /** @param parameters the call arguments */
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    /** @return requested service version */
    public String getVersion() {
        return this.version;
    }

    /** @param version requested service version */
    public void setVersion(String version) {
        this.version = version;
    }
}
package zoorpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import zoorpc.zk.IDiscovery;

/**
 * Factory for client-side dynamic proxies whose calls are forwarded through a
 * {@link RemoteInvocationHandler}.
 */
public class RpcClientProxy {

    private IDiscovery discovery;

    /**
     * @param discovery discovery component handed to every invocation handler
     */
    public RpcClientProxy(IDiscovery discovery) {
        this.discovery = discovery;
    }

    /**
     * Creates a proxy implementing {@code interfaceCls}.
     *
     * @param interfaceCls interface the proxy implements
     * @param version      version passed to the invocation handler
     * @param <T>          the interface type
     * @return a proxy instance backed by a new {@link RemoteInvocationHandler}
     */
    public <T> T clientProxy(final Class<T> interfaceCls, String version) {
        ClassLoader loader = interfaceCls.getClassLoader();
        Class<?>[] exposed = new Class<?>[] {interfaceCls};
        RemoteInvocationHandler handler = new RemoteInvocationHandler(version, discovery);
        Object proxy = Proxy.newProxyInstance(loader, exposed, handler);
        return interfaceCls.cast(proxy);
    }
}

  

package zoorpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;

import zoorpc.zk.IDiscovery;



/**
 * Invocation handler behind the client proxies: turns each method call into an
 * {@link RpcRequest}, resolves a provider address through {@link IDiscovery}
 * and sends the request over a {@link TcpTransport}.
 */
public class RemoteInvocationHandler implements InvocationHandler {

    /** Requested service version; {@code null} or empty means unversioned. */
    private String version;
    private IDiscovery discovery;

    /**
     * @param version   requested service version, may be {@code null} or empty
     * @param discovery component used to resolve the provider address
     */
    public RemoteInvocationHandler(String version, IDiscovery discovery) {
        this.discovery = discovery;
        this.version = version;
    }

    /**
     * Builds the request for the invoked method and sends it to a discovered provider.
     *
     * <p>The provider is looked up under the same name the server publishes:
     * the declaring interface's class name, followed by {@code "-" + version}
     * when a non-empty version is requested. Looking up the bare interface name
     * would miss providers that only offer the requested version.
     *
     * @return the object returned by the remote call
     * @throws IllegalStateException if discovery yields no provider address
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setVersion(version);

        String serviceName = request.getClassName();
        if (version != null && !version.equals("")) {
            serviceName = serviceName + "-" + version;
        }
        String serviceAddress = discovery.Discovery(serviceName);
        if (serviceAddress == null) {
            throw new IllegalStateException("No provider available for " + serviceName);
        }

        TcpTransport trans = new TcpTransport(serviceAddress);
        return trans.send(request);
    }

}

  

package zoorpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;


/**
 * Minimal blocking transport: opens one socket per request, writes the
 * serialized {@link RpcRequest} and reads back one serialized response object.
 */
public class TcpTransport {

    /** Provider address in {@code host:port} form. */
    private String serviceAddress;

    /**
     * @param serviceAddress provider address in {@code host:port} form
     */
    public TcpTransport(String serviceAddress) {
        super();
        this.serviceAddress = serviceAddress;
    }

    /**
     * Opens a socket to the configured address.
     *
     * @return the connected socket
     * @throws RuntimeException if the address cannot be parsed or the connection
     *         fails; the underlying exception is attached as the cause
     */
    Socket newSocket() {
        System.out.println("建立一個鏈接");
        try {
            String[] split = serviceAddress.split(":");
            return new Socket(split[0], Integer.parseInt(split[1]));
        } catch (Exception e) {
            throw new RuntimeException("鏈接創建失敗!", e);
        }
    }

    /**
     * Sends the request and returns the object the server writes back.
     *
     * <p>The socket and streams are closed in every case; a failure while
     * closing can no longer mask the original error.
     *
     * @param request request to serialize and send
     * @return the deserialized response object
     * @throws RuntimeException if connecting, writing or reading fails; the
     *         underlying exception is attached as the cause
     */
    public Object send(RpcRequest request) {
        try (Socket socket = newSocket();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
            objectOutputStream.writeObject(request);
            objectOutputStream.flush();

            // Opened only after the request is flushed: the server creates its
            // output stream after it has read and processed the request.
            try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
                return objectInputStream.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException("鏈接創建失敗!", e);
        }
    }

}

  

package zoorpc.loadbalance;

import java.util.List;

/**
 * Strategy for choosing one host out of a list of candidates.
 */
public interface ILoadBalance {

    /**
     * Chooses a host from the given candidates.
     *
     * @param repos candidate host addresses
     * @return the chosen host
     */
    String selectHost(List<String> repos);

}

  

package zoorpc.loadbalance;

import java.util.List;

/**
 * Base class for load-balancing strategies. It answers the trivial cases
 * itself and delegates to {@link #doSelect(List)} only when there is a real
 * choice to make.
 */
public abstract class LoadBalance implements ILoadBalance {

    /**
     * Returns {@code null} for an empty list, the only element for a
     * single-element list, and the subclass's choice otherwise.
     *
     * @param repos candidate host addresses
     * @return the chosen host, or {@code null} when there are no candidates
     */
    @Override
    public String selectHost(List<String> repos) {
        int candidates = repos.size();
        if (candidates < 1) {
            return null;
        }
        if (candidates == 1) {
            return repos.get(0);
        }
        return doSelect(repos);
    }

    /**
     * Picks one host; called only with lists of two or more candidates.
     *
     * @param repos candidate host addresses
     * @return the chosen host
     */
    protected abstract String doSelect(List<String> repos);
}
package zoorpc.loadbalance;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Load-balancing strategy that picks a uniformly random host.
 */
public class RandomLoadBalance extends LoadBalance {

    /**
     * Picks a random element of {@code repos}.
     *
     * <p>Uses {@link ThreadLocalRandom} so that no new generator is allocated
     * and seeded on every selection.
     *
     * @param repos candidate host addresses; must not be empty
     * @return the randomly chosen host
     */
    @Override
    protected String doSelect(List<String> repos) {
        int index = ThreadLocalRandom.current().nextInt(repos.size());
        return repos.get(index);
    }
}
package zoorpc;

import zoorpc.zk.Discovery;
import zoorpc.zk.IDiscovery;
import zoorpc.zk.ZooConfig;

public class ClientDemo {
	public static void main(String[] args) {
		IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
		RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
         //IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"");結果:HelloWorld,8080lf IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"2.0");結果:HelloWorld2,8081lf System.out.println(hello.sayHello("lf")); } }

  2、模擬集羣負載均衡

   新增發佈類:

  

package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

/**
 * First node of the simulated cluster: publishes {@link HelloWorldServiceImpl}
 * at {@code 127.0.0.1:8080}.
 */
public class LBServerDemo1 {

    /**
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        IHelloWorld helloService = new HelloWorldServiceImpl();
        IRegisterCenter registry = new RegisterCenter();

        RpcServer rpcServer = new RpcServer(registry, "127.0.0.1:8080");
        rpcServer.bind(helloService);
        rpcServer.publisher();

        System.in.read();
    }
}
package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

/**
 * Second node of the simulated cluster: publishes {@link HelloWorldServiceImpl2}
 * at {@code 127.0.0.1:8081}.
 */
public class LBServerDemo2 {

    /**
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        IHelloWorld helloService = new HelloWorldServiceImpl2();
        IRegisterCenter registry = new RegisterCenter();

        RpcServer rpcServer = new RpcServer(registry, "127.0.0.1:8081");
        rpcServer.bind(helloService);
        rpcServer.publisher();

        System.in.read();
    }
}

修改示例2類的註解

package zoorpc;

import anno.RpcAnnotation;

/**
 * {@link IHelloWorld} implementation for the load-balancing demo. The
 * annotation no longer declares {@code version = "2.0"}, so the class is
 * published without a version.
 */
@RpcAnnotation(value = IHelloWorld.class)
public class HelloWorldServiceImpl2 implements IHelloWorld {

    /**
     * Prefixes the message with this implementation's marker text.
     *
     * @param msg text supplied by the caller
     * @return {@code "HelloWorld2,8081"} followed by {@code msg}
     */
    @Override
    public String sayHello(String msg) {
        StringBuilder greeting = new StringBuilder("HelloWorld2,8081");
        greeting.append(msg);
        return greeting.toString();
    }

}

  運行發佈類1,2

  linux 下查看節點顯示:

[zk: localhost:2181(CONNECTED) 13] ls /registrys/zoorpc.IHelloWorld
[127.0.0.1:8081, 127.0.0.1:8080]
[zk: localhost:2181(CONNECTED) 14]

 客戶端

package zoorpc;

import zoorpc.zk.Discovery;
import zoorpc.zk.IDiscovery;
import zoorpc.zk.ZooConfig;

/**
 * Client for the load-balancing demo: calls the unversioned hello-world
 * service ten times, one second apart, creating a fresh proxy for each call.
 */
public class LBClientDemo {

    /**
     * @param args unused
     * @throws InterruptedException if the pause between calls is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
        RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);

        int remainingCalls = 10;
        while (remainingCalls > 0) {
            IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class, null);
            String reply = hello.sayHello("lf");
            System.out.println(reply);
            Thread.sleep(1000);
            remainingCalls--;
        }
    }
}

運行結果:

建立一個鏈接
HelloWorld,8080lf
建立一個鏈接
HelloWorld,8080lf
建立一個鏈接
HelloWorld2,8081lf
建立一個鏈接
HelloWorld2,8081lf
建立一個鏈接
HelloWorld2,8081lf
建立一個鏈接
HelloWorld2,8081lf
建立一個鏈接
HelloWorld2,8081lf
建立一個鏈接
HelloWorld,8080lf
建立一個鏈接
HelloWorld,8080lf
建立一個鏈接
HelloWorld2,8081lf

實現原理圖:

 4、集羣擴容

  1、停機擴容,修改配置

  2、逐臺擴容,一臺臺重啓

相關文章
相關標籤/搜索