Netty框架底層是對NIO的高度封裝,因此想要更好的學習Netty以前,應先了解下什麼是NIO - NIO是non-blocking的簡稱,
在jdk1.4 裏提供的新api,他的他的特性以下:
* 爲全部的原始類型提供(Buffer)緩存支持,字符集編碼解碼解決方案。
* Channel :一個新的原始I/O 抽象。支持鎖和內存映射文件的文件訪問接口。提供多路(non-bloking)非阻塞式的高伸縮
性網絡I/O 。
NIO是一個非阻塞式的I/O,它由一個專門的線程來處理全部的IO事件,並負責分發,而且它只有在事件到達的時候纔會觸發,
而不是去同步的監視事件;線程之間經過wait,notify 等方式通信。保證每次上下文切換都是有意義的。減小無謂的線程切換。
NIO和IO最大的區別是數據打包和傳輸方式。IO是以流的方式處理數據,而NIO是以塊的方式處理數據。NIO的核心部分由
Channels、Buffers、Selectors三部分組成。java
正常的狀況下,全部的IO在NIO中都從一個Channel 開始。Channel有點像流。數據能夠從Channel讀到Buffer中,也能夠從
Buffer寫到Channel中。JAVA NIO中的一些主要Channel的實現:FileChannel、DatagramChannel、SocketChannel、
ServerSocketChannel。這些實現類覆蓋了UDP和TCP網絡IO,以及文件IO。
而Buffer的一些實現類:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer,則覆蓋
了能經過IO發送的基本數據類型:byte,short,int,long,float,double和char。算法
Selector容許單線程處理多個Channel。若是你的應用打開了多個鏈接(通道),但每一個鏈接的流量都很低,使用Selector
就會很方便。而要使用Selector,就得向Selector註冊Channel,而後調用它的select()方法。這個方法會一直阻塞到某個註冊的
通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件,事件的例子有如新鏈接進來,數據接收等。bootstrap
大數據,高訪問場景的互聯網項目或者多系統的協同工做,使用一個服務器根本不能勝任。就須要把系統拆分紅了多個服務,
根據須要部署在多個機器上,這些服務很是靈活,能夠隨着訪問量彈性擴展。可是多個模塊的跨服務通訊,時間和資源都是極大
地浪費。傳統的Blocking IO不能解決,由於會有線程阻塞的問題,而使用非阻塞IO(NIO),則須要耗費太多的精力。而Netty框架
(RPC框架)則很好的解決了這個問題。
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、
高可靠性的網絡服務器和客戶端程序。他就是一個程序,是封裝java socket noi的,咱們直接拿來用就行了。
Netty通訊服務端的步驟:
一、建立兩個NIO線程組,一個專門用於網絡事件處理(接受客戶端的鏈接),另外一個則進行網絡通訊的讀寫。
二、建立一個ServerBootstrap對象,配置Netty的一系列參數,例如接受傳出數據的緩存大小等。
三、建立一個用於實際處理數據的類ChannelInitializer,進行初始化的準備工做,好比設置接受傳出數據的字符集、
格式以及實際處理數據的接口。
四、綁定端口,執行同步阻塞方法等待服務器端啓動便可。
五、關閉相應的資源。api
服務端栗子:
服務端的管理者:緩存
/** * 服務端處理通道.這裏只是打印一下請求的內容,並不對請求進行任何的響應 * 繼承自ChannelHandlerAdapter, 這個類實現了ChannelHandler接口, * ChannelHandler提供了許多事件處理的接口方法,而後你能夠覆蓋這些方法。 * @author lcy * */ public class DiscartServiceHandler extends ChannelHandlerAdapter { /** * 客戶端收到新消息時,這個方法會被調用 * * @param ctx * 通道處理的上下文信息 * @param msg * 接受的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // 將接收到的信息轉換爲緩衝區 ByteBuf str = (ByteBuf) msg; // 打印傳輸過來的信息 System.out.print(str.toString(CharsetUtil.UTF_8)); } finally { // 釋放ByteBuf對象 ReferenceCountUtil.release(msg); } } /** * 在異常時觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //輸出錯誤信息 cause.printStackTrace(); ctx.close(); } }
服務端:服務器
/** * 服務端 * @author lcy * */ public class DiscartServer { private int port; public DiscartServer(int port) { super(); this.port = port; } public void run() throws Exception { //(一)設置兩個線程組 //用來接收進來的鏈接 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 用來處理已經接受的鏈接 NioEventLoopGroup workGroup = new NioEventLoopGroup(); System.out.println("準備運行的端口" + port); try { //(二)輔助工具類,用於服務器通道的一系列配置 ServerBootstrap bootstrap = new ServerBootstrap(); //(三)綁定兩個線程組 // 設置group,這一步是必須的,若是沒有設置group將會報java.lang.IllegalStateException:group not set異常 bootstrap = bootstrap.group(bossGroup, workGroup); //(四)指定NIO的模式 /*** * ServerSocketChannel以NIO的selector爲基礎進行實現的,用來接收新的鏈接 * 這裏告訴Channel如何獲取新的鏈接. */ bootstrap = bootstrap.channel(NioServerSocketChannel.class); //(五)配置具體的數據處理方式,就是往裏添加規則 bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel arg0) throws Exception { //與50秒內都沒有與服務端進行通訊的客戶端斷連 arg0.pipeline().addLast(new ReadTimeoutHandler(50)); arg0.pipeline().addLast(new HttpObjectAggregator(1048576)); // 添加實際處理數據的類 arg0.pipeline().addLast(new DiscartServiceHandler()); } }); //(六)設置TCP緩衝區 bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128); //保持鏈接 bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); //(七)綁定端口,啓動接收進來的鏈接 ChannelFuture sync = bootstrap.bind(port).sync(); //(八) 這裏會一直等待,直到socket被關閉 sync.channel().closeFuture().sync(); } finally { //(九)關閉資源 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } //服務開啓 public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscartServer(port).run(); System.out.println("server:run()"); } }
客戶端栗子:網絡
實際處理數據的類:框架
public class ChannelClient extends ChannelInitializer{ @Override protected void initChannel(Channel arg0) throws Exception { //與50秒內都沒有與服務端進行通訊的客戶端斷連 arg0.pipeline().addLast(new ReadTimeoutHandler(50)); arg0.pipeline().addLast(new HttpObjectAggregator(1048576)); //設置Channel arg0.pipeline().addLast(new ChannelHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // 將接收到的信息轉換爲緩衝區 ByteBuf str = (ByteBuf) msg; // 打印傳輸過來的信息 System.out.print(str.toString(CharsetUtil.UTF_8)); } finally { // 釋放ByteBuf對象 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //輸出錯誤信息 cause.printStackTrace(); ctx.close(); } }); } }
客戶端:異步
/** * 客戶端 * @author lcy */ public class Client { @SuppressWarnings("resource") public static void main(String[] args) throws Exception { //建立一個新的線程組 NioEventLoopGroup workGroup = new NioEventLoopGroup(); //初始化Netty Bootstrap bootstrap = new Bootstrap(); //指定工做的線程組 bootstrap = bootstrap.group(workGroup); //指定 Channel的類型。由於是客戶端, 所以使用了 NioSocketChannel。 bootstrap.channel(NioSocketChannel.class); /** * 設置連接的一些屬性 */ //下降延遲,禁用了禁用nagle算法。nagle算法受TCP延遲確認影響,會致使相繼兩次向鏈接發送請求包。 bootstrap.option(ChannelOption.TCP_NODELAY, true); //保持鏈接檢測對方主機是否崩潰,避免(服務器)永遠阻塞於TCP鏈接的輸入 bootstrap.option(ChannelOption.SO_KEEPALIVE, true); //使用netty默認的解碼器會出現讀取不完整,不會執行channelRead方法。設置這個屬性惋惜保證Netty讀取的完整 bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE); //設置數據處理器 bootstrap.handler(new ChannelClient()); //同步的連接 Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel(); channel.writeAndFlush(Unpooled.copiedBuffer("Hello Netty...".getBytes())); channel.closeFuture().sync(); workGroup.shutdownGracefully(); } }
一、使用長鏈接通道不斷開的形式進行通訊,也就是服務器和客戶端的通道一直處於開啓狀態,若是服務器性能足夠好,
而且客戶端數量也比較多的狀況下,推薦這種方式。
二、一次性批量提交數據,採用短鏈接方式。也就是說先把數據保存到本地臨時緩存區或者臨時表,當達到界值時進行一
次性批量提交,又或者根據定時任務輪詢提交。
三、使用一種特殊的長鏈接,在某一指定時間段內,服務器與某臺客戶端沒有任何通訊,則斷開鏈接。下次鏈接則是客戶
端向服務器發送請求的時候,再次創建鏈接。socket
1. Decoder 解碼器 負責將消息從字節或其餘序列形式轉成指定的消息對象。
2. Encoder 編碼器 將消息對象轉成字節或其餘序列形式在網絡上傳輸。
入站」ByteBuf讀取bytes後由 ToIntegerDecoder 進行解碼,而後將解碼後的消息存入List集合中,而後傳遞到ChannelPipeline
中的下一個ChannelInboundHandler。
解碼器:
1)ByteToMessageDecoder,需本身判斷ByteBuf讀取前是否有足夠的字節,不然會出現沾包的現象。
2)ReplayingDecoder,無需本身檢查字節長度,可是使用起來具備侷限性:
* 不是全部的操做都被ByteBuf支持,若是調用一個不支持的操做會拋出DecoderException。
* ByteBuf.readableBytes()大部分時間不會返回指望值。
3)MessageToMessageDecoder(message-to-message)
解碼器是用來處理入站數據,Netty提供了不少解碼器的實現,能夠根據需求詳細瞭解。
編碼器:
1)MessageToByteEncoder
2)MessageToMessageEncoder 須要將消息編碼成其餘的消息時可使用Netty提供的MessageToMessageEncoder抽象類
來實現。例如將Integer編碼成String。
想要解決TCP的粘包/拆包問題,首先要知道什麼是TCP粘包、拆包:
TCP是一個「流」協議,所謂流就是沒有界限的遺傳數據。你們能夠想象一下,若是河水就比如數據,他們是連成一片的,沒有
分界線,TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的具體狀況進行包的劃分,也就是說,在業務上一個完整的
包可能會被TCP分紅多個包進行發送,也可能把多個小包封裝成一個大的數據包發送出去,這就是所謂的粘包/拆包問題。
解決方案:
一、消息定長,例如每一個報文的大小固定爲200個字節,若是不夠,空位補空格。
二、在包尾部增長特殊字符進行分割,例如加回車等。
三、將消息分爲消息頭和消息體,在消息頭中包含表示消息總長度的字段,而後進行業務邏輯的處理。
Netty中解決TCP粘包/拆包的方法: 一、分隔符類:DelimiterBasedFrameDecoder(自定義分隔符) 二、定長:FixedLengthFrameDecoder