精通併發與 Netty (一)如何使用

精通併發與 Netty

Netty 是一個異步的,事件驅動的網絡通訊框架,用於高性能的基於協議的客戶端和服務端的開發。javascript

異步指的是會當即返回,並不知道到底發送過去沒有,成功沒有,通常都會使用監聽器來監聽返回。java

事件驅動是指開發者只須要關注事件對應的回調方法便可,好比 channel active,inactive,read 等等。web

網絡通訊框架就不用解釋了,不少你很是熟悉的組件都使用了 netty,好比 spark,dubbo 等等。編程

初步瞭解 Netty

第一個簡單的例子,使用 Netty 實現一個 http 服務器,客戶端調用一個沒有參數的方法,服務端返回一個 hello world。bootstrap

Netty 裏面大量的代碼都是對線程的處理和 IO 的異步的操做。瀏覽器

package com.paul;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Server {

    public static void main(String[] args) throws InterruptedException {
        //定義兩個線程組,事件循環組,能夠類比與 Tomcat 就是死循環,不斷接收客戶端的鏈接
        // boss 線程組不斷從客戶端接受鏈接,但不處理,由 worker 線程組對鏈接進行真正的處理
        // 一個線程組其實也能完成,推薦使用兩個
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服務端啓動器,能夠輕鬆的啓動服務端的 channel
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //group 方法有兩個,一個接收一個參數,另外一個接收兩個參數
            // childhandler 是咱們本身寫的請求處理器
            serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class)
                    .childHandler(new ServerInitializer());
            //綁定端口
            ChannelFuture future = serverBootstrap.bind(8011).sync();
            //channel 關閉的監聽
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
package com.paul;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面能夠有不少 handler,一層層過濾的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        //HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的組合,編碼和解碼的 h      handler
        pipeline.addLast("httpServerCodec", new HttpServerCodec());
        pipeline.addLast("handler", new ServerHandler());
    }
}
package com.paul;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        if(httpObject instanceof HttpRequest) {
            ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            //單純的調用 write 只會放到緩存區,不會真的發送
            channelHandlerContext.writeAndFlush(response);
        }
    }
}

咱們在 SimpleChannelInboundHandler 裏分析一下,先看它繼承的 ChannelInboundHandlerAdapter 裏面的事件回調方法,包括通道註冊,解除註冊,Active,InActive等等。緩存

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelRegistered();
}

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelUnregistered();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelActive();
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelInactive();
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  ctx.fireChannelRead(msg);
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelReadComplete();
}

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  ctx.fireUserEventTriggered(evt);
}

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelWritabilityChanged();
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  ctx.fireExceptionCaught(cause);
}

執行順序爲 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。服務器

Netty 自己並非遵循 servlet 規範的。Http 是基於請求和響應的無狀態協議。Http 1.1 是有 keep-alived 參數的,若是3秒沒有返回,則服務端主動關閉瞭解,Http 1.0 則是請求完成直接返回。websocket

Netty 的鏈接會被一直保持,咱們須要本身去處理這個功能。網絡

在服務端發送完畢數據後,能夠在服務端關閉 Channel。

ctx.channel.close();

Netty 能作什麼

  1. 能夠看成一個 http 服務器,可是他並無實現 servelt 規範。雖然 Tomcat 底層自己也使用 NIO,可是 Netty 自己的特色決定了它比 Tomcat 的吞吐量更高。相比於 SpringMVC 等框架,Netty 沒提供路由等功能,這也契合和 Netty 的設計思路,它更貼近底層。
  2. Socket 開發,也是應用最爲普遍的領域,底層傳輸的最基礎框架,RPC 框架底層多數採用 Netty。直接採用 Http 固然也能夠,可是效率就低了不少了。
  3. 支持長鏈接的開發,消息推送,聊天,服務端向客戶端推送等等都會採用 WebSocket 協議,就是長鏈接。

Netty 對 Socket 的實現

對於 Http 編程來講,咱們實現了服務端就能夠了,客戶端徹底可使用瀏覽器或者 CURL 工具來充當。可是對於 Socket 編程來講,客戶端也得咱們本身實現。

服務器端:

Server 類於上面 Http 服務器那個同樣,在 ServerInitoalizer 有一些變化

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面能夠有不少 handler,一層層過濾的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        // TCP 粘包 拆包
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符串編碼,解碼
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ServerHandler());

    }
}
public class ServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端:

public class Client {

    public static void main(String[] args) throws InterruptedException {
        //客戶端不須要兩個 group,只須要一個就夠了,直接鏈接服務端發送數據就能夠了
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            //服務器端既可使用 handler 也可使用 childhandler, 客戶端通常使用 handler
            //對於 服務端,handler 是針對 bossgroup的,childhandler 是針對 workergorup 的
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面能夠有不少 handler,一層層過濾的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        // TCP 粘包 拆包
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符串編碼,解碼
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ClientHandler());

    }
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        System.out.println("client output:"+ msg);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("23123");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty 長鏈接實現一個聊天室

Server 端:

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    //定義 channel group 來管理全部 channel
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {


    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "加入\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "離開\n");
        //這個 channel 會被自動從 channelGroup 裏移除

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "上線");

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "離開");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client 端:

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for(;;){
  channel.writeAndFlush(br.readLine() + "\r\n");
}

Netty 心跳

集羣之間各個節點的通訊,主從節點之間須要進行數據同步,每當主節點的數據發生變化時,經過異步的方式將數據同步到從節點,同步方式能夠用日誌等等,所以主從節點之間不是實時一致性而是最終一致性。

節點與節點之間如何進行通訊那?這種主從模式是須要互相之間有長鏈接的,這樣來肯定對方還活着,實現方式是互相之間定時發送心跳數據包。若是發送幾回後對方仍是沒有響應的話,就能夠認爲對方已經掛掉了。

回到客戶端與服務端的模式,有人可能會想,客戶端斷開鏈接後服務端的 handlerRemoved 等方法不是能感知嗎?還要心跳幹什麼哪?

真實狀況其實很是複雜,好比手機客戶端和服務端進行一個長鏈接,客戶端沒有退出應用,客戶端開了飛行模型,或者強制關機,此時雙方是感知不到鏈接已經斷掉了,或者說須要很是長的時間才能感知到,這是咱們不想看到的,這時就須要心跳了。

來看一個示例:

其餘的代碼仍是和上面的同樣,咱們就不列出來了,直接進入主題,看不一樣的地方:

服務端

// Netty 爲了支持心跳的 IdleStateHandler,空閒狀態監測處理器。
     pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));

來看看 IdleStateHandler 的說明

/*
 * Triggers an IdleStateEvent when a Channel has not performed read, write, or both    
 * operation for a while
 * 當一個 channel 一斷時間沒有進行 read,write 就觸發一個 IdleStateEvent
 */
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
  this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
  //三個參數分別爲多長時間沒進行讀,寫或者讀寫操做則觸發 event。
}

觸發 event 後咱們編寫這個 event 對應的處理器。

public class MyHandler extends ChannelInboundHandlerAdapter{
  //觸發某個事件後這個方法就會被調用
  //一個 channelhandlerContext 上下文對象,另外一個是事件
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
        if(evt instanceof IdleStateEvent){
        IdleStateEvent event = (IdleStateEvent)evt;
        String eventType = null;
        switch(event.state()){
          case READER_IDLE:
            eventType = "讀空閒";
          case WRITER_IDLE:
            eventType = "寫空閒";
          case ALL_IDLE:
            eventType = "讀寫空閒";
        }
      }else{
        //繼續將事件向下一個 handler 傳遞
        ctx.
      }
  }
}

WebSocket 實現與原理分析

WebSocket 是一種規範,是 HTML5 規範的一部分,主要是解決 Http 協議自己存在的問題。能夠實現瀏覽器和服務端的長鏈接,鏈接頭信息只在創建鏈接時發送一次。是在 Http 協議之上構建的,好比請求鏈接實際上是一個 Http 請求,只不過裏面加了一些 WebSocket 信息。也能夠用在非瀏覽器場合,好比 app 上。

Http 是一種無狀態的基於請求和響應的協議,意思是必定是客戶端想服務端發送一個請求,服務端給客戶端一個響應。Http 1.0 在服務端給客戶端響應後鏈接就斷了。Http 1.1 增長可 keep-alive,服務端能夠和客戶端在短期以內保持一個鏈接,某個事件以內服務端和客戶端能夠複用這個連接。在這種狀況下,網頁聊天就是實現不了的,服務端的數據推送是沒法實現的。

之前有一些假的長鏈接技術,好比輪詢,缺點和明顯,這裏就不細說了。

Http 2.0 實現了長鏈接,可是這不在咱們討論範圍以內。

針對服務端,Tomcat 新版本,Spring,和 Netty 都實現了對 Websocket 的支持。

使用 Netty 對 WebSocket 的支持來實現長鏈接

其餘的部分仍是同樣的,先來看服務端的 WebSocketChannelInitializer。

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
   //須要支持 websocket,咱們在 initChannel 是作一點改動
   @Override
   protected void initChannel(SocketChannel ch) throws Exception{
      ChannelPipeline pipeline = ch.pipeline();
      //由於 websocket 是基於 http 的,因此要加入 http 相應的編解碼器
      pipeline.addLast(new HttpServerCodec());
      //以塊的方式進行寫的處理器
      pipeline.addLast(new ChunkedWriteHandler());
      // 進行 http 聚合的處理器,將 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者 
      // FullHttpResponse
      //HttpObjectAggregator 在基於 netty 的 http 編程使用的很是多,粘包拆包。
      pipeline.addLast(new HttpObjectAggregator(8192));
      // 針對 websocket 的類,完成 websocket 構建的全部繁重工做,負責握手,以及心跳(close,ping, 
      // pong)的處理, websocket 經過 frame 幀來傳遞數據。
      // BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame,
      // PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。
      // /ws 是 context_path,websocket 協議標準,ws://server:port/context_path
      pipeline.addLast(new WebSocketServerProcotolHandler("/ws"));
      pipeline.addLast(new TextWebSocketFrameHandler());
   }
}
// websocket 協議須要用幀來傳遞參數
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
     System.out.println("收到消息:"+ msg.text());
     ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器返回"));
   }
   
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
     System.out.println("handlerAdded" + ctx.channel().id.asLongText());
   }
  
   @Override
   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
     System.out.println("handlerRemoved" + ctx.channel().id.asLongText());
   }
  
}

客戶端咱們直接經過瀏覽器的原聲 JS 來寫

<script type="text/javascript">
   var socket;
   if(window.WebSocket){
     socket = new WebSocket("ws://localhost:8899/ws");
     socket.onmessage = function(event){
       alert(event.data);
     }
     socket.onopen = function(event){
       alert("鏈接開啓");
     }
     socket.onclose = function(event){
       alert("鏈接關閉");
     }
   }else{
     alert("瀏覽器不支持 WebSocket");
   }

   function send(message){
     if(!window.WebSocket){
       return;
     }
     if(socket.readyState == WebSocket.OPEN){
       socket.send(message);
     }
   }
</script>

咱們在瀏覽器中經過 F12 看看 Http 協議升級爲 WebSocket 協議的過程。

相關文章
相關標籤/搜索