分佈式架構的基石.簡單的 RPC 框架實現(JAVA)

  前言java

  RPC 的全稱是 Remote Procedure Call,它是一種進程間通訊方式。容許像調用本地服務同樣調用遠程服務。架構

  學習來源:《分佈式系統架構:原理與實踐》 - 李林鋒框架

 

  1.RPC 框架原理socket

  RPC 框架的目標就是讓遠程過程(服務)調用更加簡單、透明,RPC框架負責屏蔽底層的傳輸方式TCP 或者 UDP)、序列化方式XML、JSON、二進制)和通訊細節。分佈式

  框架使用者只須要了解誰在什麼位置,提供了什麼樣的遠程服務接口便可,開發者不須要關心底層通訊細節和調用過程。ide

 

  2.最簡單的 RPC 框架實現post

·  下面經過 JAVA 原生的序列化TCP Socket通訊、動態代理反射機制,實現最簡單的 RPC 框架。它由三部分組成:學習

  • 服務提供者:它運行在服務端,負責提供服務接口定義和服務實現類。(EchoServiceEchoServiceImpl
  • 服務發佈者,它運行在 RPC 服務端,負責將本地服務發佈成遠程服務,供其餘消費者調用。(RPCExporter
  • 本地服務代理,它運行在 RPC 客戶端,經過代理調用遠程服務提供者,而後將結果進行封裝返回給本地消費者。(RPCImporter

 

  下面看具體代碼,首先是服務端接口定義和服務實現類。測試

  代碼清單 :EchoService this

package com.rpc.test;

/**
 * @Description - 調用接口
 * @Author zww
 * @Date 2018/12/10 17:29
 */
public interface EchoService {
    String echo(String ping);
}

  代碼清單:EchoServiceImpl

package com.rpc.test;

/**
 * @Description - 調用接口實現
 * @Author zww
 * @Date 2018/12/10 17:30
 */
public class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String ping) {
        return ping != null ? ping + "挺不錯的。" : "挺不錯的。";
    }
}

  

  RPC 服務端發佈者代碼實現以下:

  代碼清單:RPCExporter

package com.rpc.test;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @Description - 服務端發佈者(提供服務)
 * @Author zww
 * @Date 2018/12/10 17:33
 */
public class RPCExporter {
    //線程池
    static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void exporter(String hostName, int port) throws Exception {
        ServerSocket server = new ServerSocket(); //店家
        server.bind(new InetSocketAddress(hostName, port)); //開店地址
        try {
            while (true) { //開啓營業模式
                executor.execute(new ExporterTask(server.accept())); //accept : 來客人了
            }
        } finally {
            server.close();
        }
    }

    //根據約定規則解析請求,返回結果
    private static class ExporterTask implements Runnable {
        Socket client = null; //客戶
        public ExporterTask(Socket client) {
            this.client = client;
        }

        @Override
        public void run() { 
            ObjectInputStream inputStream = null;
            ObjectOutputStream outputStream = null;
            try {
                System.out.println("老闆娘:誒,來咯!您要點什麼!");
                inputStream = new ObjectInputStream(client.getInputStream()); //接收請求
                System.out.println("老闆娘:要個回鍋肉!");
                String interfaceName = inputStream.readUTF(); Class<?> service = Class.forName(interfaceName);
                System.out.println("老闆娘:微辣!");
                String methodName = inputStream.readUTF();
                System.out.println("老闆娘:少油!");
                Class<?>[] parameterType = (Class<?>[])inputStream.readObject();
                System.out.println("老闆娘:別放香菜!");
                Object[] arguments = (Object[])inputStream.readObject();
                System.out.println("老闆娘:作菜快點!");
                Method method = service.getMethod(methodName, parameterType); Object result = method.invoke(service.newInstance(), arguments);
                System.out.println("老闆娘:老頭子,聽清沒!");
                System.out.println("老闆悶頭作菜中!!!!");
                System.out.println("老闆娘:帥哥,你的菜好了!");
               outputStream = new ObjectOutputStream(client.getOutputStream()); //返回結果 outputStream.writeObject(result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (outputStream != null) {
                    try {
                        outputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

  服務發佈者的主要職責

  • 監聽客戶端的 TCP 鏈接,接收到新的客戶端鏈接以後,將其封裝成 Task,由線程池執行。
  • 將客戶端發送的碼流反序列化成對象,反射調用服務實現者,獲取執行結果。
  • 將執行結果反序列化,經過 Socket 發送給客戶端。
  • 遠程服務調用完成後,釋放 Socket 等鏈接資源,防止句柄泄露。

 

  RPC 客戶端本地服務代理代碼:

  代碼清單:RPCImporter

package com.rpc.test;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * @Description -  路人(請求服務)
 * @Author zww
 * @Date 2018/12/11 10:31
 */
public class RPCImporter<S> {

    public S importer(final Class<?> serviceClass, final InetSocketAddress address) {
        //啓用遠端代理
        return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{ serviceClass.getInterfaces()[0] }, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = null;
                ObjectOutputStream outputStream = null;
                ObjectInputStream inputStream = null;
                try {
                    socket = new Socket();
                    socket.connect(address);
                    //使用 TCP 方式請求遠端方法,如下爲約定的傳輸方式
                    System.out.println("路人:老闆娘,點菜咯!");
 outputStream = new ObjectOutputStream(socket.getOutputStream()); //發送請求                     System.out.println("路人:要個回鍋肉");
 outputStream.writeUTF(serviceClass.getName());                     System.out.println("路人:微辣!");
 outputStream.writeUTF(method.getName());                     System.out.println("路人:少油!");
 outputStream.writeObject(method.getParameterTypes());                     System.out.println("路人:別放香菜!");
 outputStream.writeObject(args);                     System.out.println("路人:上菜快點!");
 inputStream = new ObjectInputStream(socket.getInputStream()); //獲取結果                     System.out.println("路人:吧唧吧唧!");
                    return inputStream.readObject();                 } finally {
                    if (socket != null) socket.close();
                }
            }
        });
    }

}

  本地服務代理的主要功能以下:

  • 將本地的接口調用轉換成 JDK 的動態代理,在動態代理中實現接口的遠程調用。
  • 建立 Socket 客戶端,根據指定地址連接遠程服務提供者。
  • 將遠程服務調用所需的接口類、方法名、參數列表、返回參數 等編碼後發送給服務提供者。
  • 同步阻塞服務端返回應答,獲取應答以後返回。

 

  最後:編寫測試代碼,並看看執行結果

package com.rpc.test;

import java.net.InetSocketAddress;

/**
 * 測試類
 */
public class TestApplication {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //啓用服務提供端(設置 地址端口)
                    RPCExporter.exporter("localhost", 8080);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //發起服務請求
        RPCImporter<EchoService> importer = new RPCImporter<>(); //使用遠端代理(訪問 地址端口)
        EchoService echo = importer.importer(EchoServiceImpl.class, new InetSocketAddress("localhost", 8080)); System.out.println(echo.echo("這家店味道咋樣? \n"));
    }
}

  執行測試結果:

Connected to the target VM, address: '127.0.0.1:57656', transport: 'socket'
路人:老闆娘,點菜咯!
老闆娘:誒,來咯!您要點什麼!
路人:要個回鍋肉
路人:微辣!
路人:少油!
路人:別放香菜!
路人:上菜快點!
老闆娘:要個回鍋肉!
老闆娘:微辣!
老闆娘:少油!
老闆娘:別放香菜!
老闆娘:作菜快點!
老闆娘:老頭子,聽清沒!
老闆悶頭作菜中!!!!
老闆娘:帥哥,你的菜好了!
路人:吧唧吧唧!味道不錯
這家店味道咋樣? 
挺不錯的。
相關文章
相關標籤/搜索