Netty快速入門java
Netty 是一個基於 JAVA NIO 類庫的異步通訊框架,它的架構特色是:異步非阻塞、基於事件驅動、高性能、高可靠性和高可定製性。算法
1.分佈式開源框架中dubbo、Zookeeper,RocketMQ底層rpc通信使用就是netty。編程
2.遊戲開發中,底層使用netty通信。json
在本小節,咱們總結下爲何不建議開發者直接使用JDK的NIO類庫進行開發的緣由:bootstrap
1) NIO的類庫和API繁雜,使用麻煩,你須要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;數組
2) 須要具有其它的額外技能作鋪墊,例如熟悉Java多線程編程,由於NIO編程涉及到Reactor模式,你必須對多線程和網路編程很是熟悉,才能編寫出高質量的NIO程序;緩存
3) 可靠性能力補齊,工做量和難度都很是大。例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處 n理等等,NIO編程的特色是功能開發相對容易,可是可靠性能力補齊工做量和難度都很是大;安全
4) JDK NIO的BUG,例如臭名昭著的epoll bug,它會致使Selector空輪詢,最終致使CPU 100%。官方聲稱在JDK1.6版本的update18修復了該問題,可是直到JDK1.7版本該問題仍舊存在,只不過該bug發生機率下降了一些而已,它並無被根本解決。該BUG以及與該BUG相關的問題單以下:服務器
class ServerHandler extends SimpleChannelHandler { /** * 通道關閉的時候觸發 */ @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); } /** * 必須是鏈接已經創建,關閉通道的時候纔會觸發. */ @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelDisconnected(ctx, e); System.out.println("channelDisconnected"); } /** * 捕獲異常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { super.exceptionCaught(ctx, e); System.out.println("exceptionCaught"); } /** * 接受消息 */ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { super.messageReceived(ctx, e); // System.out.println("messageReceived"); System.out.println("服務器端收到客戶端消息:"+e.getMessage()); //回覆內容 ctx.getChannel().write("好的"); } } // netty 服務器端 public class NettyServer { public static void main(String[] args) { // 建立服務類對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 建立兩個線程池 分別爲監聽監聽端口 ,nio監聽 ExecutorService boos = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); // 設置工程 並把兩個線程池加入中 serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, worker)); // 設置管道工廠 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); //將數據轉換爲string類型. pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("serverHandler", new ServerHandler()); return pipeline; } }); // 綁定端口號 serverBootstrap.bind(new InetSocketAddress(9090)); System.out.println("netty server啓動...."); } }
package com.itmayiedu; import java.net.InetSocketAddress; import java.util.Scanner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; class ClientHandler extends SimpleChannelHandler { /** * 通道關閉的時候觸發 */ @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); } /** * 必須是鏈接已經創建,關閉通道的時候纔會觸發. */ @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelDisconnected(ctx, e); System.out.println("channelDisconnected"); } /** * 捕獲異常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { super.exceptionCaught(ctx, e); System.out.println("exceptionCaught"); } /** * 接受消息 */ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { super.messageReceived(ctx, e); // System.out.println("messageReceived"); System.out.println("服務器端向客戶端回覆內容:"+e.getMessage()); //回覆內容 // ctx.getChannel().write("好的"); } } public class NettyClient { public static void main(String[] args) { System.out.println("netty client啓動..."); // 建立客戶端類 ClientBootstrap clientBootstrap = new ClientBootstrap(); // 線程池 ExecutorService boos = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, worker)); clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); // 將數據轉換爲string類型. pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("clientHandler", new ClientHandler()); return pipeline; } }); //鏈接服務端 ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 9090)); Channel channel = connect.getChannel(); System.out.println("client start"); Scanner scanner= new Scanner(System.in); while (true) { System.out.println("請輸輸入內容..."); channel.write(scanner.next()); } } }
Maven座標網絡
<dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> <version>3.3.0.Final</version> </dependency>
Netty5.0用法
class ServerHandler extends ChannelHandlerAdapter { /** * 當通道被調用,執行該方法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接收數據 String value = (String) msg; System.out.println("Server msg:" + value); // 回覆給客戶端 「您好!」 String res = "好的..."; ctx.writeAndFlush(Unpooled.copiedBuffer(res.getBytes())); } } public class NettyServer { public static void main(String[] args) throws InterruptedException { System.out.println("服務器端已經啓動...."); // 1.建立2個線程,一個負責接收客戶端鏈接, 一個負責進行 傳輸數據 NioEventLoopGroup pGroup = new NioEventLoopGroup(); NioEventLoopGroup cGroup = new NioEventLoopGroup(); // 2. 建立服務器輔助類 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) // 3.設置緩衝區與發送區大小 .option(ChannelOption.SO_SNDBUF, 32 * 1024).option(ChannelOption.SO_RCVBUF, 32 * 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8080).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
建立客戶端
class ClientHandler extends ChannelHandlerAdapter { /** * 當通道被調用,執行該方法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接收數據 String value = (String) msg; System.out.println("client msg:" + value); } } public class NettyClient { public static void main(String[] args) throws InterruptedException { System.out.println("客戶端已經啓動...."); // 建立負責接收客戶端鏈接 NioEventLoopGroup pGroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8080).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes())); // 等待客戶端端口號關閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); } }
Maven座標
<dependencies> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>1.3.19.GA</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>1.3.18.GA</version> <scope>test</scope> </dependency> </dependencies>
TCP粘包、拆包問題解決方案
一個完整的業務可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這個就是TCP的拆包和封包問題。
下面能夠看一張圖,是客戶端向服務端發送包:
1. 第一種狀況,Data1和Data2都分開發送到了Server端,沒有產生粘包和拆包的狀況。
2. 第二種狀況,Data1和Data2數據粘在了一塊兒,打成了一個大的包發送到Server端,這個狀況就是粘包。
3. 第三種狀況,Data2被分離成Data2_1和Data2_2,而且Data2_1在Data1以前到達了服務端,這種狀況就產生了拆包。
因爲網絡的複雜性,可能數據會被分離成N多個複雜的拆包/粘包的狀況,因此在作TCP服務器的時候就須要首先解決拆包/
解決辦法
消息定長,報文大小固定長度,不夠空格補全,發送和接收方遵循相同的約定,這樣即便粘包了經過接收方編程實現獲取定長報文也能區分。
sc.pipeline().addLast(new FixedLengthFrameDecoder(10)); |
包尾添加特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字符做爲報文分隔符,接收方經過特殊分隔符切分報文區分。
ByteBuf buf = Unpooled.copiedBuffer("_mayi".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); |
將消息分爲消息頭和消息體,消息頭中包含表示信息的總長度(或者消息體長度)的字段
序列化(serialization)就是將對象序列化爲二進制形式(字節數組),通常也將序列化稱爲編碼(Encode),主要用於網絡傳輸、數據持久化等;
反序列化(deserialization)則是將從網絡、磁盤等讀取的字節數組還原成原始對象,以便後續業務的進行,通常也將反序列化稱爲解碼(Decode),主要用於網絡傳輸對象的解碼,以便完成遠程調用。
我知道的第一種序列化協議就是Java默認提供的序列化機制,須要序列化的Java對象只須要實現 Serializable / Externalizable 接口並生成序列化ID,這個類就可以經過 ObjectInput 和 ObjectOutput 序列化和反序列化,若對Java默認的序列化協議不瞭解,或是遺忘了,請參考:序列化詳解
可是Java默認提供的序列化有不少問題,主要有如下幾個缺點:
沒法跨語言:我認爲這對於Java序列化的發展是致命的「失誤」,由於Java序列化後的字節數組,其它語言沒法進行反序列化。;
序列化後的碼流太大::相對於目前主流的序列化協議,Java序列化後的碼流太大;
序列化的性能差:因爲Java序列化採用同步阻塞IO,相對於目前主流的序列化協議,它的效率很是差。
影響序列化性能的關鍵因素
序列化後的碼流大小(網絡帶寬的佔用);
序列化的性能(CPU資源佔用);
是否支持跨語言(異構系統的對接和開發語言切換)。
(1)定義:
XML(Extensible Markup Language)是一種經常使用的序列化和反序列化協議, 它歷史悠久,從1998年的1.0版本被普遍使用至今。
(2)優勢
人機可讀性好
可指定元素或特性的名稱
(3)缺點
序列化數據只包含數據自己以及類的結構,不包括類型標識和程序集信息。
類必須有一個將由 XmlSerializer 序列化的默認構造函數。
只能序列化公共屬性和字段
不能序列化方法
文件龐大,文件格式複雜,傳輸佔帶寬
(4)使用場景
當作配置文件存儲數據
實時數據轉換
(1)定義:
JSON(JavaScript Object Notation, JS 對象標記) 是一種輕量級的數據交換格式。它基於 ECMAScript (w3c制定的js規範)的一個子集, JSON採用與編程語言無關的文本格式,可是也使用了類C語言(包括C, C++, C#, Java, JavaScript, Perl, Python等)的習慣,簡潔和清晰的層次結構使得 JSON 成爲理想的數據交換語言。
(2)優勢
先後兼容性高
數據格式比較簡單,易於讀寫
序列化後數據較小,可擴展性好,兼容性好
與XML相比,其協議比較簡單,解析速度比較快
(3)缺點
數據的描述性比XML差
不適合性能要求爲ms級別的狀況
額外空間開銷比較大
(4)適用場景(可替代XML)
跨防火牆訪問
可調式性要求高的狀況
基於Web browser的Ajax請求
傳輸數據量相對小,實時性要求相對低(例如秒級別)的服務
(1)定義
Fastjson是一個Java語言編寫的高性能功能完善的JSON庫。它採用一種「假定有序快速匹配」的算法,把JSON Parse的性能提高到極致。
(2)優勢
接口簡單易用
目前java語言中最快的json庫
(3)缺點
過於注重快,而偏離了「標準」及功能性
代碼質量不高,文檔不全
(4)適用場景
協議交互
Web輸出
Android客戶端
(1)定義:
Thrift並不只僅是序列化協議,而是一個RPC框架。它可讓你選擇客戶端與服務端之間傳輸通訊協議的類別,即文本(text)和二進制(binary)傳輸協議, 爲節約帶寬,提供傳輸效率,通常狀況下使用二進制類型的傳輸協議。
(2)優勢
序列化後的體積小, 速度快
支持多種語言和豐富的數據類型
對於數據字段的增刪具備較強的兼容性
支持二進制壓縮編碼
(3)缺點
使用者較少
跨防火牆訪問時,不安全
不具備可讀性,調試代碼時相對困難
不能與其餘傳輸層協議共同使用(例如HTTP)
沒法支持向持久層直接讀寫數據,即不適合作數據持久化序列化協議
(4)適用場景
分佈式系統的RPC解決方案
(1)定義:
Avro屬於Apache Hadoop的一個子項目。 Avro提供兩種序列化格式:JSON格式或者Binary格式。Binary格式在空間開銷和解析性能方面能夠和Protobuf媲美,Avro的產生解決了JSON的冗長和沒有IDL的問題
(2)優勢
支持豐富的數據類型
簡單的動態語言結合功能
具備自我描述屬性
提升了數據解析速度
快速可壓縮的二進制數據形式
能夠實現遠程過程調用RPC
支持跨編程語言實現
(3)缺點
對於習慣於靜態類型語言的用戶不直觀
(4)適用場景
在Hadoop中作Hive、Pig和MapReduce的持久化數據格式
(1)定義
protocol buffers 由谷歌開源而來,在谷歌內部久經考驗。它將數據結構以.proto文件進行描述,經過代碼生成工具能夠生成對應數據結構的POJO對象和Protobuf相關的方法和屬性。
(2)優勢
序列化後碼流小,性能高
結構化數據存儲格式(XML JSON等)
經過標識字段的順序,能夠實現協議的前向兼容
結構化的文檔更容易管理和維護
(3)缺點
須要依賴於工具生成代碼
支持的語言相對較少,官方只支持Java 、C++ 、Python
(4)適用場景
對性能要求高的RPC調用
具備良好的跨防火牆的訪問屬性
適合應用層對象的持久化
protostuff 基於protobuf協議,但不須要配置proto文件,直接導包即
Jboss marshaling 能夠直接序列化java類, 無須實java.io.Serializable接口
Message pack 一個高效的二進制序列化格式
Hessian 採用二進制協議的輕量級remoting onhttp工具
kryo 基於protobuf協議,只支持java語言,須要註冊(Registration),而後序列化(Output),反序列化(Input)
時間
空間
分析上圖知:
XML序列化(Xstream)不管在性能和簡潔性上比較差。
Thrift與Protobuf相比在時空開銷方面都有必定的劣勢。
Protobuf和Avro在兩方面表現都很是優越。
不一樣的場景適用的序列化協議:
對於公司間的系統調用,若是性能要求在100ms以上的服務,基於XML的SOAP協議是一個值得考慮的方案。
基於Web browser的Ajax,以及Mobile app與服務端之間的通信,JSON協議是首選。對於性能要求不過高,或者以動態類型語言爲主,或者傳輸數據載荷很小的的運用場景,JSON也是很是不錯的選擇。
對於調試環境比較惡劣的場景,採用JSON或XML可以極大的提升調試效率,下降系統開發成本。
當對性能和簡潔性有極高要求的場景,Protobuf,Thrift,Avro之間具備必定的競爭關係。
對於T級別的數據的持久化應用場景,Protobuf和Avro是首要選擇。若是持久化後的數據存儲在Hadoop子項目裏,Avro會是更好的選擇。
因爲Avro的設計理念偏向於動態類型語言,對於動態語言爲主的應用場景,Avro是更好的選擇。
對於持久層非Hadoop項目,以靜態類型語言爲主的應用場景,Protobuf會更符合靜態類型語言工程師的開發習慣。
若是須要提供一個完整的RPC解決方案,Thrift是一個好的選擇。
若是序列化以後須要支持不一樣的傳輸層協議,或者須要跨防火牆訪問的高性能場景,Protobuf能夠優先考慮。
public final class MarshallingCodeCFactory { /** * 建立Jboss Marshalling解碼器MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 建立Jboss Marshalling編碼器MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }