RPC框架簡易實現

       RPC(Remote Procedure Call Protocol)——遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。
       RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。
有多種 RPC模式和執行。最初由 Sun 公司提出。IETF ONC 憲章從新修訂了 Sun 版本,使得 ONC RPC 協議成爲 IETF 標準協議。如今使用最廣泛的模式和執行是開放式軟件基礎的分佈式計算環境(DCE)。
                                                           -------------百度百科
     在簡單瞭解了RPC的相關概念後,咱們經過Java來開發一個簡易版的RPC框架,所涉及的知識包括java的套接字(Socket)+java反射+JDK動態代理
        常見的RPC實現由兩種方式,一種爲TCP(應用層爲HTTP),一種爲面向無鏈接的UDP。這裏咱們使用TCP來實現一個簡易版的RPC框架
        相關內容能夠參考阿里大神(dubbo的開發者之一)的內容:http://javatar.iteye.com/blog/1123915
 
        言歸正傳:所涉及的模塊包括 一、RPC服務器   二、接口    三、實現類   四、服務器提供者模塊    五、客戶端消費者模塊
 
一、RPC服務器+接口代理類
 
package com.jd.rpc;


import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * RpcFramework
 * 包含 RPC服務器+接口代理類
 *
 * @author william.liangf
 */
public class RpcFramework {

    /**
     * 暴露RPC服務
     *
     * @param service 服務實現
     * @param port 服務端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null)
            throw new IllegalArgumentException("service instance == null");
        if (port <= 0 || port > 65535)//端口範圍0~65535
            throw new IllegalArgumentException("Invalid port " + port);
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        ServerSocket server = new ServerSocket(port);
        for(;;) {
            try {
                final Socket socket = server.accept();//啓動服務端的Socket,等待客戶端來鏈接
                new Thread(new Runnable() {//每次鏈接都啓動一個全新的線程
                    @Override
                    public void run() {
                        try {
                            try {
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());//獲取socket傳遞過來的數據
                                try {
                                    String methodName = input.readUTF();//接口方法名
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();//接口與參數類型
                                    Object[] arguments = (Object[])input.readObject();//具體參數值
                                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());//獲取回傳output
                                    try {
                                        //經過java的反射,動態執行實現類service的方法
                                        Method method = service.getClass().getMethod(methodName, parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        //將調用結果回傳給調用方
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);//關閉
                                    } finally {
                                        output.close();//關閉
                                    }
                                } finally {
                                    input.close();//關閉
                                }
                            } finally {
                                socket.close();//關閉
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 引用服務,即接口代理類
     *
     * @param <T> 接口泛型
     * @param interfaceClass 接口類型
     * @param host 服務器主機名
     * @param port 服務器端口
     * @return 遠程服務
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        if (interfaceClass == null)
            throw new IllegalArgumentException("Interface class == null");
        if (! interfaceClass.isInterface())
            throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
        if (host == null || host.length() == 0)
            throw new IllegalArgumentException("Host == null!");
        if (port <= 0 || port > 65535)
            throw new IllegalArgumentException("Invalid port " + port);
        System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
        //建立JDK 動態代理類,做爲接口的代理類
        //其中最主要的是在此接口代理類中 須要執行套接字的相關內容,以便發送相關的套接字內容
        //其中最主要的是在代理類中實現套接字請求
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                //套接字鏈接
                Socket socket = new Socket(host, port);
                try {
                    //對象輸出流
                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                    try {
                        output.writeUTF(method.getName());//傳入參數 方法名稱
                        output.writeObject(method.getParameterTypes());//傳入參數 參數類型
                        output.writeObject(arguments);//傳入參數 參數對象
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());//接收RPC服務器回傳回來的內容
                        try {
                            Object result = input.readObject();
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;//返回結果對象
                        } finally {
                            input.close();//關閉
                        }
                    } finally {
                        output.close();//關閉
                    }
                } finally {
                    socket.close();//關閉
                }
            }
        });
    }


}

  

    
  二、定義接口服務
 
/**
 *  定義接口服務
 *
 * @author william.liangf
 */
public interface HelloService {

    String hello(String name);

}

  

 三、接口服務實現類java

package com.jd.rpc;

/**
 *接口服務實現類
 *
 * @author william.liangf
 */
public class HelloServiceImpl implements HelloService {

    public String hello(String name) {
        return "Hello " + name;
    }

}

 

 三、RPC 服務提供者服務器

package com.jd.rpc;


/**
 * RpcProvider
 * RPC 服務提供者 能夠理解爲生產者
 *
 * @author william.liangf
 */
public class RpcProvider {

    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        RpcFramework.export(service, 1234);
    }

}

  

四、RPC服務調用者網絡

package com.jd.rpc;


/**
 * RpcConsumer
 * RPC服務調用者 能夠理解爲消費者
 *
 * @author william.liangf
 */
public class RpcConsumer {

    public static void main(String[] args) throws Exception {
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        for (int i = 0; i < Integer.MAX_VALUE; i ++) {
            String hello = service.hello("World" + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }

}

  

 

 

   相關內容能夠參考阿里大神(dubbo的開發者之一)的內容:http://javatar.iteye.com/blog/1123915框架

相關文章
相關標籤/搜索