本身動手實現RPC服務調用框架

引言

本文利用java自帶的socket編程實現了一個簡單的rpc調用框架,由兩個工程組成分別名爲battercake-provider(服務提供者)、battercake-consumer(服務調用者)。java

設計思路以下:
一、在battercake-provider中,寫一個服務叫BatterCakeService編程

二、在battercake-provider中,啓動RpcProvider,發佈該服務app

三、在battercake-consumer中,啓動測試類RpcTest框架

四、在battercake-consumer中,利用jdk動態代理,得到BatterCakeService的動態代理類BatterCakeService$Proxy0socket

五、在battercake-consumer中,動態代理類BatterCakeService$Proxy0,與battercake-provider創建socket鏈接,battercake-provider針對每個鏈接,都會啓動一個ServerThread處理請求,代理類則發送服務參數等相關信息ide

六、在battercake-consumer中,接收battercake-provider的ServerThread請求返回的結果。微服務

上述過程時序圖以下所示測試

image

接下來上代碼!!
this

服務提供者

本部分的工程爲battercake-provider,項目結構圖以下圖所示spa

image

先上使用的部分的代碼
先建立一個微服務,接口以下

package com.rjzheng.service;public interface BatterCakeService {	/**
	 * 賣煎餅的服務
	 * @param name
	 * @return
	 */
	public String sellBatterCake(String name);
}

實現類以下

package com.rjzheng.service.impl;import com.rjzheng.service.BatterCakeService;public class BatterCakeServiceImpl implements BatterCakeService {	@Override
	public String sellBatterCake(String name) {		// TODO Auto-generated method stub
		return name+"煎餅,賣的特別好";
	}

}

接下來就是發佈服務

package com.rjzheng.start;import com.rjzheng.rpc.RpcProvider;import com.rjzheng.service.BatterCakeService;import com.rjzheng.service.impl.BatterCakeServiceImpl;public class RpcBootStrap {	public static void main(String[] args) throws Exception {
		BatterCakeService batterCakeService =new BatterCakeServiceImpl();		//發佈賣煎餅的服務,註冊在20006端口
		RpcProvider.export(20006,batterCakeService);
	}
}

接下來是rpc框架調用部分的代碼,RpcProvider,該部分代碼能夠總結爲兩步

  1. 將須要發佈的服務存儲在一個內存變量serviceList中

  2. 啓動socket,server.accept()方法阻塞在那,監聽輸入

  3. 針對每個請求,單獨啓動一個線程處理

package com.rjzheng.rpc;import java.net.ServerSocket;import java.net.Socket;import java.util.ArrayList;import java.util.Arrays;import java.util.List;/**
 * RPC服務提供器
 * @author zhengrongjun
 *
 */public class RpcProvider {	
	//存儲註冊的服務列表
	private static List<Object> serviceList;	
	/**
	 * 發佈rpc服務
	 * @param object
	 * @param port
	 * @throws Exception
	 */
	public static void export(int port,Object... services) throws Exception {
		serviceList=Arrays.asList(services);
		ServerSocket server = new ServerSocket(port);
		Socket client = null;		while (true) {			//阻塞等待輸入
			client = server.accept();			//每個請求,啓動一個線程處理
			new Thread(new ServerThread(client,serviceList)).start();
		}
	}
}

接下來ServerThread線程處理類的代碼,ServerThread主要作如下幾個步驟

  1. 讀取客戶端發送的服務名

  2. 判斷服務是否發佈

  3. 若是發佈,則走反射邏輯,動態調用,返回結果

  4. 若是未發佈,則返回提示通知

package com.rjzheng.rpc;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.Socket;import java.util.List;public class ServerThread implements Runnable {	private Socket client = null;	private List<Object> serviceList = null;	public ServerThread(Socket client, List<Object> service) {		this.client = client;		this.serviceList = service;
	}	@Override
	public void run() {
		ObjectInputStream input = null;
		ObjectOutputStream output = null;		try {
			input = new ObjectInputStream(client.getInputStream());
			output = new ObjectOutputStream(client.getOutputStream());			// 讀取客戶端要訪問那個service
			Class serviceClass = (Class) input.readObject();			// 找到該服務類
			Object obj = findService(serviceClass);			if (obj == null) {
				output.writeObject(serviceClass.getName() + "服務未發現");
			} else {				//利用反射調用該方法,返回結果
				try {
					String methodName = input.readUTF();
					Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
					Object[] arguments = (Object[]) input.readObject();
					Method method = obj.getClass().getMethod(methodName, parameterTypes);  
                    Object result = method.invoke(obj, arguments);  
                    output.writeObject(result); 
				} catch (Throwable t) {
					output.writeObject(t);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {			try {
				client.close();
				input.close();
				output.close();
			} catch (IOException e) {				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}

	}	private Object findService(Class serviceClass) {		// TODO Auto-generated method stub
		for (Object obj : serviceList) {			boolean isFather = serviceClass.isAssignableFrom(obj.getClass());			if (isFather) {				return obj;
			}
		}		return null;
	}

}

服務消費者

本部分的工程爲battercake-consumer,項目結構圖以下圖所示
image

先上rpc框架調用部分的代碼RpcConsumer,步驟分兩步

  1. 封裝一個代理類處理器

  2. 返回service的代理類對象

package com.rjzheng.rpc;import java.lang.reflect.Proxy;public class RpcConsumer {	
	public static <T> T getService(Class<T> clazz,String ip,int port) {
		ProxyHandler proxyHandler =new ProxyHandler(ip,port);		return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
	}
}

接下來上代理類處理器的代碼,代理類處理步驟分如下幾步

  1. 創建socket鏈接

  2. 封裝請求數據,發送給服務提供者

  3. 返回結果

package com.rjzheng.rpc;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.net.Socket;import com.rjzheng.service.BatterCakeService;public class ProxyHandler implements InvocationHandler {	private String ip;	private int port;	public ProxyHandler(String ip, int port) {		// TODO Auto-generated constructor stub
		this.ip = ip;		this.port = port;
	}	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {		// TODO Auto-generated method stub
		Socket socket = new Socket(this.ip, this.port);
		ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
		ObjectInputStream input = new ObjectInputStream(socket.getInputStream());		try {
			output.writeObject(proxy.getClass().getInterfaces()[0]);
			output.writeUTF(method.getName());
			output.writeObject(method.getParameterTypes());
			output.writeObject(args);
			output.flush();
			Object result = input.readObject();			if(result instanceof Throwable) {				throw (Throwable) result;
			}				return result;
		} finally {
			socket.shutdownOutput();
		}
	}

}

接下來創建一個測試類RpcTest以下(跑該測試類前,記得運行在battercake-provider端的RpcBootstrap類發佈BatterCakeService服務)

package com.rjzheng.start;import com.rjzheng.rpc.RpcConsumer;import com.rjzheng.service.BatterCakeService;public class RpcTest {	public static void main(String[] args) {
		BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
		String result=batterCakeService.sellBatterCake("雙蛋");
		System.out.println(result);
	}
}

輸出結果以下

雙蛋煎餅,賣的特別好

至此,咱們就實現了一個簡易的rpc服務調用框架

相關文章
相關標籤/搜索