JAVA NIO 編程入門(三)

1、前期回顧

上一篇文章JAVA NIO 編程入門(二)學習了NIO的彙集和分散,以及選擇器的使用,並經過一個小demo進行實戰,演示瞭如何進行分散和彙集,以及其主要使用場景,本文將是NIO編程入門最後一篇,進行一個RPC簡單小demo框架的實現,對前面的知識進行總結性的實戰,因爲只是演示性質的demo,因此RPC功能並無考慮很完善,也不涉及到性能等問題考慮。編程

2、什麼是RPC

RPC 英文全稱 Remote Procedure Calls,翻譯過來就是遠程過程調用,是分佈式系統中不一樣節點間流行的通訊方式。舉例:假設有A服務和B服務分別位於不一樣的服務器,A服務想調用B服務像調用本地方法同樣,這個時候就須要藉助RPC方式進行調用。json

3、RPC實現

RPC由三個主要部分組成,服務提供者,服務消費者,服務註冊中心,服務註冊中心提供服務提供者註冊服務。客戶端和服務端的交互協議採用json的形式,方便演示,同時考慮到複雜性,本次RPC不利用匯集和分散進行協議設計。bash

  • RPC調用過程解析

  • 請求實體類源碼
@Data
public class RpcRquest {
    /**請求id*/
    private String requestId;
    /**請求接口名*/
    private String interfaceName;
    /**服務版本**/
    private String serviceVersion;
    /**方法名*/
    private String methodName;
   /**參數類型*/
    private Class<?>[] parameterTypes;
    /**參數*/
    private Object[] parameters;
}
複製代碼
  • 返回實體類源碼
@Data
public class RpcResponse {
    /**請求流水號*/
    private String requestId;
    /**異常*/
    private Exception exception;
    /**返回結果**/
    private Object result;
}
複製代碼
  • 服務發現源碼
public class RpcRegister {
    /**存儲註冊的服務提供實現類*/
    private HashMap<String, Object> registMap = new HashMap<>();
    private  static  RpcRegister register=new RpcRegister();
    public static  RpcRegister buildRegist(){
        return register;
    }
    public RpcRegister regist(String interfaceName,Object obj){
        registMap.put(interfaceName,obj);
        return this;
    }
    public Object findServier(String interfaceName){
        return  registMap.get(interfaceName);
    }
}
複製代碼

這裏利用一個map存提供服務的實例,後續再在服務端只須要經過接口就能夠查找到對應的實現類。服務器

  • 服務提供者源碼
public class ProviderServer implements Runnable {
    /**
     * 服務提供端口
     */
    private int port;


    public ProviderServer(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        try {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                int readyChannels = selector.selectNow();
                if (readyChannels == 0) continue;
                Set selectedKeys = selector.selectedKeys();
                Iterator keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keyIterator.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = serverSocketChannel1.accept();
                        ByteBuffer buf1 = ByteBuffer.allocate(2048);
                        socketChannel.read(buf1);
                        buf1.flip();
                        String reciveStr = new String(buf1.array());
                        if (buf1.hasRemaining()) {
                            System.out.println(">>>服務端收到數據:" + reciveStr);
                            //判斷接受的內容是否有結束符,若是有,說明是一個請求結束。
                            if (reciveStr.contains(RpcConstant.PROTOCOL_END)) {
                                RpcRquest req = JSONObject.parseObject(reciveStr.replace(RpcConstant.PROTOCOL_END, ""), RpcRquest.class);
                                RpcResponse res = new RpcResponse();
                                res.setRequestId(req.getRequestId());
                                System.out.println(req.toString());
                                Class<?> remoteInterface = Class.forName(req.getInterfaceName());
                                Method method = remoteInterface.getMethod(req.getMethodName(), req.getParameterTypes());
                                if (null != method) {
                                    Object obj = method.invoke(RpcRegister.buildRegist().findServier(req.getInterfaceName()), req.getParameters());
                                    res.setException(null);
                                    res.setResult(obj);
                                }
                                buf1.clear();
                                buf1.put(JSONObject.toJSON(res).toString().getBytes());
                                buf1.flip();
                                socketChannel.write(buf1);
                            }
                        }
                        socketChannel.close();
                    } else if (key.isConnectable()) {
                    } else if (key.isReadable()) {
                    } else if (key.isWritable()) {

                    }
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }

    }

}
複製代碼

這部分源碼在《JAVA NIO 編程入門(二)》的基礎上增長了反射的部份內容,主要根據接口調用協議,生成客戶端須要調用的方法,進行調用,而後將結果返回。框架

  • 初始化工廠類
public class RpcInitFactory {
    /**
     * 客戶端鏈接遠程ip地址
     **/
    private String ip;
    /***遠程端口*/
    private int port;


    public RpcInitFactory(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }
}
複製代碼
  • 通用客戶端
@Data
public  class CommonClient {
    private  RpcInitFactory factory;

    public CommonClient(RpcInitFactory factory) {
        this.factory = factory;
    }

    public <T> T invoke(RpcRquest req) {
        RpcResponse response = null;
        req.setRequestId(UUID.randomUUID().toString());
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(factory.getIp(), factory.getPort()));
            ByteBuffer buf1 = ByteBuffer.allocate(2048);
            buf1.put(JSONObject.toJSON(req).toString().getBytes());
            buf1.put(RpcConstant.PROTOCOL_END.getBytes());
            buf1.flip();
            if (buf1.hasRemaining())
                socketChannel.write(buf1);
            buf1.clear();

            ByteBuffer body = ByteBuffer.allocate(2048);
            socketChannel.read(body);
            body.flip();
            if (body.hasRemaining()) {
                response = JSONObject.parseObject(new String(body.array()), RpcResponse.class);
            }
            body.clear();
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return (T) response;
    }
複製代碼
  • 調用客戶端接口
public interface Idemo {

    /***加法**/
    public Integer add(Integer i,Integer j);
}
複製代碼
  • 客戶端接口實現類
public class DemoRemoteImpl implements Idemo {


    private  CommonClient client;

    public DemoRemoteImpl(CommonClient client) {
        this.client = client;
    }

    @Override
    public Integer add(Integer i, Integer j) {
        //構造rpc請求實體類
        RpcRquest rpcRquest=new RpcRquest();
        //設置版本號
        rpcRquest.setServiceVersion("123");
        //設置調用的接口名稱
        rpcRquest.setInterfaceName(Idemo.class.getName());
        //設置調用方法名稱
        rpcRquest.setMethodName("add");
        //設置參數
        rpcRquest.setParameters(new Integer[] {i,j});
        //設置參數類型
        rpcRquest.setParameterTypes(new Class[] {Integer.class,Integer.class});
        //進行遠程調用
        RpcResponse response=  client.invoke(rpcRquest);
        if (null!=response){
            return Integer.parseInt(response.getResult().toString());
        }
        return null;
    }

}
複製代碼
  • 服務端接口實現類
public class DemoImp implements Idemo{
    @Override
    public Integer add(Integer i, Integer j) {
        return i+j;
    }
}
複製代碼

測試dom

  • 啓動服務端
public static void main(String[] args) {
        ProviderServer server = new ProviderServer(8090);
        RpcRegister.buildRegist().regist(Idemo.class.getName(), new DemoImp());
        new Thread(server).start();
    }
複製代碼
  • 啓動客戶端
public static void main(String[] args) {
        RpcInitFactory initFactory=  new RpcInitFactory("127.0.0.1",8090);
        Idemo demo = new DemoRemoteImpl(new CommonClient(initFactory));
        System.out.println(demo.add(2, 1));
    }
複製代碼
  • 結果:

4、總結

到這裏RPC的小demo功能實現完畢,實際上的RPC框架要比這個複雜的多,真正的RPC框架要考慮性能,高可用,半包,粘包等問題,這裏只是給出了一個RPC框架的實現原理,便於理解RPC框架的實現,並不能真正用於生產環境。socket

推薦閱讀

Java鎖之ReentrantLock(一)分佈式

Java鎖之ReentrantLock(二)ide

Java鎖之ReentrantReadWriteLockpost

JAVA NIO編程入門(一)

JAVA NIO 編程入門(二)

相關文章
相關標籤/搜索