Netty概述:
一、netty是基於Java NIO的網絡應用框架,client-server框架
二、Netty是一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持,
做爲一個異步NIO框架,Netty的全部IO操做都是異步非阻塞的,
經過Future-Listener機制,用戶能夠方便的主動獲取或者經過通知機制得到IO操做結果。
三、做爲當前最流行的NIO框架,Netty在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,
一些業界著名的開源組件也基於Netty的NIO框架構建。
java
Netty建立步驟:web
NIO通信服務端步驟:
一、建立ServerSocketChannel,爲它配置非阻塞模式
二、綁定監聽,配置TCP參數,錄入backlog大小等
三、建立一個獨立的IO線程,用於輪詢多路複用器Selector
四、建立Selector,將以前的ServerSocketChannel註冊到Selector上,並設置監聽標識位SelectionKey.ACCEPT
五、啓動IO線程,在循環體中執行Selector.select()方法,輪詢就緒的通道
六、當輪詢處處於就緒的通道時,須要進行判斷操做位,若是是ACCEPT狀態,說明是新的客戶端介入,則調用accept方法接受新的客戶端。
七、設置新接入客戶端的一些參數,並將其通道繼續註冊到Selector之中。設置監聽標識等
八、若是輪詢的通道操做位是READ,則進行讀取,構造Buffer對象等
九、更細節的還有數據沒發送完成繼續發送的問題
Netty實現通信的步驟:
一、建立兩個NIO線程組,一個專門用來網絡事件處理(接受客戶端鏈接),另外一個則進行網絡通信讀寫
二、建立一個ServerBootstrap對象,配置Netty的一系列參數,例如接受傳入數據的緩存大小等。
三、建立一個實際處理數據的類ChannelInitializer,進行初始化的準備工做,好比設置傳入數據的字符集,格式,實現實際處理數據的接口。
四、綁定端口,執行同步阻塞方法等待服務器啓動便可。
當對於NIO模型,netty簡單、健壯、性能穩定,並且這幾步都是模板式開發,之後能夠直接用,開發只需專一實際處理數據類的實現。
Netty最佳實踐(數據通信、心跳檢測)
netty服務最好能夠單獨做爲一個項目,固然也能夠與web項目集成在一塊兒發佈到tomcat,
這樣好處是能夠用到web項目中的service方法,可是web項目8080關閉,netty監聽的端口號也關閉了
因此netty能夠打成jar包運行,固然若是要用到service層的代碼,也能夠將service層的代碼打成jar包
給netty業務類使用。
netty通信的方式:
①使用長鏈接通道不斷開的形式進行通訊,也就是服務器和客戶端的通道一直處於開啓狀態,若是服務器的
性能比較好,並且客戶端的數量也很少的狀況下,能夠考慮這種方式
②一次性批量提交數據,採用短鏈接的方式,也就是咱們把數據保存在本地臨時緩衝區或者臨時表中,
當達到臨界值時進行一次性批量提交,又或者根據定時任務輪詢提交,這種狀況下弊端是作不到
實時性傳輸,在實時性要求不高的程序中能夠採用
③採用一種特殊的長鏈接,在指定某一段時間以內,服務端和某臺客戶端沒有任何通信,則斷開鏈接,
數據庫
下次若是客戶端要向服務端發送數據時,再次創建鏈接。json
但有兩個因素要考慮:bootstrap
一、如何在超時(即服務端和客戶端沒有任何通訊)後關閉通道?關閉後如何再次鏈接?數組
二、客戶端宕機,無需考慮,下次客戶端重啓後能夠與服務端創建鏈接,可是服務器宕機怎麼辦?緩存
服務端代碼Server:tomcat
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); //線程組:用來處理網絡事件處理(接受客戶端鏈接) EventLoopGroup cGroup = new NioEventLoopGroup(); //線程組:用來進行網絡通信讀寫 //Bootstrap用來配置參數 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) //註冊服務端channel /** * BACKLOG用於構造服務端套接字ServerSocket對象,標識當服務器請求處理線程全滿時, * 用於臨時存放已完成三次握手的請求的隊列的最大長度。若是未設置或所設置的值小於1,將使用默認值50。 * 服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接,多個客戶端來的時候, * 服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog參數指定了隊列的大小 */ .option(ChannelOption.SO_BACKLOG, 1024) //設置日誌 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { //marshaliing的編解碼操做,要傳輸對象,必須編解碼 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //5s沒有交互,就會關閉channel sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ServerHandler()); //服務端業務處理類 } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
客戶端代碼:服務器
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; public class Client { private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf ; private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉響應的通道,主要爲減少服務端資源佔用) sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ClientHandler()); //客戶端業務處理類 } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("遠程服務器已經鏈接, 能夠進行數據交換.."); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture(){ //若是管道沒有被開啓或者被關閉了,那麼重連 if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ //客戶端發送的數據 UserParam request = new UserParam(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("數據信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); } //當5s沒有交互,就會異步關閉channel cf.channel().closeFuture().sync(); //再模擬一次傳輸 new Thread(new Runnable() { @Override public void run() { try { ChannelFuture cf = c.getChannelFuture(); //System.out.println(cf.channel().isActive()); //System.out.println(cf.channel().isOpen()); //再次發送數據 UserParam request = new UserParam(); request.setId("" + 4); request.setName("pro" + 4); request.setRequestMessage("數據信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子線程結束."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("斷開鏈接,主線程結束.."); } }
服務端處理類:網絡
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接受客戶端對象 UserParam user = (UserParam)msg; System.out.println("客戶端發來的消息 : " + user.getId() + ", " + user.getName() + ", " + user.getRequestMessage()); //給客戶端返回對象 UserData response = new UserData(); response.setId(user.getId()); response.setName("response" + user.getId()); response.setResponseMessage("響應內容" + user.getId()); ctx.writeAndFlush(response); //處理完畢,關閉服務端 //ctx.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客戶端處理類:
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { UserData user = (UserData)msg; System.out.println("服務器返回的消息 : " + user.getId() + ", " + user.getName() + ", " + user.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客戶端傳輸的參數對象UserParam -- > id name requestMessage
服務端傳輸的參數對象UserData -- > id name responseMessage
心跳檢測:
Server代碼,Client代碼是模板代碼,基本都同樣,不一樣是業務處理的方法。
Server業務處理類ServerHeartBeatHandler:
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.HashMap; public class ServerHeartBeatHandler extends ChannelHandlerAdapter { /** * key:ip value:auth ** * 擁有的客戶端列表,從數據庫中或者配置文件中讀取 */ private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>(); //模擬受權的key private static final String SUCCESS_KEY = "auth_success_key"; static { AUTH_IP_MAP.put("192.168.1.200", "1234"); } private boolean auth(ChannelHandlerContext ctx, Object msg){ //System.out.println(msg); String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { //接受客戶端發來的他機器的性能參數 RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("當前主機ip爲: " + info.getIp()); System.out.println("當前主機cpu狀況: "); HashMap<String, Object> cpu = info.getCpuPercMap(); System.out.println("總使用率: " + cpu.get("combined")); System.out.println("用戶使用率: " + cpu.get("user")); System.out.println("系統使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空閒率: " + cpu.get("idle")); System.out.println("當前主機memory狀況: "); HashMap<String, Object> memory = info.getMemoryMap(); System.out.println("內存總量: " + memory.get("total")); System.out.println("當前內存使用量: " + memory.get("used")); System.out.println("當前內存剩餘量: " + memory.get("free")); System.out.println("--------------------------------------------"); //通知客戶端消息已收到 ctx.writeAndFlush("info received!"); }else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } } }
Client業務處理類ClienHeartBeattHandler:
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import java.net.InetAddress; import java.util.HashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.hyperic.sigar.CpuPerc; import org.hyperic.sigar.Mem; import org.hyperic.sigar.Sigar; public class ClienHeartBeattHandler extends ChannelHandlerAdapter { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture<?> heartBeat; //主動向服務器發送認證信息 private InetAddress addr ; private static final String SUCCESS_KEY = "auth_success_key"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress(); String key = "1234"; //證書 String auth = ip + "," + key; ctx.writeAndFlush(auth); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String)msg; if(SUCCESS_KEY.equals(ret)){ // 握手成功,主動發送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS); System.out.println(msg); } else { System.out.println(msg); } } } finally { ReferenceCountUtil.release(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { try { RequestInfo info = new RequestInfo(); //ip info.setIp(addr.getHostAddress()); Sigar sigar = new Sigar(); //cpu prec CpuPerc cpuPerc = sigar.getCpuPerc(); HashMap<String, Object> cpuPercMap = new HashMap<String, Object>(); cpuPercMap.put("combined", cpuPerc.getCombined()); cpuPercMap.put("user", cpuPerc.getUser()); cpuPercMap.put("sys", cpuPerc.getSys()); cpuPercMap.put("wait", cpuPerc.getWait()); cpuPercMap.put("idle", cpuPerc.getIdle()); // memory Mem mem = sigar.getMem(); HashMap<String, Object> memoryMap = new HashMap<String, Object>(); memoryMap.put("total", mem.getTotal() / 1024L); memoryMap.put("used", mem.getUsed() / 1024L); memoryMap.put("free", mem.getFree() / 1024L); info.setCpuPercMap(cpuPercMap); info.setMemoryMap(memoryMap); ctx.writeAndFlush(info); } catch (Exception e) { e.printStackTrace(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } } }
netty編解碼技術:
java序列化技術,序列化目的:
①網絡傳輸(網絡協議是基於二進制的,內存中的參數的值要序列化成二進制的形式)
②對象持久化(對象必須在JVM中存活,不可能超過JVM的生命週期)
雖然咱們可使用java進行對象序列化,netty去傳輸,可是java序列化的硬傷太多:
1.沒法跨語言。這應該是java序列化最致命的問題了。
因爲java序列化是java內部私有的協議,其餘語言不支持,致使別的語言沒法反序列化,這嚴重阻礙了它的應用。
關於跨語言問題,也就是對象傳輸,通常都採用json字符串。
2.序列後的碼流太大。java序列化的大小是二進制編碼的5倍多!
3.序列化性能過低。java序列化的性能只有二進制編碼的6.17倍,可見java序列化性能實在太差了。
咱們判斷一個編碼框架的優劣主要從如下幾個方面:
1.是否支持跨語言,支持語種是否豐富
2.編碼後的碼流
3.編解碼的性能
4.類庫是否小巧,API使用是否方便
5.使用者開發的工做量和難度。
java序列化前3條變現太差,致使在遠程服務調用中不多用它
主流的編解碼框架:
①JBoss的Marshalling包:
對jdk默認的序列化進行了優化,又保持跟java.io.Serializable接口的兼容,同時增長了一些可調的參數和附加特性,
而且這些參數和特性可經過工廠類的配置
1.可拔插的類解析器,提供更加便捷的類加載定製策略,經過一個接口便可實現定製。
2.可拔插的對象替換技術,不須要經過繼承的方式。
3.可拔插的預約義類緩存表,能夠減小序列化的字節數組長度,提高經常使用類型的對象序列化性能。
4.無須實現java.io.Serializable接口
5.經過緩存技術提高對象的序列化性能。
6.使用很是簡單
②google的Protobuf
③基於Protobuf的Kyro
④MessagePack框架
Marshalling工具類:
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; public final class MarshallingCodeCFactory { /** * 建立Jboss Marshalling解碼器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先經過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立了MarshallingConfiguration對象,配置了版本號爲5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據marshallerFactory和configuration建立provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingDecoder對象,倆個參數分別爲provider和單個消息序列化後的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 建立Jboss Marshalling編碼器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化爲二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
RPC(Remote Procedure Call):
RPC是指遠程過程調用,也就是說兩臺服務器A,B,一個應用部署在A服務器上,想要調用B服務器上應用提供的函數/方法,
因爲不在一個內存空間,不能直接調用,須要經過網絡來表達調用的語義和傳達調用的數據。
好比遠程調用方法:Employee getEmployeeByName(String fullName)
一、要解決通信的問題,主要是經過在客戶端和服務器之間創建TCP鏈接,遠程過程調用的全部交換的數據都在這個鏈接裏傳輸。
鏈接能夠是按需鏈接,調用結束後就斷掉,也能夠是長鏈接,多個遠程過程調用共享同一個鏈接。
二、要解決尋址的問題,也就是說,A服務器上的應用怎麼告訴底層的RPC框架,
如何鏈接到B服務器(如主機或IP地址)以及特定的端口,方法的名稱名稱是什麼,這樣才能完成調用。
好比基於Web服務協議棧的RPC,就要提供一個endpoint URI,或者是從UDDI服務上查找。
若是是RMI調用的話,還須要一個RMI Registry來註冊服務的地址。
三、要解決編碼的問題,當A服務器上的應用發起遠程過程調用時,方法的參數須要經過底層的網絡協議如TCP傳遞到B服務器,
因爲網絡協議是基於二進制的,內存中的參數的值要序列化成二進制的形式,也就是序列化(Serialize)或編組(marshal),
經過尋址和傳輸將序列化的二進制發送給B服務器。
四、要解決解碼的問題,B服務器收到請求後,須要對參數進行反序列化(序列化的逆操做),恢復爲內存中的表達方式,
而後找到對應的方法(尋址的一部分)進行本地調用,而後獲得返回值。
五、返回值還要發送回服務器A上的應用,也要通過序列化的方式發送,服務器A接到後,再反序列化,
恢復爲內存中的表達方式,交給A服務器上的應用
爲何RPC呢?就是沒法在一個進程內,甚至一個計算機內經過本地調用的方式完成的需求,好比不一樣的系統間的通信,甚至不一樣的組織間的通信。因爲計算能力須要橫向擴展,須要在多臺機器組成的集羣上部署應用。而Netty框架不侷限於RPC,更多的是做爲一種網絡協議的實現框架,因爲RPC須要高效的網絡通訊,就可能選擇以Netty做爲基礎 --------------------- 本文來自 wive 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/javadhh/article/details/66477423?utm_source=copy