上一篇咱們簡單過了一遍RPC是什麼,三個過程,爲何咱們須要它,它的特性和適用場景,RPC的流程及協議定義還有它的框架的一些小知識。理論常常會看的人昏昏欲睡,不知所云。若是可以結合一些代碼來講明的話,那就方便理解不少了spring
從零開始的高併發(一)--- Zookeeper的基礎概念緩存
從零開始的高併發(二)--- Zookeeper實現分佈式鎖服務器
從零開始的高併發(三)--- Zookeeper集羣的搭建和leader選舉網絡
從零開始的高併發(四)--- Zookeeper的分佈式隊列併發
從零開始的高併發(五)--- Zookeeper的配置中心應用app
從零開始的高併發(六)--- Zookeeper的Master選舉及官網小覽框架
從零開始的高併發(七)--- RPC的介紹,協議及框架dom
其實這個在上一篇的2 - ① 也已經提到過了,若是忘了,不要緊,我再複製過來socket
stub:分佈式計算中的存根是一段代碼,它轉換在遠程過程調用期間Client和server之間傳遞的參數分佈式
1.客戶端處理過程當中調用client stub(就像調用本地方法同樣),傳入參數
2.Client stub將參數編組爲消息,而後經過系統調用向服務端發送消息
3.客戶端本地操做系統將消息從客戶端機器發送到服務端機器
4.服務端操做系統將接收到的數據包傳遞給client stub
5.server stub解組消息爲參數
6.server stub再調用服務端的過程,過程執行結果以反方向的相同步驟響應給客戶端
1.定義過程接口
2.服務端實現接口的整個過程
3.客戶端使用生成的stub代理對象
複製代碼
客戶端生成過程接口的代理對象,經過設計一個客戶端代理工廠,使用JDK動態代理便可生成接口的代理對象
Student類有三個屬性name(String),age(int),sex(String),節省篇幅就不貼代碼了,提供getter,setter和toString方法便可
public interface StudentService {
/**
* 獲取信息
* @return
*/
public Student getInfo();
//打印student的信息並返回一個boolean值
public boolean printInfo(Student student);
}
複製代碼
而且提供一個簡單的實現,其實就是打印一個Student的信息出來而已
@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {
public Student getInfo() {
Student person = new Student();
person.setAge(25);
person.setName("說出你的願望吧~");
person.setSex("男");
return person;
}
public boolean printInfo(Student person) {
if (person != null) {
System.out.println(person);
return true;
}
return false;
}
public static void main(String[] args) {
new Thread(()->{
System.out.println("111");
}).start();;
}
}
複製代碼
首先,客戶端經過咱們的本地代理,得到咱們的StudentService的代理類,此時咱們客戶端本地是確定不存在StudentService的實現的,此時尋址咱們是直接給出來了
public class ClientTest {
@Test
public void test() {
// 本地沒有接口實現,經過代理得到接口實現實例
RpcClientProxy proxy = new RpcClientProxy("192.168.80.1", 9998);
StudentService service = proxy.getProxy(StudentService.class);
System.out.println(service.getInfo());
Student student = new Student();
student.setAge(23);
student.setName("hashmap");
student.setSex("男");
System.out.println(service.printInfo(student));
}
}
複製代碼
此時咱們的關注點轉到客戶端是如何幫咱們進行代理的
/**
* RpcClientProxy
* 客戶端代理服務,客戶端往服務端發起的調用將經過客戶端代理來發起
*/
public class RpcClientProxy implements InvocationHandler{
private String host; // 服務端地址
private int port; // 服務端口號
public RpcClientProxy(String host, int port){
this.host = host;
this.port = port;
}
/**
* 生成業務接口的代理對象,代理對象作的事情,在invoke方法中。
* @param clazz 代理類型(接口)
* @return
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz){
// clazz 不是接口不能使用JDK動態代理
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
}
/**
* 動態代理作的事情,接口的實現不在本地,在網絡中的其餘進程中,咱們經過實現了Rpc客戶端的對象來發起遠程服務的調用。
*/
public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
// 調用前
System.out.println("執行遠程方法前,能夠作些事情");
// 調用遠程服務,須要封裝參數,相似於序列化的過程
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setParams(params);
// 連接服務器調用服務
RpcClient client = new RpcClient();
Object rst = client.start(request, host, port);
// 調用後
System.out.println("執行遠程方法後,也能夠作些事情");
return rst;
}
}
複製代碼
JDK提供了Proxy類來實現咱們的動態代理,能夠經過newProxyInstance(ClassLoader var0, Class<?>[] var1, InvocationHandler var2)方法來實例化一個代理對象,此時咱們傳入的參數clazz是規定必須爲一個接口的,若是不是接口就不能使用JDK動態代理
而第三個參數RpcClientProxy.this則是newProxyInstance()方法雖然幫咱們建立好了實例,可是建立實例完成後的具體動做必須由這個InvocationHandler來提供
InvocationHandler這個接口裏面僅僅只有一個 Object invoke(Object var1, Method var2, Object[] var3) throws Throwable,這個方法的參數相信不難理解,第一個是代理對象,第二個是執行的方法,第三個是所需的參數集
回到咱們剛剛的代碼,在我執行System.out.println(service.getInfo())這條語句的時候,咱們的邏輯就會跳到invoke()的實現中來,在invoke()方法的註釋中也把過程很詳細的說明了,首先咱們須要調用遠程服務了,進行一個參數的封裝,以後就進行一個網絡鏈接把這些參數發送給咱們的服務端,此時咱們須要用到RpcClient了
在start()方法中,咱們的RpcRequest request是實現了Serializable接口的,因此此時封裝好的數據會轉換成一個二進制而後被flush()過去,此時咱們消息已經發送了,須要等待服務端的響應,響應咱們就須要經過咱們的服務端ObjectOutputStream來接收一個輸入流
/**
* RpcClient
* Rpc客戶端,表明業務代碼做爲客戶端,往遠端服務發起請求。
*/
public class RpcClient {
/**
* 經過網絡IO,打開遠端服務鏈接,將請求數據寫入網絡中,並得到響應結果。
*
* @param request 將要發送的請求數據
* @param host 遠端服務域名或者ip地址
* @param port 遠端服務端口號
* @return 服務端響應結果
* @throws Throwable 拋出的異常
*/
public Object start(RpcRequest request, String host, int port) throws Throwable{
// 打開遠端服務鏈接
Socket server = new Socket(host, port);
ObjectInputStream oin = null;
ObjectOutputStream oout = null;
try {
// 1. 服務端輸出流,寫入請求數據,發送請求數據
oout = new ObjectOutputStream(server.getOutputStream());
oout.writeObject(request);
oout.flush();
// 2. 服務端輸入流,獲取返回數據,轉換參數類型
// 相似於反序列化的過程
oin = new ObjectInputStream(server.getInputStream());
Object res = oin.readObject();
RpcResponse response = null;
if(!(res instanceof RpcResponse)){
throw new InvalidClassException("返回參數不正確,應當爲:"+RpcResponse.class+" 類型");
}else{
response = (RpcResponse) res;
}
// 3. 返回服務端響應結果
if(response.getError() != null){ // 服務器產生異常
throw response.getError();
}
return response.getResult();
}finally{
try { // 清理資源,關閉流
if(oin != null) oin.close();
if(oout != null) oout.close();
if(server != null) server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
複製代碼
/**
* RpcRequest
* Rpc請求對象,請求遠端服務服務的內容,在網絡上進行傳輸。
*/
public class RpcRequest implements Serializable{
// 須要請求的類名
private String className;
// 需求請求的方法名
private String methodName;
// 請求方法的參數類型
private Class<?>[] paramTypes;
// 請求的參數值
private Object[] params;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
}
複製代碼
同時也是實現了JDK默認的序列化Serializable
/**
* RpcResponse
* Rpc服務端響應結果包裝類,在網絡上進行傳輸。
*/
public class RpcResponse implements Serializable {
// 可能拋出的異常
private Throwable error;
// 響應的內容或結果
private Object result;
public Throwable getError() {
return error;
}
public void setError(Throwable error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
複製代碼
public class ServerTest {
@Test
public void startServer() {
RpcServer server = new RpcServer();
server.start(9998, "rpc.simple.RpcServer");
}
public static void main(String[] args) {
}
}
複製代碼
給到一個端口號,參數中帶有一個包,功能是掃描某個包下的服務
建立一個Map類型的集合services存放掃描到提供rpc服務的類,此時由於沒有放在註冊中心上因此就不存在尋址了。後面將會把它放入zookeeper的註冊中心
getService()下,咱們在ServerTest不是提供了一個包名嗎,此時咱們先去找到了它們全部的classes(請參考getClasses()方法),getClasses()中咱們其實主要是先根據提供的包名往下找,要是目錄都有問題的話就拋出異常,若是沒問題,就開始遍歷此目錄下的全部文件,遍歷出來的結果若是發現這個文件是class文件,就把其實例化,而且進行判斷是否存在一個自定義註解@service,標註了這個註解的類就是RPC服務的實現類。若是存在這個註解,那就是咱們須要找的rpc服務,就把它裝到一個結果集classes中,若是目錄下面仍然是目錄,那就本身調用本身,直到看到class文件爲止
當咱們把全部的class都找到了,回到getService()方法下,就都集中放於一個classList中,而後把它們Map化,就是把接口的名稱做爲key,把實例做爲value(services.put(cla.getAnnotation(Service.class).value().getName(), obj))。
最後再回到start(),進行完服務掃描以後還會有一個RpcServerHandler來進行處理
/**
* RpcServer
* Rpc服務提供者
*/
public class RpcServer {
/**
* 啓動指定的網絡端口號服務,並監聽端口上的請求數據。得到請求數據之後將請求信息委派給服務處理器,放入線程池中執行。
* @param port 監聽端口
* @param clazz 服務類所在包名,多個用英文逗號隔開
*/
public void start(int port, String clazz) {
ServerSocket server = null;
try {
// 1. 建立服務端指定端口的socket鏈接
server = new ServerSocket(port);
// 2. 獲取全部rpc服務類
Map<String, Object> services = getService(clazz);
// 3. 建立線程池
Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
while(true){
// 4. 獲取客戶端鏈接
Socket client = server.accept();
// 5. 放入線程池中執行
RpcServerHandler service = new RpcServerHandler(client, services);
executor.execute(service);
}
} catch (IOException e) {
e.printStackTrace();
}finally{
//關閉監聽
if(server != null)
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 實例化全部rpc服務類,也可用於暴露服務信息到註冊中心。
* @param clazz 服務類所在包名,多個用英文逗號隔開
* @return
*/
public Map<String,Object> getService(String clazz){
try {
Map<String, Object> services = new HashMap<String, Object>();
// 獲取全部服務類
String[] clazzes = clazz.split(",");
List<Class<?>> classes = new ArrayList<Class<?>>();
for(String cl : clazzes){
List<Class<?>> classList = getClasses(cl);
classes.addAll(classList);
}
// 循環實例化
for(Class<?> cla:classes){
Object obj = cla.newInstance();
services.put(cla.getAnnotation(Service.class).value().getName(), obj);
}
return services;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取包下全部有@Sercive註解的類
* @param pckgname
* @return
* @throws ClassNotFoundException
*/
public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
// 須要查找的結果
List<Class<?>> classes = new ArrayList<Class<?>>();
// 找到指定的包目錄
File directory = null;
try {
ClassLoader cld = Thread.currentThread().getContextClassLoader();
if (cld == null)
throw new ClassNotFoundException("沒法獲取到ClassLoader");
String path = pckgname.replace('.', '/');
URL resource = cld.getResource(path);
if (resource == null)
throw new ClassNotFoundException("沒有這樣的資源:" + path);
directory = new File(resource.getFile());
} catch (NullPointerException x) {
throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一個有效的資源");
}
if (directory.exists()) {
// 獲取包目錄下的全部文件
String[] files = directory.list();
File[] fileList = directory.listFiles();
// 獲取包目錄下的全部文件
for (int i = 0; fileList != null && i < fileList.length; i++) {
File file = fileList[i];
//判斷是不是Class文件
if (file.isFile() && file.getName().endsWith(".class")) {
Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
if(clazz.getAnnotation(Service.class) != null){
classes.add(clazz);
}
}else if(file.isDirectory()){ //若是是目錄,遞歸查找
List<Class<?>> result = getClasses(pckgname+"."+file.getName());
if(result != null && result.size() != 0){
classes.addAll(result);
}
}
}
} else{
throw new ClassNotFoundException(pckgname + "不是一個有效的包名");
}
return classes;
}
}
複製代碼
和剛剛的RpcClient很是相似,都是序列化和反序列化的過程,主要是第三步中得到了實例和方法及其參數後,再調用invoke()方法而後把結果放入response的過程
/**
* RpcServerHandler
* 服務端請求處理,處理來自網絡IO的服務請求,並響應結果給網絡IO。
*/
public class RpcServerHandler implements Runnable {
// 客戶端網絡請求socket,能夠從中得到網絡請求信息
private Socket clientSocket;
// 服務端提供處理請求的類集合
private Map<String, Object> serviceMap;
/**
* @param client 客戶端socket
* @param services 全部服務
*/
public RpcServerHandler(Socket client, Map<String, Object> services) {
this.clientSocket = client;
this.serviceMap = services;
}
/**
* 讀取網絡中客戶端請求的信息,找到請求的方法,執行本地方法得到結果,寫入網絡IO輸出中。
*
*/
public void run() {
ObjectInputStream oin = null;
ObjectOutputStream oout = null;
RpcResponse response = new RpcResponse();
try {
// 1. 獲取流以待操做
oin = new ObjectInputStream(clientSocket.getInputStream());
oout = new ObjectOutputStream(clientSocket.getOutputStream());
// 2. 從網絡IO輸入流中請求數據,強轉參數類型
Object param = oin.readObject();
RpcRequest request = null;
if(!(param instanceof RpcRequest)){
response.setError(new Exception("參數錯誤"));
oout.writeObject(response);
oout.flush();
return;
}else{
// 反序列化RpcRequest
request = (RpcRequest) param;
}
// 3. 查找並執行服務方法
Object service = serviceMap.get(request.getClassName());
Class<?> clazz= service.getClass();
Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
Object result = method.invoke(service, request.getParams());
// 4. 返回RPC響應,序列化RpcResponse
response.setResult(result);
// 序列化結果
oout.writeObject(response);
oout.flush();
return;
} catch (Exception e) {
try { //異常處理
if(oout != null){
response.setError(e);
oout.writeObject(response);
oout.flush();
}
} catch (Exception e1) {
e1.printStackTrace();
}
return;
}finally{
try { // 回收資源,關閉流
if(oin != null) oin.close();
if(oout != null) oout.close();
if(clientSocket != null) clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
複製代碼
先開啓ServerTest再開啓ClientTest,簡單快捷,注意別去右鍵跑main方法便可
設計客戶端的時候,在ClientStubInvocationHandler中須要完成的兩件事爲編組消息和發送網絡請求,而將請求的內容編組爲消息這件事就交由客戶端的stub代理,它除了消息協議和網絡層的事務之外,可能還存在一個服務信息發現,此外消息協議可能也是會存在變化的,咱們也須要去支持多種協議,這個實際上是和框架對協議的支持廣度有關的。好比dubbo相對於spring cloud而言對協議的支持就相對靈活一些
此時咱們須要得知某服務用的是什麼協議,因此咱們須要引入一個服務發現者
咱們想要作到支持多種協議,類該如何設計(面向接口,策略模式,組合)
此時咱們的協議須要抽象出來,對於協議的內容須要進行編組和解組,好比咱們上面提供的JSON和HTTP兩種不一樣的實現,而此時客戶端的存根裏面就不只僅只是須要服務發現者,還須要咱們對於這個協議的支持
主要看regist()方法,咱們在註冊的時候把服務信息進行了拼接,並建立成臨時節點,父節點爲持久節點。servicePath是相似於dubbo的一個目錄結構,一個根目錄/rpc+服務名稱serviceName+service,獲取服務的方法loadServiceResouces()也不難,根據這些地址獲取它們下面的子節點,把全部的url加載出來給到調用者
public class RegistCenter {
ZkClient client = new ZkClient("localhost:2181");
private String centerRootPath = "/rpc";
public RegistCenter() {
client.setZkSerializer(new MyZkSerializer());
}
public void regist(ServiceResource serviceResource) {
String serviceName = serviceResource.getServiceName();
String uri = JsonMapper.toJsonString(serviceResource);
try {
uri = URLEncoder.encode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String servicePath = centerRootPath + "/"+serviceName+"/service";
if(! client.exists(servicePath)) {
client.createPersistent(servicePath, true);
}
String uriPath = servicePath+"/"+uri;
client.createEphemeral(uriPath);
}
/**
* 加載配置中心中服務資源信息
* @param serviceName
* @return
*/
public List<ServiceResource> loadServiceResouces(String serviceName) {
String servicePath = centerRootPath + "/"+serviceName+"/service";
List<String> children = client.getChildren(servicePath);
List<ServiceResource> resources = new ArrayList<ServiceResource>();
for(String ch : children) {
try {
String deCh = URLDecoder.decode(ch, "UTF-8");
ServiceResource r = JsonMapper.fromJsonString(deCh, ServiceResource.class);
resources.add(r);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return resources;
}
private void sub(String serviceName, ChangeHandler handler) {
/*
String path = centerRootPath + "/"+serviceName+"/service";
client.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
handler();
}
});
client.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
handler();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
handler();
}
});
*/
}
interface ChangeHandler {
/**
* 發生變化後給一個完整的屬性對象
* @param resource
*/
void itemChange(ServiceResource resource);
}
}
複製代碼
/**
* ClientStubProxyFactory
* 客戶端存根代理工廠
*/
public class ClientStubProxyFactory {
private ServiceInfoDiscoverer sid;
private Map<String, MessageProtocol> supportMessageProtocols;
private NetClient netClient;
private Map<Class<?>, Object> objectCache = new HashMap<>();
/**
*
*
* @param <T>
* @param interf
* @return
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> interf) {
T obj = (T) this.objectCache.get(interf);
if (obj == null) {
obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
new ClientStubInvocationHandler(interf));
this.objectCache.put(interf, obj);
}
return obj;
}
public ServiceInfoDiscoverer getSid() {
return sid;
}
public void setSid(ServiceInfoDiscoverer sid) {
this.sid = sid;
}
public Map<String, MessageProtocol> getSupportMessageProtocols() {
return supportMessageProtocols;
}
public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
this.supportMessageProtocols = supportMessageProtocols;
}
public NetClient getNetClient() {
return netClient;
}
public void setNetClient(NetClient netClient) {
this.netClient = netClient;
}
/**
* ClientStubInvocationHandler
* 客戶端存根代理調用實現
* @date 2019年4月12日 下午2:38:30
*/
private class ClientStubInvocationHandler implements InvocationHandler {
private Class<?> interf;
public ClientStubInvocationHandler(Class<?> interf) {
super();
this.interf = interf;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 一、得到服務信息
String serviceName = this.interf.getName();
ServiceInfo sinfo = sid.getServiceInfo(serviceName);
if (sinfo == null) {
throw new Exception("遠程服務不存在!");
}
// 二、構造request對象
Request req = new Request();
req.setServiceName(sinfo.getName());
req.setMethod(method.getName());
req.setPrameterTypes(method.getParameterTypes());
req.setParameters(args);
// 三、協議層編組
// 得到該方法對應的協議
MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
// 編組請求
byte[] data = protocol.marshallingRequest(req);
// 四、調用網絡層發送請求
byte[] repData = netClient.sendRequest(data, sinfo);
// 5解組響應消息
Response rsp = protocol.unmarshallingResponse(repData);
// 六、結果處理
if (rsp.getException() != null) {
throw rsp.getException();
}
return rsp.getReturnValue();
}
}
}
複製代碼
ClientStub中有兩個引用,一個是服務發現接口ServiceInfoDiscoverer,做用爲根據服務名得到遠程服務信息,提供一個ServiceInfo getServiceInfo(String name)方法,還有就是對於不一樣協議的支持supportMessageProtocols,MessageProtocol咱們也是定義了一個接口,這個接口就須要比較詳細了,編碼成二級制,和解碼成Request等,對於response也是一樣這麼個過程
/**
* 通訊協議接口
* MessageProtocol
*/
public interface MessageProtocol {
/**
* 編組請求消息
* @param req
* @return
*/
byte[] marshallingRequest(Request req);
/**
* 解編組請求消息
* @param data
* @return
*/
Request unmarshallingRequest(byte[] data);
/**
* 編組響應消息
* @param rsp
* @return
*/
byte[] marshallingResponse(Response rsp);
/**
* 解編組響應消息
* @param data
* @return
*/
Response unmarshallingResponse(byte[] data);
}
複製代碼
此時又存在一些問題,單純依靠編組和解組的方法是不夠的,編組和解組的操做對象是請求,響應,可是它們的內容是不一樣的,此時咱們又須要定義框架標準的請求響應類
request有具體的服務名,服務方法,消息頭,參數類型和參數,一樣的response也有狀態(經過枚舉),消息頭,返回值及類型以及是否存在異常。
此時協議層擴展爲4個方法
將消息協議獨立爲一層,客戶端和服務端都須要使用
網絡層的工做主要是發送請求和得到響應,此時咱們若是須要發起網絡請求一定先要知道服務地址,此時咱們利用下圖中serviceInfo對象做爲必須依賴,setRequest()方法裏面會存在發送數據,還有發送給誰,此時給出了BIO和Netty兩種實現
因此咱們須要的三個依賴就都出來了,一個是服務發現者,一個是協議支持,再而後就是咱們網絡層的NetClient
紫色表明客戶端代理部分,淺綠色屬於服務發現,淺藍色屬於協議部分
由於這些代碼和主要的思路已經沒有瓜葛了,只是一些功能代碼,因此能夠直接忽略了。若是實在是想本身跑一下,也能夠問我要一個小樣。
能夠和內容二的RpcClientProxy作一個對比,在原有的基礎上加上了三個依賴ServiceInfoDiscoverer,supportMessageProtocols,netClient
在ClientStubProxyFactory中對Object作了一個緩存,若是已經存在這個緩存就直接返回,沒有的話加入到緩存中而後new出來,只是一個小小的不一樣。
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 一、得到服務信息
String serviceName = this.interf.getName();
ServiceInfo sinfo = sid.getServiceInfo(serviceName);
if (sinfo == null) {
throw new Exception("遠程服務不存在!");
}
// 二、構造request對象
Request req = new Request();
req.setServiceName(sinfo.getName());
req.setMethod(method.getName());
req.setPrameterTypes(method.getParameterTypes());
req.setParameters(args);
// 三、協議層編組
// 得到該方法對應的協議
MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
// 編組請求
byte[] data = protocol.marshallingRequest(req);
// 四、調用網絡層發送請求
byte[] repData = netClient.sendRequest(data, sinfo);
// 五、解組響應消息
Response rsp = protocol.unmarshallingResponse(repData);
// 六、結果處理
if (rsp.getException() != null) {
throw rsp.getException();
}
return rsp.getReturnValue();
}
複製代碼
首先是服務發現,在咱們執行 ① 中提到的getProxy()方法時,此時代理的接口已經直接告訴咱們了,因此咱們就直接得到了接口信息interf,而後調用getName()方法獲取接口的名稱,經過接口名,調用服務發現者ServiceInfo提供的getServiceInfo()方法就能獲取服務的具體信息,而後放入請求參數request裏面,接下來給request的各個屬性賦值
以後咱們就開始尋找這個服務所對應的協議,得到協議以後能夠獲取協議支持對象,以後進行編組請求,轉換成二進制,經過netClient發送過去,順帶連同服務端信息給出去。獲取結果repData進行解組(二進制回到response),以後進行結果處理。
以前也提到了,服務發現者ServiceInfoDiscoverer是做爲一個接口提供了getServiceInfo()方法的
有兩種不一樣的實現,本地實現咱們能夠本身搞一個配置文件加載進來,把相關的服務信息弄進去得了
zookeeper的服務發現實現以下,相似於咱們一開始在2 - ① 中補充的zookeeper的內容
public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
ZkClient client = new ZkClient("localhost:2181");
private String centerRootPath = "/rpc";
public ZookeeperServiceInfoDiscoverer() {
client.setZkSerializer(new MyZkSerializer());
}
public void regist(ServiceInfo serviceResource) {
String serviceName = serviceResource.getName();
String uri = JSON.toJSONString(serviceResource);
try {
uri = URLEncoder.encode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String servicePath = centerRootPath + "/"+serviceName+"/service";
if(! client.exists(servicePath)) {
client.createPersistent(servicePath, true);
}
String uriPath = servicePath+"/"+uri;
client.createEphemeral(uriPath);
}
/**
* 加載配置中心中服務資源信息
* @param serviceName
* @return
*/
public List<ServiceInfo> loadServiceResouces(String serviceName) {
String servicePath = centerRootPath + "/"+serviceName+"/service";
List<String> children = client.getChildren(servicePath);
List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
for(String ch : children) {
try {
String deCh = URLDecoder.decode(ch, "UTF-8");
ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
resources.add(r);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return resources;
}
@Override
public ServiceInfo getServiceInfo(String name) {
List<ServiceInfo> list = loadServiceResouces(name);
ServiceInfo info = list.get(0);
list.forEach((e)->{
if(e != info) {
info.addAddress(e.getAddress().get(0));
}
});
return info;
}
}
複製代碼
這裏只實現了JSON的,經過fastJSON來實現
public class JSONMessageProtocol implements MessageProtocol {
@Override
public byte[] marshallingRequest(Request req) {
Request temp = new Request();
temp.setServiceName(req.getServiceName());
temp.setMethod(req.getMethod());
temp.setHeaders(req.getHeaders());
temp.setPrameterTypes(req.getPrameterTypes());
if (req.getParameters() != null) {
Object[] params = req.getParameters();
Object[] serizeParmas = new Object[params.length];
for (int i = 0; i < params.length; i++) {
serizeParmas[i] = JSON.toJSONString(params[i]);
}
temp.setParameters(serizeParmas);
}
return JSON.toJSONBytes(temp);
}
@Override
public Request unmarshallingRequest(byte[] data) {
Request req = JSON.parseObject(data, Request.class);
if(req.getParameters() != null) {
Object[] serizeParmas = req.getParameters();
Object[] params = new Object[serizeParmas.length];
for(int i = 0; i < serizeParmas.length; i++) {
Object param = JSON.parseObject(serizeParmas[i].toString(), Object.class);
params[i] = param;
}
req.setParameters(params);
}
return req;
}
@Override
public byte[] marshallingResponse(Response rsp) {
Response resp = new Response();
resp.setHeaders(rsp.getHeaders());
resp.setException(rsp.getException());
resp.setReturnValue(rsp.getReturnValue());
resp.setStatus(rsp.getStatus());
return JSON.toJSONBytes(resp);
}
@Override
public Response unmarshallingResponse(byte[] data) {
return JSON.parseObject(data, Response.class);
}
}
複製代碼
public class BioNetClient implements NetClient {
@Override
public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
List<String> addressList = sinfo.getAddress();
int randNum = new Random().nextInt(addressList.size());
String address = addressList.get(randNum);
String[] addInfoArray = address.split(":");
try {
return startSend(data, addInfoArray[0], Integer.valueOf(addInfoArray[1]));
} catch (Throwable e) {
e.printStackTrace();
}
return null;
}
/**
* 經過網絡IO,打開遠端服務鏈接,將請求數據寫入網絡中,並得到響應結果。
*
* @param requestData 將要發送的請求數據
* @param host 遠端服務域名或者ip地址
* @param port 遠端服務端口號
* @return 服務端響應結果
* @throws Throwable 拋出的異常
*/
private byte[] startSend(byte[] requestData, String host, int port) throws Throwable{
// 打開遠端服務鏈接
Socket serverSocket = new Socket(host, port);
InputStream in = null;
OutputStream out = null;
try {
// 1. 服務端輸出流,寫入請求數據,發送請求數據
out = serverSocket.getOutputStream();
out.write(requestData);
out.flush();
// 2. 服務端輸入流,獲取返回數據,轉換參數類型
// 相似於反序列化的過程
in = serverSocket.getInputStream();
byte[] res = new byte[1024];
int readLen = -1;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while((readLen = in.read(res)) > 0) {
baos.write(res, 0, readLen);
}
return baos.toByteArray();
}finally{
try { // 清理資源,關閉流
if(in != null) in.close();
if(out != null) out.close();
if(serverSocket != null) serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
複製代碼
public class NettyNetClient implements NetClient {
private SendHandler sendHandler;
private Map<String, SendHandler> sendHandlerMap = new ConcurrentHashMap<String, SendHandler>();
@Override
public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
try {
List<String> addressList = sinfo.getAddress();
int randNum = new Random().nextInt(addressList.size());
String address = addressList.get(randNum);
String[] addInfoArray = address.split(":");
SendHandler handler = sendHandlerMap.get(address);
if(handler == null) {
sendHandler = new SendHandler(data);
new Thread(()->{
try {
connect(addInfoArray[0], Integer.valueOf(addInfoArray[1]));
} catch (NumberFormatException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
byte[] respData = (byte[]) sendHandler.rspData();
return respData;
} catch (NumberFormatException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public void connect(String host, int port) throws Exception {
// 配置客戶端
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//EchoClientHandler handler = new EchoClientHandler();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(sendHandler);
}
});
// 啓動客戶端鏈接
ChannelFuture f = b.connect(host, port).sync();
// 等待客戶端鏈接關閉
f.channel().closeFuture().sync();
} finally {
// 釋放線程組資源
group.shutdownGracefully();
}
}
}
複製代碼
能夠自行模擬一個消費者和一個生產者進行測試,這裏就不貼出來了
以後會繼續dubbo的內容
下一篇:從零開始的高併發(九)--- dubbo的核心功能及協議