從零開始的高併發(八)--- RPC框架的簡單實現

前言

前情概要

上一篇咱們簡單過了一遍RPC是什麼,三個過程,爲何咱們須要它,它的特性和適用場景,RPC的流程及協議定義還有它的框架的一些小知識。理論常常會看的人昏昏欲睡,不知所云。若是可以結合一些代碼來講明的話,那就方便理解不少了spring

以往連接

從零開始的高併發(一)--- Zookeeper的基礎概念緩存

從零開始的高併發(二)--- Zookeeper實現分佈式鎖服務器

從零開始的高併發(三)--- Zookeeper集羣的搭建和leader選舉網絡

從零開始的高併發(四)--- Zookeeper的分佈式隊列併發

從零開始的高併發(五)--- Zookeeper的配置中心應用app

從零開始的高併發(六)--- Zookeeper的Master選舉及官網小覽框架

從零開始的高併發(七)--- RPC的介紹,協議及框架dom

內容一:RPC的流程和任務

1. RPC的流程

其實這個在上一篇的2 - ① 也已經提到過了,若是忘了,不要緊,我再複製過來socket

stub:分佈式計算中的存根是一段代碼,它轉換在遠程過程調用期間Client和server之間傳遞的參數分佈式

1.客戶端處理過程當中調用client stub(就像調用本地方法同樣),傳入參數

2.Client stub將參數編組爲消息,而後經過系統調用向服務端發送消息

3.客戶端本地操做系統將消息從客戶端機器發送到服務端機器

4.服務端操做系統將接收到的數據包傳遞給client stub

5.server stub解組消息爲參數

6.server stub再調用服務端的過程,過程執行結果以反方向的相同步驟響應給客戶端

2. 從使用者的角度開始分析

1.定義過程接口
2.服務端實現接口的整個過程
3.客戶端使用生成的stub代理對象
複製代碼

內容二:RPC框架的設計及實現

1. 準備一個Student的實體類及基礎接口

客戶端生成過程接口的代理對象,經過設計一個客戶端代理工廠,使用JDK動態代理便可生成接口的代理對象

① 定義一個StudentService接口

Student類有三個屬性name(String),age(int),sex(String),節省篇幅就不貼代碼了,提供getter,setter和toString方法便可

public interface StudentService {
	/**
	 * 獲取信息
	 * @return
	 */
	public Student getInfo();
	
	//打印student的信息並返回一個boolean值
	public boolean printInfo(Student student);
}
複製代碼

而且提供一個簡單的實現,其實就是打印一個Student的信息出來而已

@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {

	public Student getInfo() {
		Student person = new Student();
    	person.setAge(25);
		person.setName("說出你的願望吧~");
		person.setSex("男");
		return person;
	}

	public boolean printInfo(Student person) {
		if (person != null) {
			System.out.println(person);
			return true;
		}
		return false;
	}
	
	public static void main(String[] args) {
		new Thread(()->{
			System.out.println("111");
		}).start();;
	}
}
複製代碼

2.客戶端的搭建

① 從測試類去了解所需

首先,客戶端經過咱們的本地代理,得到咱們的StudentService的代理類,此時咱們客戶端本地是確定不存在StudentService的實現的,此時尋址咱們是直接給出來了

public class ClientTest {
	@Test
	public void test() {
		// 本地沒有接口實現,經過代理得到接口實現實例
		RpcClientProxy proxy = new RpcClientProxy("192.168.80.1", 9998);
		StudentService service = proxy.getProxy(StudentService.class);
		
		System.out.println(service.getInfo());
		
		Student student = new Student();
		student.setAge(23);
		student.setName("hashmap");
		student.setSex("男");
		System.out.println(service.printInfo(student));
	}
}
複製代碼

此時咱們的關注點轉到客戶端是如何幫咱們進行代理的

② 實現了InvocationHandler接口的RpcClientProxy

/**
 * RpcClientProxy
 * 客戶端代理服務,客戶端往服務端發起的調用將經過客戶端代理來發起
 */
public class RpcClientProxy implements InvocationHandler{
	private String host;	// 服務端地址
	private int port;	// 服務端口號
	
	public RpcClientProxy(String host, int port){
		this.host = host;
		this.port = port;
	}
	
	/**
	 * 生成業務接口的代理對象,代理對象作的事情,在invoke方法中。
	 * @param clazz 代理類型(接口)
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> clazz){
		// clazz 不是接口不能使用JDK動態代理
		return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
	}
	
	/**
	 * 動態代理作的事情,接口的實現不在本地,在網絡中的其餘進程中,咱們經過實現了Rpc客戶端的對象來發起遠程服務的調用。
	 */
	public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
		// 調用前
		System.out.println("執行遠程方法前,能夠作些事情");
		
		// 調用遠程服務,須要封裝參數,相似於序列化的過程
		RpcRequest request = new RpcRequest();
		request.setClassName(method.getDeclaringClass().getName());
		request.setMethodName(method.getName());
		request.setParamTypes(method.getParameterTypes());
		request.setParams(params);
		// 連接服務器調用服務
		RpcClient client = new RpcClient();
		Object rst = client.start(request, host, port);
		
		// 調用後
		System.out.println("執行遠程方法後,也能夠作些事情");
		return rst;
	}
}
複製代碼

JDK提供了Proxy類來實現咱們的動態代理,能夠經過newProxyInstance(ClassLoader var0, Class<?>[] var1, InvocationHandler var2)方法來實例化一個代理對象,此時咱們傳入的參數clazz是規定必須爲一個接口的,若是不是接口就不能使用JDK動態代理

而第三個參數RpcClientProxy.this則是newProxyInstance()方法雖然幫咱們建立好了實例,可是建立實例完成後的具體動做必須由這個InvocationHandler來提供

InvocationHandler這個接口裏面僅僅只有一個 Object invoke(Object var1, Method var2, Object[] var3) throws Throwable,這個方法的參數相信不難理解,第一個是代理對象,第二個是執行的方法,第三個是所需的參數集

回到咱們剛剛的代碼,在我執行System.out.println(service.getInfo())這條語句的時候,咱們的邏輯就會跳到invoke()的實現中來,在invoke()方法的註釋中也把過程很詳細的說明了,首先咱們須要調用遠程服務了,進行一個參數的封裝,以後就進行一個網絡鏈接把這些參數發送給咱們的服務端,此時咱們須要用到RpcClient了

③ RpcClient

在start()方法中,咱們的RpcRequest request是實現了Serializable接口的,因此此時封裝好的數據會轉換成一個二進制而後被flush()過去,此時咱們消息已經發送了,須要等待服務端的響應,響應咱們就須要經過咱們的服務端ObjectOutputStream來接收一個輸入流

/**
 * RpcClient
 * Rpc客戶端,表明業務代碼做爲客戶端,往遠端服務發起請求。
 */
public class RpcClient {
	
	/**
	 * 經過網絡IO,打開遠端服務鏈接,將請求數據寫入網絡中,並得到響應結果。
	 * 
	 * @param request 將要發送的請求數據
	 * @param host 遠端服務域名或者ip地址
	 * @param port 遠端服務端口號
	 * @return 服務端響應結果
	 * @throws Throwable 拋出的異常
	 */
	public Object start(RpcRequest request, String host, int port) throws Throwable{
		// 打開遠端服務鏈接
		Socket server = new Socket(host, port);
		
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		
		try {
			// 1. 服務端輸出流,寫入請求數據,發送請求數據
			oout = new ObjectOutputStream(server.getOutputStream());
			oout.writeObject(request);
			oout.flush();
			
			// 2. 服務端輸入流,獲取返回數據,轉換參數類型
			// 相似於反序列化的過程
			oin = new ObjectInputStream(server.getInputStream());
			Object res = oin.readObject();
			RpcResponse response = null;
			if(!(res instanceof RpcResponse)){
				throw new InvalidClassException("返回參數不正確,應當爲:"+RpcResponse.class+" 類型");
			}else{
				response = (RpcResponse) res;
			}
			
			// 3. 返回服務端響應結果
			if(response.getError() != null){ // 服務器產生異常
				throw response.getError();
			}
			return response.getResult();
		}finally{
			try {	// 清理資源,關閉流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(server != null) server.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}
複製代碼

④ 進行參數封裝的RpcRequest

/**
 * RpcRequest
 * Rpc請求對象,請求遠端服務服務的內容,在網絡上進行傳輸。
 */
public class RpcRequest implements Serializable{
	// 須要請求的類名
	private String className;
	
	// 需求請求的方法名
    private String methodName;
    
    // 請求方法的參數類型
    private Class<?>[] paramTypes;
    
    // 請求的參數值
    private Object[] params;
	
	public String getClassName() {
		return className;
	}
	public void setClassName(String className) {
		this.className = className;
	}
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getParamTypes() {
		return paramTypes;
	}
	public void setParamTypes(Class<?>[] paramTypes) {
		this.paramTypes = paramTypes;
	}
	public Object[] getParams() {
		return params;
	}
	public void setParams(Object[] params) {
		this.params = params;
	}
 
}
複製代碼

⑤ Rpc服務端響應結果包裝RpcResponse

同時也是實現了JDK默認的序列化Serializable

/**
 * RpcResponse
 * Rpc服務端響應結果包裝類,在網絡上進行傳輸。
 */
public class RpcResponse implements Serializable {
	// 可能拋出的異常
	private Throwable error;
	// 響應的內容或結果
        private Object result;
    
	public Throwable getError() {
		return error;
	}
	public void setError(Throwable error) {
		this.error = error;
	}
	public Object getResult() {
		return result;
	}
	public void setResult(Object result) {
		this.result = result;
	}
    
    
}
複製代碼

3.服務端的搭建

① 服務端的模擬ServerTest

public class ServerTest {

	@Test
	public void startServer() {
		RpcServer server = new RpcServer();
		server.start(9998, "rpc.simple.RpcServer");
	}
	
	public static void main(String[] args) {
	}
}
複製代碼

給到一個端口號,參數中帶有一個包,功能是掃描某個包下的服務

② start()方法的實現

建立一個Map類型的集合services存放掃描到提供rpc服務的類,此時由於沒有放在註冊中心上因此就不存在尋址了。後面將會把它放入zookeeper的註冊中心

getService()下,咱們在ServerTest不是提供了一個包名嗎,此時咱們先去找到了它們全部的classes(請參考getClasses()方法),getClasses()中咱們其實主要是先根據提供的包名往下找,要是目錄都有問題的話就拋出異常,若是沒問題,就開始遍歷此目錄下的全部文件,遍歷出來的結果若是發現這個文件是class文件,就把其實例化,而且進行判斷是否存在一個自定義註解@service,標註了這個註解的類就是RPC服務的實現類。若是存在這個註解,那就是咱們須要找的rpc服務,就把它裝到一個結果集classes中,若是目錄下面仍然是目錄,那就本身調用本身,直到看到class文件爲止

當咱們把全部的class都找到了,回到getService()方法下,就都集中放於一個classList中,而後把它們Map化,就是把接口的名稱做爲key,把實例做爲value(services.put(cla.getAnnotation(Service.class).value().getName(), obj))。

最後再回到start(),進行完服務掃描以後還會有一個RpcServerHandler來進行處理

/**
 * RpcServer
 * Rpc服務提供者
 */
public class RpcServer {
	
	/**
	 * 啓動指定的網絡端口號服務,並監聽端口上的請求數據。得到請求數據之後將請求信息委派給服務處理器,放入線程池中執行。
	 * @param port 監聽端口
	 * @param clazz 服務類所在包名,多個用英文逗號隔開
	 */
	public void start(int port, String clazz) {
		ServerSocket server = null;
		try {
			// 1. 建立服務端指定端口的socket鏈接
			server = new ServerSocket(port);
			// 2. 獲取全部rpc服務類
			Map<String, Object> services = getService(clazz);
			// 3. 建立線程池
			Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
			while(true){
				// 4. 獲取客戶端鏈接
				Socket client = server.accept();
				// 5. 放入線程池中執行
				RpcServerHandler service = new RpcServerHandler(client, services);
				executor.execute(service);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			//關閉監聽
			if(server != null)
				try {
					server.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
		}
	}
	
	/**
	 * 實例化全部rpc服務類,也可用於暴露服務信息到註冊中心。
	 * @param clazz 服務類所在包名,多個用英文逗號隔開
	 * @return
	 */
	public Map<String,Object> getService(String clazz){
		try {
			Map<String, Object> services = new HashMap<String, Object>();
			// 獲取全部服務類
			String[] clazzes = clazz.split(",");
			List<Class<?>> classes = new ArrayList<Class<?>>();
			for(String cl : clazzes){
				List<Class<?>> classList = getClasses(cl);
				classes.addAll(classList);
			}
			// 循環實例化
			for(Class<?> cla:classes){
				Object obj = cla.newInstance();
				services.put(cla.getAnnotation(Service.class).value().getName(), obj);
			}
			return services;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 獲取包下全部有@Sercive註解的類
	 * @param pckgname
	 * @return
	 * @throws ClassNotFoundException
	 */
	public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
		// 須要查找的結果
		List<Class<?>> classes = new ArrayList<Class<?>>();
		// 找到指定的包目錄
		File directory = null;
		try {
			ClassLoader cld = Thread.currentThread().getContextClassLoader();
			if (cld == null)
				throw new ClassNotFoundException("沒法獲取到ClassLoader");
			String path = pckgname.replace('.', '/');
			URL resource = cld.getResource(path);
			if (resource == null)
				throw new ClassNotFoundException("沒有這樣的資源:" + path);
			directory = new File(resource.getFile());
		} catch (NullPointerException x) {
			throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一個有效的資源");
		}
		if (directory.exists()) {
			// 獲取包目錄下的全部文件
			String[] files = directory.list();
			File[] fileList = directory.listFiles();
			// 獲取包目錄下的全部文件
			for (int i = 0; fileList != null && i < fileList.length; i++) {
				File file = fileList[i];
				//判斷是不是Class文件
				if (file.isFile() && file.getName().endsWith(".class")) {
					Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
					if(clazz.getAnnotation(Service.class) != null){
						classes.add(clazz);
					}
				}else if(file.isDirectory()){ //若是是目錄,遞歸查找
					List<Class<?>> result = getClasses(pckgname+"."+file.getName());
					if(result != null && result.size() != 0){
						classes.addAll(result);
					}
				}
			}
		} else{
			throw new ClassNotFoundException(pckgname + "不是一個有效的包名");
		}
		return classes;
	}
}
複製代碼

③ 進行處理的RpcServerHandler

和剛剛的RpcClient很是相似,都是序列化和反序列化的過程,主要是第三步中得到了實例和方法及其參數後,再調用invoke()方法而後把結果放入response的過程

/**
 * RpcServerHandler
 * 服務端請求處理,處理來自網絡IO的服務請求,並響應結果給網絡IO。
 */
public class RpcServerHandler implements Runnable {
	
	// 客戶端網絡請求socket,能夠從中得到網絡請求信息
	private Socket clientSocket;
	
	// 服務端提供處理請求的類集合
	private Map<String, Object> serviceMap;
	
	/**
	 * @param client 客戶端socket
	 * @param services 全部服務
	 */
	public RpcServerHandler(Socket client, Map<String, Object> services) {
		this.clientSocket = client;
		this.serviceMap = services;
	}
 
 
	/**
	 * 讀取網絡中客戶端請求的信息,找到請求的方法,執行本地方法得到結果,寫入網絡IO輸出中。
	 * 
	 */
	public void run() {
		ObjectInputStream oin = null;
		ObjectOutputStream oout = null;
		RpcResponse response = new RpcResponse();
		try {
			// 1. 獲取流以待操做
			oin = new ObjectInputStream(clientSocket.getInputStream());
			oout = new ObjectOutputStream(clientSocket.getOutputStream());
			
			// 2. 從網絡IO輸入流中請求數據,強轉參數類型
			Object param = oin.readObject();
			RpcRequest  request = null;
			if(!(param instanceof RpcRequest)){
				response.setError(new Exception("參數錯誤"));
				oout.writeObject(response);
				oout.flush();
				return;
			}else{
				// 反序列化RpcRequest
				request = (RpcRequest) param;
			}
			
			// 3. 查找並執行服務方法
			Object service = serviceMap.get(request.getClassName());
			Class<?> clazz= service.getClass();
			Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
			Object result = method.invoke(service, request.getParams());
			
			// 4. 返回RPC響應,序列化RpcResponse
			response.setResult(result);
			// 序列化結果
			oout.writeObject(response);
			oout.flush();
			return;
		} catch (Exception e) {
			try {	//異常處理
				if(oout != null){
					response.setError(e);
					oout.writeObject(response);
					oout.flush();
				}
			} catch (Exception e1) {
				e1.printStackTrace();
			}
			return;
		}finally{
			try {	// 回收資源,關閉流
				if(oin != null) oin.close();
				if(oout != null) oout.close();
				if(clientSocket != null) clientSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
 
}
複製代碼

4.運行結果

先開啓ServerTest再開啓ClientTest,簡單快捷,注意別去右鍵跑main方法便可

內容三:優化客戶端的舉措

1.發現者的引入

設計客戶端的時候,在ClientStubInvocationHandler中須要完成的兩件事爲編組消息和發送網絡請求,而將請求的內容編組爲消息這件事就交由客戶端的stub代理,它除了消息協議和網絡層的事務之外,可能還存在一個服務信息發現,此外消息協議可能也是會存在變化的,咱們也須要去支持多種協議,這個實際上是和框架對協議的支持廣度有關的。好比dubbo相對於spring cloud而言對協議的支持就相對靈活一些

此時咱們須要得知某服務用的是什麼協議,因此咱們須要引入一個服務發現者

2.協議層

咱們想要作到支持多種協議,類該如何設計(面向接口,策略模式,組合)

此時咱們的協議須要抽象出來,對於協議的內容須要進行編組和解組,好比咱們上面提供的JSON和HTTP兩種不一樣的實現,而此時客戶端的存根裏面就不只僅只是須要服務發現者,還須要咱們對於這個協議的支持

① 補充:如何從zookeeper中獲取註冊信息

主要看regist()方法,咱們在註冊的時候把服務信息進行了拼接,並建立成臨時節點,父節點爲持久節點。servicePath是相似於dubbo的一個目錄結構,一個根目錄/rpc+服務名稱serviceName+service,獲取服務的方法loadServiceResouces()也不難,根據這些地址獲取它們下面的子節點,把全部的url加載出來給到調用者

public class RegistCenter {
	ZkClient client = new ZkClient("localhost:2181");
	
	private String centerRootPath = "/rpc";
	
	public RegistCenter() {
		client.setZkSerializer(new MyZkSerializer());
	}
	
	public void regist(ServiceResource serviceResource) {
		String serviceName = serviceResource.getServiceName();
		String uri = JsonMapper.toJsonString(serviceResource);
		try {
			uri = URLEncoder.encode(uri, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		if(! client.exists(servicePath)) {
			client.createPersistent(servicePath, true);
		}
		String uriPath = servicePath+"/"+uri;
		client.createEphemeral(uriPath);
	}
	
	/**
	 * 加載配置中心中服務資源信息
	 * @param serviceName
	 * @return
	 */
	public List<ServiceResource> loadServiceResouces(String serviceName) {
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		List<String> children = client.getChildren(servicePath);
		List<ServiceResource> resources = new ArrayList<ServiceResource>();
		for(String ch : children) {
			try {
				String deCh = URLDecoder.decode(ch, "UTF-8");
				ServiceResource r = JsonMapper.fromJsonString(deCh, ServiceResource.class);
				resources.add(r);
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
		return resources;
	}
	
	private void sub(String serviceName, ChangeHandler handler) {
		/*
		String path = centerRootPath + "/"+serviceName+"/service";
		client.subscribeChildChanges(path, new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
				handler();
			}
		});
		client.subscribeDataChanges(path, new IZkDataListener() {
			@Override
			public void handleDataDeleted(String dataPath) throws Exception {
				handler();
			}
			
			@Override
			public void handleDataChange(String dataPath, Object data) throws Exception {
				handler();
			}
		});
		*/
	}
	
	interface ChangeHandler {
		/**
		 * 發生變化後給一個完整的屬性對象
		 * @param resource
		 */
		void itemChange(ServiceResource resource);
	}
}
複製代碼

② ClientStubProxyFactory

/**
 * ClientStubProxyFactory
  *   客戶端存根代理工廠
 */
public class ClientStubProxyFactory {

	private ServiceInfoDiscoverer sid;

	private Map<String, MessageProtocol> supportMessageProtocols;

	private NetClient netClient;

	private Map<Class<?>, Object> objectCache = new HashMap<>();
	
	/**
	 * 
	 * 
	 * @param <T>
	 * @param interf
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> interf) {
		T obj = (T) this.objectCache.get(interf);
		if (obj == null) {
			obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
					new ClientStubInvocationHandler(interf));
			this.objectCache.put(interf, obj);
		}

		return obj;
	}

	public ServiceInfoDiscoverer getSid() {
		return sid;
	}

	public void setSid(ServiceInfoDiscoverer sid) {
		this.sid = sid;
	}

	public Map<String, MessageProtocol> getSupportMessageProtocols() {
		return supportMessageProtocols;
	}

	public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
		this.supportMessageProtocols = supportMessageProtocols;
	}

	public NetClient getNetClient() {
		return netClient;
	}

	public void setNetClient(NetClient netClient) {
		this.netClient = netClient;
	}
	
	/**
	 * ClientStubInvocationHandler
	 * 客戶端存根代理調用實現
	 * @date 2019年4月12日 下午2:38:30
	 */
	private class ClientStubInvocationHandler implements InvocationHandler {
		private Class<?> interf;

		public ClientStubInvocationHandler(Class<?> interf) {
			super();
			this.interf = interf;
		}

		@Override
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

			// 一、得到服務信息
			String serviceName = this.interf.getName();
			ServiceInfo sinfo = sid.getServiceInfo(serviceName);

			if (sinfo == null) {
				throw new Exception("遠程服務不存在!");
			}

			// 二、構造request對象
			Request req = new Request();
			req.setServiceName(sinfo.getName());
			req.setMethod(method.getName());
			req.setPrameterTypes(method.getParameterTypes());
			req.setParameters(args);

			// 三、協議層編組
			// 得到該方法對應的協議
			MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
			// 編組請求
			byte[] data = protocol.marshallingRequest(req);

			// 四、調用網絡層發送請求
			byte[] repData = netClient.sendRequest(data, sinfo);

			// 5解組響應消息
			Response rsp = protocol.unmarshallingResponse(repData);

			// 六、結果處理
			if (rsp.getException() != null) {
				throw rsp.getException();
			}

			return rsp.getReturnValue();
		}
	}
}
複製代碼

ClientStub中有兩個引用,一個是服務發現接口ServiceInfoDiscoverer,做用爲根據服務名得到遠程服務信息,提供一個ServiceInfo getServiceInfo(String name)方法,還有就是對於不一樣協議的支持supportMessageProtocols,MessageProtocol咱們也是定義了一個接口,這個接口就須要比較詳細了,編碼成二級制,和解碼成Request等,對於response也是一樣這麼個過程

/**
 * 通訊協議接口
 * MessageProtocol
 */
public interface MessageProtocol {
	/**
	 * 編組請求消息
	 * @param req
	 * @return
	 */
	byte[] marshallingRequest(Request req);
	
	/**
	 * 解編組請求消息
	 * @param data
	 * @return
	 */
	Request unmarshallingRequest(byte[] data);
	
	/**
	 * 編組響應消息
	 * @param rsp
	 * @return
	 */
	byte[] marshallingResponse(Response rsp);
	
	/**
	 * 解編組響應消息
	 * @param data
	 * @return
	 */
	Response unmarshallingResponse(byte[] data);
}
複製代碼

此時又存在一些問題,單純依靠編組和解組的方法是不夠的,編組和解組的操做對象是請求,響應,可是它們的內容是不一樣的,此時咱們又須要定義框架標準的請求響應類

request有具體的服務名,服務方法,消息頭,參數類型和參數,一樣的response也有狀態(經過枚舉),消息頭,返回值及類型以及是否存在異常。

此時協議層擴展爲4個方法

將消息協議獨立爲一層,客戶端和服務端都須要使用

3. 網絡層

網絡層的工做主要是發送請求和得到響應,此時咱們若是須要發起網絡請求一定先要知道服務地址,此時咱們利用下圖中serviceInfo對象做爲必須依賴,setRequest()方法裏面會存在發送數據,還有發送給誰,此時給出了BIO和Netty兩種實現

因此咱們須要的三個依賴就都出來了,一個是服務發現者,一個是協議支持,再而後就是咱們網絡層的NetClient

4. 總圖

紫色表明客戶端代理部分,淺綠色屬於服務發現,淺藍色屬於協議部分

5.代碼部分(可直接無視)

由於這些代碼和主要的思路已經沒有瓜葛了,只是一些功能代碼,因此能夠直接忽略了。若是實在是想本身跑一下,也能夠問我要一個小樣。

① 依舊是回到咱們的ClientStubProxyFactory

能夠和內容二的RpcClientProxy作一個對比,在原有的基礎上加上了三個依賴ServiceInfoDiscoverer,supportMessageProtocols,netClient

在ClientStubProxyFactory中對Object作了一個緩存,若是已經存在這個緩存就直接返回,沒有的話加入到緩存中而後new出來,只是一個小小的不一樣。

② invoke()方法的改變

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

	// 一、得到服務信息
	String serviceName = this.interf.getName();
	ServiceInfo sinfo = sid.getServiceInfo(serviceName);

	if (sinfo == null) {
		throw new Exception("遠程服務不存在!");
	}

	// 二、構造request對象
	Request req = new Request();
	req.setServiceName(sinfo.getName());
	req.setMethod(method.getName());
	req.setPrameterTypes(method.getParameterTypes());
	req.setParameters(args);

	// 三、協議層編組
	// 得到該方法對應的協議
	MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
	// 編組請求
	byte[] data = protocol.marshallingRequest(req);

	// 四、調用網絡層發送請求
	byte[] repData = netClient.sendRequest(data, sinfo);

	// 五、解組響應消息
	Response rsp = protocol.unmarshallingResponse(repData);

	// 六、結果處理
	if (rsp.getException() != null) {
		throw rsp.getException();
	}

	return rsp.getReturnValue();
}
複製代碼

首先是服務發現,在咱們執行 ① 中提到的getProxy()方法時,此時代理的接口已經直接告訴咱們了,因此咱們就直接得到了接口信息interf,而後調用getName()方法獲取接口的名稱,經過接口名,調用服務發現者ServiceInfo提供的getServiceInfo()方法就能獲取服務的具體信息,而後放入請求參數request裏面,接下來給request的各個屬性賦值

以後咱們就開始尋找這個服務所對應的協議,得到協議以後能夠獲取協議支持對象,以後進行編組請求,轉換成二進制,經過netClient發送過去,順帶連同服務端信息給出去。獲取結果repData進行解組(二進制回到response),以後進行結果處理。

③ 服務發現者的實現

以前也提到了,服務發現者ServiceInfoDiscoverer是做爲一個接口提供了getServiceInfo()方法的

有兩種不一樣的實現,本地實現咱們能夠本身搞一個配置文件加載進來,把相關的服務信息弄進去得了

zookeeper的服務發現實現以下,相似於咱們一開始在2 - ① 中補充的zookeeper的內容

public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
	ZkClient client = new ZkClient("localhost:2181");
	
	private String centerRootPath = "/rpc";
	
	public ZookeeperServiceInfoDiscoverer() {
		client.setZkSerializer(new MyZkSerializer());
	}
	
	public void regist(ServiceInfo serviceResource) {
		String serviceName = serviceResource.getName();
		String uri = JSON.toJSONString(serviceResource);
		try {
			uri = URLEncoder.encode(uri, "UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		if(! client.exists(servicePath)) {
			client.createPersistent(servicePath, true);
		}
		String uriPath = servicePath+"/"+uri;
		client.createEphemeral(uriPath);
	}
	
	/**
	 * 加載配置中心中服務資源信息
	 * @param serviceName
	 * @return
	 */
	public List<ServiceInfo> loadServiceResouces(String serviceName) {
		String servicePath = centerRootPath + "/"+serviceName+"/service";
		List<String> children = client.getChildren(servicePath);
		List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
		for(String ch : children) {
			try {
				String deCh = URLDecoder.decode(ch, "UTF-8");
				ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
				resources.add(r);
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
		return resources;
	}
	
	@Override
	public ServiceInfo getServiceInfo(String name) {
		List<ServiceInfo> list = loadServiceResouces(name);
		ServiceInfo info = list.get(0);
		list.forEach((e)->{
			if(e != info) {
				info.addAddress(e.getAddress().get(0));
			}
		});
		return info;
	}

}
複製代碼

④ 協議支持相關

這裏只實現了JSON的,經過fastJSON來實現

public class JSONMessageProtocol implements MessageProtocol {

	@Override
	public byte[] marshallingRequest(Request req) {
		Request temp = new Request();
		temp.setServiceName(req.getServiceName());
		temp.setMethod(req.getMethod());
		temp.setHeaders(req.getHeaders());
		temp.setPrameterTypes(req.getPrameterTypes());

		if (req.getParameters() != null) {
			Object[] params = req.getParameters();
			Object[] serizeParmas = new Object[params.length];
			for (int i = 0; i < params.length; i++) {
				serizeParmas[i] = JSON.toJSONString(params[i]);
			}

			temp.setParameters(serizeParmas);
		}

		return JSON.toJSONBytes(temp);
	}

	@Override
	public Request unmarshallingRequest(byte[] data) {
		Request req = JSON.parseObject(data, Request.class);
		if(req.getParameters() != null) {
			Object[] serizeParmas = req.getParameters();
			Object[] params = new Object[serizeParmas.length];
			for(int i = 0; i < serizeParmas.length; i++) {
				Object param = JSON.parseObject(serizeParmas[i].toString(), Object.class);
				params[i] = param;
			}
			req.setParameters(params);
		}
		return req;
	}

	@Override
	public byte[] marshallingResponse(Response rsp) {
		Response resp = new Response();
		resp.setHeaders(rsp.getHeaders());
		resp.setException(rsp.getException());
		resp.setReturnValue(rsp.getReturnValue());
		resp.setStatus(rsp.getStatus());
		return JSON.toJSONBytes(resp);
	}

	@Override
	public Response unmarshallingResponse(byte[] data) {
		return JSON.parseObject(data, Response.class);
	}

}
複製代碼

⑤ NetClient相關

分爲BIO和Netty兩種模式,netty中使用了EventLoopGroup

BIO:
public class BioNetClient implements NetClient {
	
	@Override
	public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
		List<String> addressList = sinfo.getAddress();
		int randNum = new Random().nextInt(addressList.size());
		String address = addressList.get(randNum);
		String[] addInfoArray = address.split(":");
		try {
			return startSend(data, addInfoArray[0], Integer.valueOf(addInfoArray[1]));
		} catch (Throwable e) {
			e.printStackTrace();
		}
		return null;
	}
	
	/**
	 * 經過網絡IO,打開遠端服務鏈接,將請求數據寫入網絡中,並得到響應結果。
	 * 
	 * @param requestData 將要發送的請求數據
	 * @param host 遠端服務域名或者ip地址
	 * @param port 遠端服務端口號
	 * @return 服務端響應結果
	 * @throws Throwable 拋出的異常
	 */
	private byte[] startSend(byte[] requestData, String host, int port) throws Throwable{
		// 打開遠端服務鏈接
		Socket serverSocket = new Socket(host, port);
		
		InputStream in = null;
		OutputStream out = null;
		
		try {
			// 1. 服務端輸出流,寫入請求數據,發送請求數據
			out = serverSocket.getOutputStream();
			out.write(requestData);
			out.flush();
			
			// 2. 服務端輸入流,獲取返回數據,轉換參數類型
			// 相似於反序列化的過程
			in = serverSocket.getInputStream();
			byte[] res = new byte[1024];
			int readLen = -1;
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			while((readLen = in.read(res)) > 0) {
				baos.write(res, 0, readLen);
			}
			return baos.toByteArray();
		}finally{
			try {	// 清理資源,關閉流
				if(in != null) in.close();
				if(out != null) out.close();
				if(serverSocket != null) serverSocket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
複製代碼
netty模式:
public class NettyNetClient implements NetClient {
	private SendHandler sendHandler;
	private Map<String, SendHandler> sendHandlerMap = new ConcurrentHashMap<String, SendHandler>();
	
	@Override
	public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
		try {
			List<String> addressList = sinfo.getAddress();
			int randNum = new Random().nextInt(addressList.size());
			String address = addressList.get(randNum);
			String[] addInfoArray = address.split(":");
			SendHandler handler = sendHandlerMap.get(address);
			if(handler == null) {
				sendHandler = new SendHandler(data);
				new Thread(()->{
					try {
						connect(addInfoArray[0], Integer.valueOf(addInfoArray[1]));
					} catch (NumberFormatException e) {
						e.printStackTrace();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}).start();
			}
			byte[] respData = (byte[]) sendHandler.rspData();
			return respData;
		} catch (NumberFormatException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	public void connect(String host, int port) throws Exception {
        // 配置客戶端
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //EchoClientHandler handler = new EchoClientHandler();
            
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(sendHandler);
                 }
             });

            // 啓動客戶端鏈接
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客戶端鏈接關閉
            f.channel().closeFuture().sync();
        } finally {
            // 釋放線程組資源
            group.shutdownGracefully();
        }
    }
}
複製代碼

⑥ 運行結果

能夠自行模擬一個消費者和一個生產者進行測試,這裏就不貼出來了

finally

以後會繼續dubbo的內容

下一篇:從零開始的高併發(九)--- dubbo的核心功能及協議

相關文章
相關標籤/搜索