徒手擼一個簡單的RPC框架(2)——項目改造

在上一篇的徒手擼一個簡單的RPC框架中再最後的服務器和客戶端鏈接的時候只是簡單的寫了Socket鏈接,以爲有些不妥。正好最近學習了Netty,在平時工做中沒機會運用,因而本身就給本身出需求將以前的項目改造一下。git

Netty是什麼?

在學習Netty以前呢咱們首先得了解IO和NIOgithub

IO模型

IO編程模型簡單來講就是上一篇我寫的服務端與客戶端的鏈接,客戶端與服務端創建鏈接通訊後,必須等待服務端返回信息才能進行下一步的動做,這期間線程一直是等待狀態。IO模型在客戶端較少的狀況下是沒問題的,可是一旦有大量客戶端與服務端進行鏈接,那麼就會出問題。咱們簡單的分析一下緣由。編程

  1. 首先我以前寫的代碼實際上是有問題的,爲何呢?由於每次鏈接通訊一次我就將其鏈接關閉了。Socket鏈接時TCP,雙方每次創建鏈接時都會耗費時間和資源,不能每通訊一次就關閉鏈接。就比如你和別人打電話你說一句對方說一句,而後掛電話,而後再打過去。確定是不能這麼作的
  2. 若是是想要保持通訊,那麼程序中就得將其監聽的代碼放入while循環中用專門的線程來維護。可是線程是操做系統中很是寶貴的資源,每一個操做系統能建立的線程也是有限的。
  3. CPU頻繁的在線程之間切換是很是損耗性能的。
  4. 咱們能夠看到以前編寫的代碼中客戶端與服務端交流的媒介是字節流,效率不高。

IO模型有這麼多的問題,因而在JDK1.4中提出了NIO的概念,就是爲了解決以上的問題bootstrap

NIO模型

咱們一一對應上面的問題來看NIO用什麼技術來解決的bash

第一個問題

第一個問題是代碼的問題,咱們就不討論了服務器

第二個問題

線程有限的問題:NIO中提出了Selector概念,IO中是每一個鏈接都會由一個線程阻塞來維護,NIO中用Selector來管理這些鏈接,若是有消息的傳入或傳出,那麼就創建相應的線程了處理。這樣服務器只須要阻塞一個Selector線程,就能夠管理多個鏈接了。框架

具體的Selector文章能夠看我以前的NIO中選擇器Selector,裏面有介紹詳細的Selector用法。異步

這裏舉個例子應該就明白的,比如你去釣魚,IO就是一人一個魚竿,等着魚上來,中間哪也不能去,而NIO就是一我的能守着好幾個魚竿。ide

這就是NIO模型解決操做系統中線程有限的問題。函數

第三個問題

CPU在線程之間頻繁切換,因爲NIO中只管理了一個Selector線程,那麼這個問題也就相應的解決了

第四個問題

NIO中提出了ChannelBuffer的概念,就比如在嚮往的生活第一季中摘玉米中,是用竹筐一次一次背快呢?仍是接一輛車子來回運送快?固然是車子來回運送快了,而這裏的Buffer就比如車子。具體的ChannelBuffer的解釋能夠看我以前的文章Java中IO和NIOJAVA中NIO再深刻

Netty

那麼爲何就和Netty扯上關係了呢?其實我以爲NIO之於Netty的關係就比如Servlet之於Tomcat的關係,Netty只是對於NIO進行了進一步的封裝,讓使用者更加簡便的編程。

改造

此次改造分爲服務端和客戶端的改造

服務端

接下來咱們就利用Netty將咱們的服務器端與客戶端鏈接通訊部分進行改造一下,首先咱們先加上對於Netty的依賴

compile 'io.netty:netty-all:4.1.6.Final'
複製代碼

而後編寫服務端的代碼,服務端的代碼很是簡單

ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
        .group(boos, worker)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                        //得到實現類處理事後的返回值
                        String invokeMethodMes = CommonDeal.getInvokeMethodMes(msg);
                        ByteBuf encoded = ctx.alloc().buffer(4 * invokeMethodMes.length());
                        encoded.writeBytes(invokeMethodMes.getBytes());
                        ctx.writeAndFlush(encoded);
                    }
                });
            }
        }).bind(20006);

複製代碼

這和咱們日常寫的Socket鏈接有些區別,能夠看到咱們建了兩個NioEventLoopGroup一個boss一個worker,爲何會有兩個呢?

從這個圖裏面咱們能夠看到,boss是專門用來對外鏈接的,worker則是像NIO中Selector用來處理各類讀寫的請求。

客戶端

其實難點就是在客戶端,由於Netty是異步事件驅動的框架,什麼是異步呢?

客戶端與服務端的任何I/O操做都將當即返回,等待服務端處理完成之後會調用指定的回調函數進行處理。在這個過程當中客戶端一直沒有阻塞。因此咱們在客戶端與服務端請求處理時,若是得到異步處理的結果呢?Netty提供有一種獲取異步回調結果的,可是那是添加監聽器。而咱們的RPC調用在最後返回結果的時候必須得阻塞等待結果的返回,因此咱們須要本身寫一個簡單的獲取異步回調結果的程序。想法以下。

  1. 想要得到服務端返回的消息時,阻塞等待。
  2. Netty客戶端讀取到客戶端消息時,喚醒等待的線程

那麼咱們就圍繞這兩步來進行編碼。

客戶端想要獲取服務端消息時如何等待呢?這裏咱們就能夠用wait()

public Response getMessage(){
    synchronized (object){

        while (!success){
            try {
                object.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        return response;
    }
}

複製代碼

那麼讀到消息之後如何喚醒呢?

public void setMessage(Response response){
    synchronized (object){
        this.response = response;
        this.success = true;
        object.notify();
    }
}

複製代碼

這樣就解決了咱們上面提出的兩個問題了。接下來編寫客戶端的代碼

private final Map<Long,MessageFuture> futureMap = new ConcurrentHashMap<>();
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void connect(String requestJson,Long threadId){
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().
                        addLast(new StringDecoder()).
                        addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                Gson gson = new Gson();
                                Response response = gson.fromJson(msg, Response.class);
                                MessageFuture messageFuture = futureMap.get(threadId);
                                messageFuture.setMessage(response);
                            }

                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                futureMap.put(threadId,new MessageFuture());
                                countDownLatch.countDown();
                                ByteBuf encoded = ctx.alloc().buffer(4 * requestJson.length());
                                encoded.writeBytes(requestJson.getBytes());
                                ctx.writeAndFlush(encoded);
                            }
                        });
            }
        }).connect("127.0.0.1", 20006);
    }

    public Response getResponse(Long threadId){
        MessageFuture messageFuture = null;
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        messageFuture = futureMap.get(threadId);
        return messageFuture.getMessage();
    }

複製代碼

這裏面咱們用到了CountDownLatch類,即等待發送完消息之後通知我能獲取數據了。這裏面的代碼和服務端的差很少,其中有區別的地方就是在發送數據的時候將線程ID和MessageFuture放入Map中,在獲得服務端發送的數據時取出並放入獲得的Response。

總結

到目前爲止咱們就完成了咱們的項目改造,只是簡單的應用了一下Netty的客戶端和服務端的通訊,由於在學習的過程當中若是沒有運用的話,那麼感受記憶沒有那麼牢靠,因此就有了這次的項目改造的計劃。雖然完成了簡單的通訊,可是我知道還有些地方須要優化,例如用synchronized在之後學習了AQS之後但願也可以學以至用將這裏給改一下。

完整的項目地址

徒手擼一個簡單的RPC框架

徒手擼一個簡單的IOC

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息