RPC遠程過程調用學習之路(一):用最原始代碼還原PRC框架

 

RPC: Remote Procedure Call 遠程過程調用,即業務的具體實現不是在本身系統中,須要從其餘系統中進行調用實現,因此在系統間進行數據交互時常常使用。html

rpc的實現方式有不少,能夠經過http和tcp協議進行實現java

經過http協議的主要有:  web

  1. webService    能夠參考我以前的博客  WebService 學習之路(一):瞭解並使用webService

                                                            webService學習之路(二):springMVC集成CXF快速發佈webServicespring

                                                            webService學習之路(三):springMVC集成CXF後調用已知的wsdl接口編程

  1. restful           能夠參考我以前的博客  Restful 介紹及SpringMVC+restful 實例講解

 

而今天要講的是經過TCP協議實現的遠程調用。服務器

 

爲啥已經掌握了webservice和restful等經過http協議實現rpc技術外,還要研究tcp協議實現rpc呢?restful

由於網絡七層協議中,http位於tcp之上,而從傳輸上而言,越底層同等條件下傳輸速度更快網絡

另外影響rpc調用的除了傳輸方式外,另外一個就是序列化,而java的阻塞式IO每每成爲瓶頸,因此這裏設計到了NIO,框架

NIO知識點就多了,請本身搜索學習。socket

 

言歸正傳,今天不借助其餘仁和框架,用簡單的代碼還原rpc的過程。

大體能夠分爲下面幾部(先了解過程,再看代碼更容易理解):

  1. 書寫好服務接口和實現,就是咱們項目中的業務層,看着service,service.imp就熟悉了 o(∩_∩)o
  2. 把1寫好的接口暴露給其餘系統,以便調用
  3. 根據暴露了接口的地址和接口信息,進行調用

是否是感受和調用本地的service同樣, 最終就是要達到的這個效果。

 

文采很差,就要開始貼代碼了:

 

首先是寫接口和接口的實現類,和平時看見的、寫的沒任何區別

接口定義

package com.xiaochangwei.rpc;

public interface RpcTestService {
    String testRpcCall(int count);
}

接口實現類

package com.xiaochangwei.rpc;

import java.text.SimpleDateFormat;
import java.util.Date;

public class RpcTestServiceImpl implements RpcTestService {

    public final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:sss");
    
    @Override
    public String testRpcCall(int count) {
        return dateFormat.format(new Date())+" 調用rpc次數爲:" + count;
    }

}

 

而後就是rpc的精髓了,怎麼把服務暴露給其餘系統的

其實說白了就是使用java自帶的

import java.net.ServerSocket;
import java.net.Socket;

網絡編程相關的東西socket和對應的IO,由於咱們要向服務器發送請求,而後服務器又要返回數據給請求方,

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

同時還要用到反射,由於咱們不可能每一個接口實現類的暴露都去寫一套方法吧,得共通化吧, 先簡單理解爲泛型把    代碼中看見過Class T 吧 o(∩_∩)o 

先簡單理解爲過程和socket調用過程同樣吧:

  1. 根據約定的端口,服務端起一個ServerSocket,並一直監聽該端口
  2. 監聽到有請求時,server端經過inputStream取得請求的相關信息
  3. 根據請求信息調用相應方法處理,並返回結果

 

簡易PRC框架

package com.xiaochangwei.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * rpc簡易框架
 */
public class RpcFramework {

    /**
     * 暴露服務
     */
    @SuppressWarnings("resource")
    public static void export(final Object service, int port) throws Exception {
        if (service == null)
            throw new IllegalArgumentException("服務實例爲空");
        if (port <= 0 || port > 65535)
            throw new IllegalArgumentException("端口號不正確");
        System.out.println("經過端口 [" + port +"] 暴露服務[" + service.getClass().getName() + "]" );
        
        ServerSocket server = new ServerSocket(port);
        while(true){
            try {
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {
                                ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                                try {
                                    String methodName = inputStream.readUTF();
                                    Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject();
                                    Object[] arguments = (Object[]) inputStream.readObject();
                                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                    try {
                                        Method method = service.getClass().getMethod(methodName,parameterTypes);
                                        Object result = method.invoke(service,arguments);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    inputStream.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 引用服務
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host,
            final int port) throws Exception {
        if (interfaceClass == null)
            throw new IllegalArgumentException("接口爲空");
        if (!interfaceClass.isInterface())
            throw new IllegalArgumentException(interfaceClass.getName() + " 不是接口");
        if (host == null || host.length() == 0)
            throw new IllegalArgumentException("主機地址爲空");
        if (port <= 0 || port > 65535)
            throw new IllegalArgumentException("端口不正確" + port);
        System.out.println("從服務器[" + host + ":" + port + "]取得遠程服務[" + interfaceClass.getName() + "]" );
        
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[] { interfaceClass }, new InvocationHandler() {
                    public Object invoke(Object proxy, Method method,Object[] arguments) throws Throwable {
                        Socket socket = new Socket(host, port);
                        try {
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                output.writeUTF(method.getName());
                                output.writeObject(method.getParameterTypes());
                                output.writeObject(arguments);
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                try {
                                    Object result = input.readObject();
                                    if (result instanceof Throwable) {
                                        throw (Throwable) result;
                                    }
                                    return result;
                                } finally {
                                    input.close();
                                }
                            } finally {
                                output.close();
                            }
                        } finally {
                            socket.close();
                        }
                    }
                });
    }

    /**
     * 暴露RPC服務主調函數
     */
    public static void main(String[] args) throws Exception {
        RpcTestService rpcTestService = new RpcTestServiceImpl();
        RpcFramework.export(rpcTestService, 3125);
    }
}

指定其中的main後,咱們的接口就經過指定的端口暴露給其餘系統了

 

其餘系統調用也很簡單

package com.xiaochangwei.rpc;

public class RpcConsumer {
    public static void main(String[] args) throws Exception {
        RpcTestService rpcTestService = RpcFramework.refer(RpcTestService.class,"127.0.0.1", 3125);
        for (int i = 0; i < 10; i++) {
            String response = rpcTestService.testRpcCall(i);
            System.out.println(response);
            Thread.sleep(1000);
        }
    }
}

 

是否是感受和本地調用同樣 o(∩_∩)o 

看下效果吧,先暴露接口,再調用

 

執行調用

測試發現,調用是成功的 o(∩_∩)o  

相關文章
相關標籤/搜索