【遠程調用框架】如何實現一個簡單的RPC框架(四)優化二:改變底層通訊框架

【如何實現一個簡單的RPC框架】系列文章:java

【遠程調用框架】如何實現一個簡單的RPC框架(一)想法與設計 
【遠程調用框架】如何實現一個簡單的RPC框架(二)實現與使用 
【遠程調用框架】如何實現一個簡單的RPC框架(三)優化一:利用動態代理改變用戶服務調用方式 
【遠程調用框架】如何實現一個簡單的RPC框架(四)優化二:改變底層通訊框架 
【遠程調用框架】如何實現一個簡單的RPC框架(五)優化三:軟負載中心設計與實現 
第一個優化以及第二個優化修改後的工程代碼可下載資源 如何實現一個簡單的RPC框架編程

 

 

二、優化二: 改變底層通訊框架

簡單socket通訊BIO方式-》-》NIO方式-》使用netty服務框架 
關於這部分,能夠提早閱讀下博客《Java NIO BIO AIO總結》bootstrap

2.1 目的

問題描述:在目前的服務框架版本中,服務發佈端和服務調用端採用的IO通訊模式爲BIO,即便用最基礎的Java Socket編程的方式。看過咱們以前實現介紹部分的讀者都知道,服務端一直在監聽請求,每當有一個請求發來,則會建立一個新的線程去處理該請求,以下代碼:api

while (true){
    Socket socket = serverSocket.accept();
    new Thread(new ServerProcessThread(socket)).start();//開啓新的線程進行鏈接請求的處理
}
  • 1
  • 2
  • 3
  • 4

而ServerProcessThread線程完成了服務的調用及結果的返回工做,這樣的方法,有如下兩個弊端:數組

  • (1)對於每個socket鏈接都建立一個線程去維護,當鏈接逐漸增多的狀況下,建立的線程數隨之增長,對虛擬機形成必定壓力。
  • (2)這種方式爲阻塞式IO,即數據的讀寫是阻塞的,在沒有有效可讀/可寫數據的狀況下,線程會一直阻塞,形成資源的浪費。 
    所以爲了解決上述兩個弊端,咱們改變這種IO模式。服務器

  • step1.使用selector+channel+buffer實現NIO模式(參考博客《Java NIO BIO AIO總結》
    NIO的模式有兩個特色: 
    (1)不用對全部鏈接都建立新的線程去維護,selector線程能夠管理多個數據通道; 
    (2)IO數據讀寫是非阻塞的,只有當出現有效讀寫數據時纔會出發相應的事件進行讀寫,節約資源。網絡

  • step2.使用netty/mina的框架來實現。

2.2 實現

2.2.1 NIO模式

關於NIO模式的基本客戶端與服務端的實現代碼在博客《Java NIO BIO AIO總結》中已經進行了介紹。這裏我對LCRPC框架代碼的改造即利用該博客中的代碼。僅做爲NIO通訊模式的使用示例,由於還有好多能夠修改的地方。併發

  • (1)服務發佈端的代碼修改 
    這裏咱們爲接口IProviderService添加一個方法:startListenByNIO,該方法實現採用NIO的模式進行服務的監聽,與之對應的是該接口中的startListen方法使用BIO的模式進行服務的監聽,即咱們初版本中的內容。startListenByNIO方法的實現代碼以下:
@Override
public boolean startLisetenByNIO() {
    new Thread(new NIOServerThread()).start();
    return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5

該方法開啓新的線程,採用NIO的模式進行服務的監聽。線程類NIOServerThread的代碼與博客《Java NIO BIO AIO總結》介紹的一致,只是read事件的觸發方法代碼有所改動。該類的代碼以下:app

public class NIOServerThread extends NIOBase implements Runnable{


    @Override
    public void run() {
        try {
            initSelector();//初始化通道管理器Selector
            initServer(Constant.IP,Constant.PORT);//初始化ServerSocketChannel,開啓監聽
            listen();//輪詢處理Selector選中的事件
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

    }

    /**
     * 初始化 該線程中的通道管理器Selector
     */
    public void initSelector() throws IOException {
        this.selector = Selector.open();
    }


    /**
     * 採用輪詢的方式監聽selector上是否有須要處理的事件,若是有,則循環處理
     * 這裏主要監聽鏈接事件以及讀事件
     */
    public void listen() throws IOException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
        System.out.println("監聽成功,可開始進行服務註冊!");
        //輪詢訪問select
        while(true){
            //當註冊的事件到達時,方法返回;不然將一直阻塞
            selector.select();
            //得到selector中選中的項的迭代器,選中的項爲註冊的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            //循環處理註冊事件
            /**
             * 一共有四種事件:
             * 1. 服務端接收客戶端鏈接事件: SelectionKey.OP_ACCEPT
             * 2. 客戶端鏈接服務端事件:    SelectionKey.OP_CONNECT
             * 3. 讀事件:                SelectionKey.OP_READ
             * 4. 寫事件:                SelectionKey.OP_WRITE
             */
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //手動刪除已選的key,以防重複處理
                iterator.remove();
                //判斷事件性質
                if (key.isAcceptable()){//服務端接收客戶端鏈接事件
                    accept(key);
                }else if (key.isReadable()){//讀事件
                    read(key);
                }
            }
        }
    }

    /**
     * 得到一個ServerSocket通道,並經過port對其進行初始化
     * @param port    監聽的端口號
     */
    private void initServer(String ip,int port) throws IOException {
        //step1. 得到一個ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //step2. 初始化工做
        serverSocketChannel.configureBlocking(false);//設置通道爲非阻塞
        serverSocketChannel.socket().bind(new InetSocketAddress(ip,port));

        //step3. 將該channel註冊到Selector上,併爲該通道註冊SelectionKey.OP_ACCEPT事件
        //這樣一來,當有"服務端接收客戶端鏈接"事件到達時,selector.select()方法會返回,不然將一直阻塞
        serverSocketChannel.register(this.selector,SelectionKey.OP_ACCEPT);
    }


    /**
     * 當監聽到服務端接收客戶端鏈接事件後的處理函數
     * @param key 事件key,能夠從key中獲取channel,完成事件的處理
     */
    public void accept(SelectionKey key) throws IOException {

        //step1. 獲取serverSocketChannel
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        //step2. 得到和客戶端鏈接的socketChannel
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);//設置爲非阻塞
        //step3. 註冊該socketChannel
        socketChannel.register(selector,SelectionKey.OP_READ);//爲了接收客戶端的消息,註冊讀事件
    }

    public void read(SelectionKey key) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        byte[] result = getReadData(key);
        if (result == null) return;
        SocketChannel socketChannel = (SocketChannel) key.channel();

        LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(result);
        IProviderService providerService = new ProviderServiceImpl();
        //將結果寫回
        socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO))));
//        socketChannel.close();//關閉
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113

類NIOBase爲基類。代碼以下:框架

public class NIOBase {

    // 線程中的通道管理器
    public Selector selector;


    /**
     * 初始化 該線程中的通道管理器Selector
     */
    public void initSelector() throws IOException {
        this.selector = Selector.open();
    }



    public byte[] getReadData(SelectionKey key) throws IOException {

        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        int len = socketChannel.read(byteBuffer);
        if (len == -1){
            socketChannel.close();
            return null;//說明鏈接已經斷開
        }
        int lenth = 0;
        List<byte[]> list = new ArrayList<>();
        while (len > 0){
            lenth += len;
            byteBuffer.flip();
            byte[] arr = new byte[len];

            byteBuffer.get(arr,0,len);
            list.add(arr);
            byteBuffer.clear();
            len = socketChannel.read(byteBuffer);
        }

        byte[] result = new byte[lenth];
        int l = 0;
        for (int i = 0;i<list.size();i++){
            for (int j = 0;j<list.get(i).length;j++){
                result[l + j] = list.get(i)[j];
            }
            l += list.get(i).length;
        }
        return result;
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

getReadData方法讀取客戶端發送的所有數據。利用幫助類ObjectAndByteUtil對客戶端發送的數據進行序列化爲reqeust對象。同時爲接口IProviderService添加方法getFuncCallData,利用request對象調用相應服務方法,獲得方法的返回值,反序列化後發送給客戶端,該方法的代碼與初版本一致。 
幫助類ObjectAndByteUtil負責利用反/序列化技術進行字節數組與對象之間的轉化,代碼以下:

package whu.edu.lcrpc.util;

import java.io.*;

/**
 * Created by apple on 17/3/30.
 */

public class ObjectAndByteUtil {

    /**
     * 對象轉數組
     * @param obj
     * @return
     */
    public static byte[] toByteArray (Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush();
            bytes = bos.toByteArray ();
            oos.close();
            bos.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        return bytes;
    }

    /**
     * 數組轉對象
     * @param bytes
     * @return
     */
    public static Object toObject (byte[] bytes) {
        Object obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream (bytes);
            ObjectInputStream ois = new ObjectInputStream (bis);
            obj = ois.readObject();
            ois.close();
            bis.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

爲了採用NIO的方法開啓服務發佈端的服務監聽,咱們修改LCRPCProviderImpl類中對startListen函數的調用改成方法startListenByNIO,使得服務端採用NIO的方式發佈服務。

  • (2)客戶端代碼修改 
    客戶端的代碼大部分與博客《》中的一致,只不過仍是在read事件出發函數中,有所修改,主要流程就是讀取到服務端返回的數據後進行序列化,代碼以下:
public Object read(SelectionKey key) throws IOException {

    //step1. 獲得事件發生的通道
    byte[] result = getReadData(key);
    if (result == null) return null;

    Object object = ObjectAndByteUtil.toObject(result);

    return object;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

咱們爲接口IConsumerService添加方法sendDataByNIO,採用NIO的方式將服務調用端的請求信息序列化後發送給服務端,該函數代碼以下:

public Object sendDataByNIO(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException {
    NIOClient nioClient = new NIOClient(requestDO,ip);
    return nioClient.run();
}
  • 1
  • 2
  • 3
  • 4

類NIOClient代碼以下,其中run函數開啓輪詢,當所註冊事件發生時,觸發相應的方法。並在read事件觸發後結束輪訓。

public class NIOClient extends NIOBase{


    private LCRPCRequestDO requestDO;//客戶端對應的請求DO,發送給服務端
    private String ip;

    public NIOClient(LCRPCRequestDO requestDO,String ip){
        this.requestDO = requestDO;
        this.ip = ip;
    }

    public Object run() {
        try {
            initSelector();//初始化通道管理器
            initClient(ip,Constant.PORT);//初始化客戶端鏈接scoketChannel
            return listen();//開始輪詢處理事件

        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    public Object listen() throws IOException {
        //輪詢訪問select
        boolean flag = true;
        Object result = null;
        while(flag){
            //當註冊的事件到達時,方法返回;不然將一直阻塞
            selector.select();
            //得到selector中選中的項的迭代器,選中的項爲註冊的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            //循環處理註冊事件
            /**
             * 一共有四種事件:
             * 1. 服務端接收客戶端鏈接事件: SelectionKey.OP_ACCEPT
             * 2. 客戶端鏈接服務端事件:    SelectionKey.OP_CONNECT
             * 3. 讀事件:                SelectionKey.OP_READ
             * 4. 寫事件:                SelectionKey.OP_WRITE
             */
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //手動刪除已選的key,以防重複處理
                iterator.remove();
                //判斷事件性質
                if (key.isReadable()){//讀事件
                    result = read(key);
                    flag = false;
                    break;
                }else if (key.isConnectable()) {//客戶端鏈接事件
                    connect(key);
                }
            }
        }

        return result;

    }

    /**
     * 得到一個SocketChannel,並對該channel作一些初始化工做,並註冊到
     * @param ip
     * @param port
     */
    public void initClient(String ip,int port) throws IOException {
        //step1. 得到一個SocketChannel
        SocketChannel socketChannel = SocketChannel.open();



        //step2. 初始化該channel
        socketChannel.configureBlocking(false);//設置通道爲非阻塞


        //step3. 客戶端鏈接服務器,其實方法執行並無實現鏈接,須要再listen()方法中調用channel.finishConnect()方法才能完成鏈接
        socketChannel.connect(new InetSocketAddress(ip,port));

        //step4. 註冊該channel到selector中,併爲該通道註冊SelectionKey.OP_CONNECT事件和SelectionKey.OP_READ事件
        socketChannel.register(this.selector,SelectionKey.OP_CONNECT|SelectionKey.OP_READ);
    }

    /**
     * 當監聽到客戶端鏈接事件後的處理函數
     * @param key 事件key,能夠從key中獲取channel,完成事件的處理
     */
    public void connect(SelectionKey key) throws IOException {
        //step1. 獲取事件中的channel
        SocketChannel socketChannel = (SocketChannel) key.channel();


        //step2. 若是正在鏈接,則完成鏈接
        if (socketChannel.isConnectionPending()){
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);//將鏈接設置爲非阻塞
        //step3. 鏈接後,能夠給服務端發送消息
        socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(requestDO)));

    }

    public Object read(SelectionKey key) throws IOException {

        //step1. 獲得事件發生的通道
        byte[] result = getReadData(key);
        if (result == null) return null;

        Object object = ObjectAndByteUtil.toObject(result);

        return object;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112

爲了使得客戶端採用NIO方式進行通信,咱們修改MyInvokeHandler類:

//            result = consumerService.sendData(serviceAddress,requestDO);//採用BIO的方式
            result = consumerService.sendDataByNIO(serviceAddress,requestDO);//採用NIO的方式
  • 1
  • 2

至此,NIO通訊模式代碼修改完畢。在測試的過程當中,遇到了一個問題,就是在服務調用端發出一個服務調用請求後,服務發佈端一直在觸發read事件,查閱資料後,瞭解到這種NIO的實現方式中,客戶端或者服務端其中一方將鏈接關閉後,會一直觸發另外一方的read事件,這時read會回傳-1,若沒有即便正確處理斷線(關閉channel),read事件會一直觸發,所以在getData函數讀取數據時,添加以下代碼:

if (len == -1){
    socketChannel.close();
    return null;//說明鏈接已經斷開
}
  • 1
  • 2
  • 3
  • 4

至此,問題得以解決。 
服務註冊查找中心以及服務端客戶端的代碼都不須要改變,分別運行後,獲得與初版相同的結果。(因爲服務端咱們採用一個selector管理全部channel,而且沒有開啓新的線程去處理數據,所以客戶端會以同步的方式獲得四次服務調用結果)

2.2.2 netty/mina

目前爲止咱們的代碼中,通訊部分採用了NIO和BIO兩種模式。BIO模式採用socket編程實現,NIO部分採用selector channel buffer編程實現。可是不管哪種,都只是簡單的幫助咱們瞭解兩種通訊模式的基本概念,以及如何用最簡單得編程方式實現。咱們在代碼中,也有很是多的異常,網絡等狀況沒有考慮,在實際生產中,也毫不會使用這種最基本最底層的編程方式來完成遠程得通訊。所以,咱們這裏引入Netty開源框架來實現通訊。他幫助咱們考慮了多種情況,使得咱們以簡單的代碼完成高質量的遠程通訊,專一於其餘業務邏輯等的實現。 
在分佈式應用系統開發中,服務化的應用之間進行遠程通訊時使用。Netty是在Java NIO的基礎上封裝的用於客戶端服務端網絡應用程序開發的框架,幫助用戶考慮在分佈式、高併發、高性能開發中遇到的多種情況,使得用戶使用更容易的網絡編程接口完成網絡通訊,專一於其餘業務邏輯的開發。 
(1)關於Netty 
(如下內容摘自知乎的帖子《通俗地講,Netty 能作什麼?》) 
netty是一套在java NIO的基礎上封裝的便於用戶開發網絡應用程序的api. 
Netty是什麼?

1)本質:JBoss作的一個Jar包

2)目的:快速開發高性能、高可靠性的網絡服務器和客戶端程序

3)優勢:提供異步的、事件驅動的網絡應用程序框架和工具 
通俗的說:一個好使的處理Socket的東東

(2)爲何選擇netty 
如下內容摘抄自《Netty權威指南》 
在上述優化中,咱們使用JDK爲咱們提供的NIO的類庫來修改LCRPC框架的遠程通訊方式。如下總結了不選擇Java原聲NIO編程的緣由: 
這裏寫圖片描述
因爲上述緣由,在大多數場景下,不建議你們直接食用JDK的NIO類庫,除非精通NIO編程或者有特殊的需求。在絕大多數的業務場景中,咱們可使用NIO框架Netty來進行NIO編程,他既能夠做爲客戶端也能夠做爲服務端,同時也支持UDP和異步文件傳輸,功能很是強大。 
如下總結了爲何選擇Netty做爲基礎通訊框架: 
這裏寫圖片描述

(3)LCRPC服務框架優化:使用netty替換底層網絡通信

與NIO的修改方式大體相同

增長四個netty服務端與客戶端的類; 
netty服務端開啓監聽的類NettyServer:

package whu.edu.lcrpc.io.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import whu.edu.lcrpc.util.Constant;

/**
 * Created by apple on 17/4/10.
 */
public class NettyServer {
    public void bind() throws InterruptedException {

        //配置服務端的NIO的線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();

        //建立服務啓動的輔助類
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,wokerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChildChannelHandler());
            //綁定端口,同步等待成功
            ChannelFuture f = b.bind(Constant.PORT).sync();
            System.out.println("已經開始監聽,能夠註冊服務了");
            //等待服務端監聽端口關閉
            f.channel().closeFuture().sync();
        }finally {
            //優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new NettyServerHandler());
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

netty服務端hanlder類NettyServerhandler:

package whu.edu.lcrpc.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import whu.edu.lcrpc.entity.LCRPCRequestDO;
import whu.edu.lcrpc.service.IProviderService;
import whu.edu.lcrpc.service.impl.ProviderServiceImpl;
import whu.edu.lcrpc.util.ObjectAndByteUtil;

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Date;

/**
 * Created by apple on 17/4/10.
 */
public class NettyServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        if (req == null) return;

        LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(req);
        IProviderService providerService = new ProviderServiceImpl();
        ByteBuf resp = Unpooled.copiedBuffer(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO)));
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

netty客戶端鏈接類NettyClient:

package whu.edu.lcrpc.io.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import whu.edu.lcrpc.util.Constant;
import whu.edu.lcrpc.util.ObjectAndByteUtil;

import java.io.UnsupportedEncodingException;

/**
 * Created by apple on 17/4/10.
 */


public class NettyClient {


    private Object reqObj;
    private String ip;
    public NettyClient(Object reqObj, String ip){
        this.reqObj = reqObj;
        this.ip = ip;
    }

    public Object connect() throws InterruptedException, UnsupportedEncodingException {
        //配置客戶端NIO線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            byte[] req = ObjectAndByteUtil.toByteArray(reqObj);
            NettyClientHandler nettyClientHandler = new NettyClientHandler(req);
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(nettyClientHandler);
                        }
                    });
            //發起異步鏈接操做
            ChannelFuture f = b.connect(ip, Constant.PORT).sync();
            //等待客戶端鏈路關閉
            f.channel().closeFuture().sync();
            //拿到異步請求結果,返回
            Object responseObj = ObjectAndByteUtil.toObject(nettyClientHandler.response);
            return responseObj;

        }finally {
            //優雅退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

netty客戶端handler類NettyClientHandler:

package whu.edu.lcrpc.io.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by apple on 17/4/11.
 */
public class NettyClientHandler extends ChannelHandlerAdapter {


    private  ByteBuf firstMessage;
    public byte[] response;
    public NettyClientHandler(byte[] req){

        //將請求寫入緩衝區
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        response = new byte[buf.readableBytes()];
        buf.readBytes(response);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

然後修改LCRPC中本來的代碼,採用netty來進行遠程通訊。

首先在接口IConsumerService中增長函數sendDataByNetty,該函數採用netty的方式向服務發佈端發送數據。函數實現以下:

@Override
public Object sendDataByNetty(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException, InterruptedException {
    NettyClient nettyClient = new NettyClient(requestDO,ip);
    return nettyClient.connect();
}
  • 1
  • 2
  • 3
  • 4
  • 5

然後在接口IProviderService增長函數startListenByNetty,該函數採用netty的方式開啓服務監聽。

@Override
public boolean startListenByNetty() {
        new Thread(()->{
            NettyServer nettyServer = new NettyServer();
            try {
                nettyServer.bind();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
    return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

而後在代理handler類MyInvocationHandler中, 
修改

result = consumerService.sendDataByNIO(serviceAddress,requestDO);//採用NIO的方式
  • 1

result = consumerService.sendDataByNetty(serviceAddress,requestDO); //採用netty的方式
  • 1

採用netty的方式調用服務。 
而且在類LCRPCProviderImpl中使用方法startListenByNetty開啓服務的監聽。 
客戶端以及服務端的測試工程代碼均不須要改變,進行測試後,輸出結果不變。

須要注意的是:上述關於Netty的使用沒有考慮到TCL粘包/拆包的問題!

三、優化三:服務框架工做日誌

這個優化未完待續~

相關文章
相關標籤/搜索