什麼是netty?先看下百度百科的解釋:java
爲何好多大公司都在使用netty框架?主要是基於netty框架的如下幾個特色決定的:編程
1)健壯性,2)功能齊全,3)可定製,4)擴展性bootstrap
傳統的RPC性能差,主要是因爲客戶端和遠程調用採用了同步阻塞IO線程,當客戶端的併發壓力增大後,同步阻塞會因爲頻繁的等待致使I/O線程堵塞,線程沒法高效的工做,IO處理能力天然會下降。影響性能的三個因素:第一,IO模型,IO模型在必定程度上決定了框架的性能。第2、協議,如:HTTP、TCP/IP等,協議選擇的不一樣,性能模型也不一樣,一般狀況下,內部私有協議的性能比較優,這是因爲內部設計決定的。第3、線程,數據報文的接收、讀取、編碼、解碼等,線程模型的不一樣,性能也不一樣。相比於傳統的RPC框架,netty的優勢主要體如今如下幾個方面:服務器
Netty是一個基於三層網絡架構模型的框架,三層網絡架構分析包括調度層、鏈條傳遞層以及業務邏輯層。網絡
NIO線程池組件{多線程
監聽網絡讀寫鏈接架構
業務調度處理併發
NIO,AIO,配合NIO通道NioSocketChannel組件框架
}異步
Netty經過內部select巡查機制,可以實現IO多路複用,經過把多個IO阻塞複用到同一個select的阻塞上,從而可以使系統即便在單線程的狀況下,也可以同時處理多個請求。這樣就使得netty實現了IO多路複用的優點,與傳統多線程相比,大大減小了系統的開銷,由於系統沒必要建立新的線程和銷燬線程了,減小了系統的維護難度,節省了資源。
ByteBuffer池化支持,不用手動切換標誌位,實現零拷貝。傳統的Socket讀寫,基本是使用堆內存進行,即jvm事先會把堆內存拷貝到內存中,而後再寫入Socket,而netty採用的是DIRECT BUFFERS,不須要通過jvm內存拷貝,在堆外內存直接進行Socket讀寫,這樣就少了一次緩衝區的內存拷貝,從而實現零拷貝。
2.Pipleline職責鏈條傳遞
攔截處理向前向後事件,外部傳入的消息包對象,有POJO信息抽象,上層也只須要處理邏輯,相似SpringIOC處理BeanDefince。不一樣的Handler節點的功能也不一樣,一般狀況下須要編碼解碼等,它能夠完成外部協議到內部POJO對象的轉化,這樣上層只須要關注業務邏輯,不須要知道底層的協議和線程模型,從而實現解耦。
3.構建邏輯業務處理單元
底層的協議隔離,上層處理邏輯框架並不須要關心底層協議是什麼。Netty框架的分層設計使得開發人員不須要關注協議框架的實現,只須要關注服務層的業務邏輯開發便可,實現了簡單化。
以前有個項目是基於傳統Socket和線程池的技術實現的,可是在高併發的時候發現併發能力不足,壓測的時候發現TPS達不到理想值,因此通過考慮,決定使用netty框架來解決此問題。一樣,netty框架也分爲客戶端和服務端,通過整理,先寫一個demo初探netty框架,下面是代碼的實現過程。
首先是服務端,服務端包含兩個方面,第1、服務端Server的主要做用就是經過輔助引導程序,設置NIO的鏈接方式處理客戶端請求,經過綁定特定端口、設定解碼方式以及監聽來實現整個線程的處理請求;第2、服務端Handler須要繼承ChannelInboundHandlerAdapter類,handler類的主要做用是讀取客戶端數據,處理業務,拋出異常,響應客戶端請求。代碼以下:
服務端Server:
public class Server { private static Log logger = LogFactory.getLog(Server.class); private int port; public Server(int port) { super(); this.port = port; } public void start(){ ServerBootstrap b = new ServerBootstrap();//引導輔助程序 EventLoopGroup group = new NioEventLoopGroup();//經過nio方式來接收鏈接和處理請求 try { b.group(group); b.channel(NioServerSocketChannel.class);//設置nio類型的channnel b.localAddress(new InetSocketAddress(port));//設置監聽端口 //b.option(ChannelOption.SO_BACKLOG, 2048); b.childHandler(new ChannelInitializer<SocketChannel>() {//有鏈接到達時會建立一個channel @Override protected void initChannel(SocketChannel ch) throws Exception { //註冊handler ch.pipeline().addLast(new ByteArrayDecoder()); ch.pipeline().addLast(new ByteArrayEncoder()); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new ServerHandler()); } });//.option(ChannelOption.SO_BACKLOG, 2048).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind().sync();//配置完成,開始綁定server,經過調用sync同步方法阻塞直到綁定成功 logger.info(Server.class.getName()+"開始監聽:"+f.channel().localAddress()); f.channel().closeFuture().sync();//應用程序會一直等待直到channel關閉 } catch (Exception e) { e.printStackTrace(); } finally { try { //關閉EventLoopGroup,釋放掉全部資源包括建立的線程 group.shutdownGracefully().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
服務端Handler
public class ServerHandler extends ChannelInboundHandlerAdapter { private static Log logger=LogFactory.getLog(ServerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx){ logger.info(ctx.channel().localAddress().toString()+"通道活躍...."); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.error(ctx.channel().localAddress().toString()+"通道不活躍...."); } /** * * 讀取客戶端傳過來的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //業務處理類 logger.info("開始業務處理...."); new SocketController(ctx,msg).run(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //出現異常,關閉連 logger.error("服務端出現異常:"+cause.getMessage(),cause); ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("服務端完成請求!"); ctx.flush(); } }
客戶端代碼
客戶端主要是用來向服務端發送數據,一樣包含兩個方面,第1、Client主要經過設定端口和IP和服務器創建鏈接,進行數據包的編碼;第2、ClientHandler 須要繼承 SimpleChannelInboundHandler<ByteBuf>類,針對不一樣的傳輸方式,繼承不一樣的類,handler類一樣處理業務請求,響應服務端的請求。代碼以下:
客戶端Client:
public class Client { private static Log logger=LogFactory.getLog(Client.class); private String host; private int port; public Client(String host, int port) { super(); this.host = host; this.port = port; } public void connect(){ EventLoopGroup workGroup=new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.group(workGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { logger.info("客戶端觸發鏈接......"); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new ClientHandler()); } }); //客戶端開始鏈接 try { logger.info("鏈接到服務器......"); ChannelFuture future=bootstrap.connect(host,port).sync(); //等待鏈接關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ workGroup.shutdownGracefully(); } } }
客戶端Handler:
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private static Log logger=LogFactory.getLog(ClientHandler.class); /** * 向服務端發送消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info(ctx.channel().localAddress().toString()+"客戶點活躍..."); //向服務端寫字符串 logger.info("客戶端鏈接服務端,開始發送數據....."); String string ="hello server!"; System.out.println("發送數據爲:"+string); ByteBuf buf=ctx.alloc().buffer(4*string.length()); buf.writeBytes(string.getBytes()); ctx.writeAndFlush(buf); logger.info("發送完畢..."); } /** * 讀取服務端返回來的消息 */ @Override protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception { logger.info("開始接受服務端數據"); byte[] b=new byte[in.readableBytes()]; in.readBytes(b); String string=new String(b); logger.info("服務端發送的數據爲:"+string); in.release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.info("客戶端異常:"+cause.getMessage(),cause); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("客戶端完成請求...."); ctx.flush(); } }
服務端啓動:
public class ServerMain { private static Log logger=LogFactory.getLog(ServerMain.class); private static Server server =new Server(55550); public static void main(String[] args) { logger.info("服務端啓動......."); server.start(); } }
客戶端啓動類:
public class Test { private static Client client = new Client("127.0.0.1", 55550); public static void main(String[] args) throws UnknownHostException, IOException { client.connect(); } }
測試結果:
服務端:
客戶端:
以上只是一個netty框架初探的小Demo,學習使用netty框架的開始,這裏面涉及到了不少的技術以及很是多的組件,好比:Channels、Callbacks、Futures、Events和handlers等等,須要進一步的學習,另外,消息的編碼解碼、粘包、拆包的方式方法、消息格式的轉換以及報文格式大小限制都須要進一步的研究學習。