Netty入門


1、NIO

  Netty框架底層是對NIO的高度封裝,因此想要更好的學習Netty以前,應先了解下什麼是NIO - NIO是non-blocking的簡稱,
在jdk1.4 裏提供的新api,他的他的特性以下:
  * 爲全部的原始類型提供(Buffer)緩存支持,字符集編碼解碼解決方案。
  * Channel :一個新的原始I/O 抽象。支持鎖和內存映射文件的文件訪問接口。提供多路(non-bloking)非阻塞式的高伸縮
性網絡I/O 。
  NIO是一個非阻塞式的I/O,它由一個專門的線程來處理全部的IO事件,並負責分發,而且它只有在事件到達的時候纔會觸發,
而不是去同步的監視事件;線程之間經過wait,notify 等方式通信。保證每次上下文切換都是有意義的。減小無謂的線程切換。
  NIO和IO最大的區別是數據打包和傳輸方式。IO是以流的方式處理數據,而NIO是以塊的方式處理數據。NIO的核心部分由
Channels、Buffers、Selectors三部分組成。java

(一)Channel和Buffer

  正常的狀況下,全部的IO在NIO中都從一個Channel 開始。Channel有點像流。數據能夠從Channel讀到Buffer中,也能夠從
Buffer寫到Channel中。JAVA NIO中的一些主要Channel的實現:FileChannel、DatagramChannel、SocketChannel、
ServerSocketChannel。這些實現類覆蓋了UDP和TCP網絡IO,以及文件IO。
  而Buffer的一些實現類:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer,則覆蓋
了能經過IO發送的基本數據類型:byte,short,int,long,float,double和char。算法

(二)Selector

  Selector容許單線程處理多個Channel。若是你的應用打開了多個鏈接(通道),但每一個鏈接的流量都很低,使用Selector
就會很方便。而要使用Selector,就得向Selector註冊Channel,而後調用它的select()方法。這個方法會一直阻塞到某個註冊的
通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件,事件的例子有如新鏈接進來,數據接收等。bootstrap

2、Netty

(一)Netty入門

  大數據,高訪問場景的互聯網項目或者多系統的協同工做,使用一個服務器根本不能勝任。就須要把系統拆分紅了多個服務,
根據須要部署在多個機器上,這些服務很是靈活,能夠隨着訪問量彈性擴展。可是多個模塊的跨服務通訊,時間和資源都是極大
地浪費。傳統的Blocking IO不能解決,由於會有線程阻塞的問題,而使用非阻塞IO(NIO),則須要耗費太多的精力。而Netty框架
(RPC框架)則很好的解決了這個問題。
  Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、
高可靠性的網絡服務器和客戶端程序。他就是一個程序,是封裝java socket noi的,咱們直接拿來用就行了。
  Netty通訊服務端的步驟:
    一、建立兩個NIO線程組,一個專門用於網絡事件處理(接受客戶端的鏈接),另外一個則進行網絡通訊的讀寫。
    二、建立一個ServerBootstrap對象,配置Netty的一系列參數,例如接受傳出數據的緩存大小等。
    三、建立一個用於實際處理數據的類ChannelInitializer,進行初始化的準備工做,好比設置接受傳出數據的字符集、
格式以及實際處理數據的接口。
    四、綁定端口,執行同步阻塞方法等待服務器端啓動便可。
    五、關閉相應的資源。api

服務端栗子:
  服務端的管理者:緩存

/**
* 服務端處理通道.這裏只是打印一下請求的內容,並不對請求進行任何的響應
* 繼承自ChannelHandlerAdapter, 這個類實現了ChannelHandler接口,
* ChannelHandler提供了許多事件處理的接口方法,而後你能夠覆蓋這些方法。
* @author lcy
*
*/
public class DiscartServiceHandler extends ChannelHandlerAdapter {
/**
* 客戶端收到新消息時,這個方法會被調用
* 
* @param ctx
* 通道處理的上下文信息
* @param msg
* 接受的消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// 將接收到的信息轉換爲緩衝區
ByteBuf str = (ByteBuf) msg;
// 打印傳輸過來的信息
System.out.print(str.toString(CharsetUtil.UTF_8));
} finally {
// 釋放ByteBuf對象
ReferenceCountUtil.release(msg);
}
}

/**
* 在異常時觸發
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//輸出錯誤信息
cause.printStackTrace();
ctx.close();
}
}

  服務端:服務器

/**
* 服務端
* @author lcy
*
*/
public class DiscartServer {
private int port;

public DiscartServer(int port) {
super();
this.port = port;
}

public void run() throws Exception {
//(一)設置兩個線程組
//用來接收進來的鏈接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 用來處理已經接受的鏈接
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("準備運行的端口" + port);
try {
//(二)輔助工具類,用於服務器通道的一系列配置
ServerBootstrap bootstrap = new ServerBootstrap();
//(三)綁定兩個線程組
// 設置group,這一步是必須的,若是沒有設置group將會報java.lang.IllegalStateException:group not set異常
bootstrap = bootstrap.group(bossGroup, workGroup);
//(四)指定NIO的模式
/***
* ServerSocketChannel以NIO的selector爲基礎進行實現的,用來接收新的鏈接
* 這裏告訴Channel如何獲取新的鏈接.
*/
bootstrap = bootstrap.channel(NioServerSocketChannel.class);
//(五)配置具體的數據處理方式,就是往裏添加規則
bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel arg0) throws Exception {
//與50秒內都沒有與服務端進行通訊的客戶端斷連
arg0.pipeline().addLast(new ReadTimeoutHandler(50));
arg0.pipeline().addLast(new HttpObjectAggregator(1048576));
// 添加實際處理數據的類
arg0.pipeline().addLast(new DiscartServiceHandler());
}
});
//(六)設置TCP緩衝區
bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128);
//保持鏈接
bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
//(七)綁定端口,啓動接收進來的鏈接
ChannelFuture sync = bootstrap.bind(port).sync();
//(八) 這裏會一直等待,直到socket被關閉
sync.channel().closeFuture().sync();
} finally {
//(九)關閉資源
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
//服務開啓
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscartServer(port).run();
System.out.println("server:run()");
}
}

客戶端栗子:網絡

  實際處理數據的類:框架

public class ChannelClient extends ChannelInitializer{

@Override
protected void initChannel(Channel arg0) throws Exception {
//與50秒內都沒有與服務端進行通訊的客戶端斷連
arg0.pipeline().addLast(new ReadTimeoutHandler(50));
arg0.pipeline().addLast(new HttpObjectAggregator(1048576));
//設置Channel
arg0.pipeline().addLast(new ChannelHandlerAdapter(){

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// 將接收到的信息轉換爲緩衝區
ByteBuf str = (ByteBuf) msg;
// 打印傳輸過來的信息
System.out.print(str.toString(CharsetUtil.UTF_8));
} finally {
// 釋放ByteBuf對象
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//輸出錯誤信息
cause.printStackTrace();
ctx.close();
}
});

}
}

  客戶端:異步

/**
* 客戶端
* @author lcy
*/
public class Client {
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
//建立一個新的線程組
NioEventLoopGroup workGroup = new NioEventLoopGroup();
//初始化Netty
Bootstrap bootstrap = new Bootstrap();
//指定工做的線程組
bootstrap = bootstrap.group(workGroup);
//指定 Channel的類型。由於是客戶端, 所以使用了 NioSocketChannel。
bootstrap.channel(NioSocketChannel.class);
/**
* 設置連接的一些屬性
*/
//下降延遲,禁用了禁用nagle算法。nagle算法受TCP延遲確認影響,會致使相繼兩次向鏈接發送請求包。
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//保持鏈接檢測對方主機是否崩潰,避免(服務器)永遠阻塞於TCP鏈接的輸入
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
//使用netty默認的解碼器會出現讀取不完整,不會執行channelRead方法。設置這個屬性惋惜保證Netty讀取的完整
bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE);
//設置數據處理器
bootstrap.handler(new ChannelClient());
//同步的連接
Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
channel.writeAndFlush(Unpooled.copiedBuffer("Hello Netty...".getBytes()));
channel.closeFuture().sync();
workGroup.shutdownGracefully();
}
}

(二)Netty的數據通信:

  一、使用長鏈接通道不斷開的形式進行通訊,也就是服務器和客戶端的通道一直處於開啓狀態,若是服務器性能足夠好,
而且客戶端數量也比較多的狀況下,推薦這種方式。
  二、一次性批量提交數據,採用短鏈接方式。也就是說先把數據保存到本地臨時緩存區或者臨時表,當達到界值時進行一
次性批量提交,又或者根據定時任務輪詢提交。
  三、使用一種特殊的長鏈接,在某一指定時間段內,服務器與某臺客戶端沒有任何通訊,則斷開鏈接。下次鏈接則是客戶
端向服務器發送請求的時候,再次創建鏈接。socket

(三)Netty的編解碼器:  

    1. Decoder 解碼器    負責將消息從字節或其餘序列形式轉成指定的消息對象。
  2. Encoder 編碼器    將消息對象轉成字節或其餘序列形式在網絡上傳輸。
  入站」ByteBuf讀取bytes後由 ToIntegerDecoder 進行解碼,而後將解碼後的消息存入List集合中,而後傳遞到ChannelPipeline
中的下一個ChannelInboundHandler。
  解碼器:
    1)ByteToMessageDecoder,需本身判斷ByteBuf讀取前是否有足夠的字節,不然會出現沾包的現象。
    2)ReplayingDecoder,無需本身檢查字節長度,可是使用起來具備侷限性:
      * 不是全部的操做都被ByteBuf支持,若是調用一個不支持的操做會拋出DecoderException。
      * ByteBuf.readableBytes()大部分時間不會返回指望值。
    3)MessageToMessageDecoder(message-to-message)
  解碼器是用來處理入站數據,Netty提供了不少解碼器的實現,能夠根據需求詳細瞭解。
  編碼器:
    1)MessageToByteEncoder
    2)MessageToMessageEncoder 須要將消息編碼成其餘的消息時可使用Netty提供的MessageToMessageEncoder抽象類
來實現。例如將Integer編碼成String。

(四)Netty中解決TCP粘包/拆包問題

  想要解決TCP的粘包/拆包問題,首先要知道什麼是TCP粘包、拆包:
    TCP是一個「流」協議,所謂流就是沒有界限的遺傳數據。你們能夠想象一下,若是河水就比如數據,他們是連成一片的,沒有
分界線,TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的具體狀況進行包的劃分,也就是說,在業務上一個完整的
包可能會被TCP分紅多個包進行發送,也可能把多個小包封裝成一個大的數據包發送出去,這就是所謂的粘包/拆包問題。
  解決方案:
  一、消息定長,例如每一個報文的大小固定爲200個字節,若是不夠,空位補空格。
  二、在包尾部增長特殊字符進行分割,例如加回車等。
  三、將消息分爲消息頭和消息體,在消息頭中包含表示消息總長度的字段,而後進行業務邏輯的處理。
  Netty中解決TCP粘包/拆包的方法:     一、分隔符類:DelimiterBasedFrameDecoder(自定義分隔符)     二、定長:FixedLengthFrameDecoder

相關文章
相關標籤/搜索