Netty 做爲業界最流行的 nio 框架之一,它的健壯性、功能、性能、可定製性、可擴展性都是數一數二的。html
優勢:java
目前存在的線程模型有:react
根據 Reactor 的數量和處理資源線程池的數量不一樣,有三種不一樣實現:web
主從 Reactor 多線程模型
作了必定的改進,其中主從 Reactor 多線程模型有多個 Reactor。模型特色編程
問題分析bootstrap
解決方案瀏覽器
基於I/O複用模型
:多個鏈接共用一個阻塞對象,應用程序只須要在一個阻塞對象等待,無需阻塞全部鏈接,當某個鏈接有新的數據能夠處理時,操做系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。基於線程池複用線程資源
:沒必要爲每個鏈接建立線程,將鏈接完成後的業務處理任務分配給線程進行處理,一個線程能夠處理多個鏈接的業務。核心組成緩存
Reactor
:在一個單獨的線程中運行,負責監聽和分發事件,分發給適當的處理程序對 I/O 事件做出反應。Handlers
:處理程序執行 I/O 事件要完成的實際事件。Reactor 經過調用適當的處理程序來響應 I/O 事件,處理程序非阻塞操做。優缺點:服務器
優勢
:模型簡單,無多線程、進程通訊、競爭的問題,所有由一個線程完成。缺點
:性能問題,只有一個線程沒法發揮出多核 CPU 的性能,Handler 在處理某鏈接業務時,整個進程沒法處理其餘鏈接事件,容易致使性能瓶頸。缺點
:可靠性問題,線程意外停止,或者進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部信息,節點故障。使用場景
:客戶端數量有限,業務處理快捷(例如 Redis 在業務處理的時間複雜度爲 O(1)的狀況)。優缺點:網絡
優勢
:能夠充分的利用多核 CPU 的處理能力。缺點
:多線程數據共享、訪問操做比較複雜,Reactor 處理全部的事件的監聽和響應,由於 Reactor 在單線程中運行,在高併發場景容易出現性能瓶頸。優缺點:
優勢
:父線程和子線程的職責明確,父線程只須要接收新鏈接,子線程完成後續業務處理。優勢
:父線程與子線程的數據交互簡單,Reactor 主線程是須要把新鏈接傳給子線程,子線程無需返回數據。缺點
:編程複雜度較高。單 Reactor 單線程:前臺接待員和服務員是同一我的,全程爲顧客服務。
單 Reactor 多線程:一個前臺接待員,多個服務員。
主從 Reactor 多線程:多個前臺接待員,多個服務員。
服務端端包含 1 個 Boss NioEventLoopGroup 和 1 個 Worker NioEventLoopGroup。
NioEventLoopGroup 至關於 1 個事件循環組,這個組裏包含多個事件循環 NioEventLoop,每一個 NioEventLoop 包含 1 個 Selector 和 1 個事件循環線程。
每一個 Boss NioEventLoop 循環執行的任務包含 3 步:
每一個 Worker NioEventLoop 循環執行的任務包含 3 步:
/** * @author jack */ public class SimpleServer { public static void main(String[] args) { //建立bossGroup , 只負責鏈接請求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); //建立workerGroup , 負責客戶端業務處理 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //建立服務端啓動對象,配置參數. ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workerGroup)//設置線程組 .channel(NioServerSocketChannel.class)//使用NioSocketChannel做爲服務端的通道實現 .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列獲得鏈接個數 .childOption(ChannelOption.SO_KEEPALIVE, true)//設置保持活動鏈接狀態 .childHandler(new ChannelInitializer<SocketChannel>() {//建立一個通道測試對象 //給pipeline設置處理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); //自定義handler } });//workerGroup的EventLoop對應的管道設置處理器 System.out.println("服務端準備就緒..."); //綁定一個端口而且同步,生成了一個channelFuture對象 ChannelFuture cf = serverBootstrap.bind(6667).sync(); //對關閉通道進行監聽 cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
/** * 服務端自定義handler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 讀取實際數據(這裏咱們能夠讀取客戶端發送的消息) * * @param ctx 上下文對象,含有管道pipeline,通道channel ,地址 * @param msg 客戶端發送的內容 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("客戶端發送: " + buf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址爲:" + ctx.channel().remoteAddress()); } /** * 讀取完成後 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客戶端", CharsetUtil.UTF_8)); } /** * 處理異常,通常是關閉通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
/** * @author jack */ public class SimpleClient { public static void main(String[] args) { //客戶端須要一個事件循環組 NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); //建立客戶端啓動對象 Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(clientLoopGroup)//設置線程組 .channel(NioSocketChannel.class)//設置客戶端通道實現類 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler());//加入自定義處理器 } }); System.out.println("客戶端已準備就緒"); //鏈接服務器 ChannelFuture cf = bootstrap.connect("127.0.0.1", 6667).sync(); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
/** * 客戶端自定義handler */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 通道準備就緒時調用 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服務端!", CharsetUtil.UTF_8)); } /** * 獲取客戶端回覆 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服務端回覆: " + buf.toString(CharsetUtil.UTF_8)); } /** * 處理異常,通常是關閉通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
任務隊列中的 task 有 3 種使用場景
用戶自定義的普通任務
ctx.channel().eventLoop().execute(() -> System.out.println("任務邏輯"));
用戶自定義的定時任務
ctx.channel().eventLoop().schedule(() -> System.out.println("任務邏輯..."), 60, TimeUnit.SECONDS);
例如在推送系統的業務線程裏面,根據用戶的標識,找到對應的 channel 引用,而後調用 write 類方法向該用戶推送消息,就會進入到這種場景。最終的 write 會提交到任務隊列中後被異步消費。
一個 Netty 應用一般由一個 BootStrap 開始,主要做用是配置整個 Netty 程序,串聯各個組件,Netty 中的 BootStrap 類是客戶端程序的啓動引導類,ServerBootStrap 是服務端啓動引導類。
經常使用方法:
方法 | 含義 |
---|---|
public ServerBootstrap group(EventLoopGroup parentGroup , EventLoopGroup childGroup) | 做用於服務器端,用來設置兩個 EventLoop |
public B group(EventLoopGroup group) | 做用於客戶端,用來設置一個 EventLoopGroup |
public B channel(Class<? extends C> channelClass) | 用來設置一個服務端的通道實現 |
public <T> B option(ChannelOption<T> option, T value) | 用來給 ServerChannel 添加配置 |
public <T> ServerBootStrap childOption (ChannelOption<T> childOption, T value) | 用來給接收到的通道添加配置 |
public ServerBootstrap childHandler (ChannelHandler childHandler) | 用來設置業務處理類(自定義 handler) |
public B handler(ChannelHandler handler) | Handler 則在服務器端自己 bossGroup 中使用 |
public ChannelFuture bind(int inetPort) | 用於服務端,設置佔用的端口號 |
public ChannelFuture connect (String inetHost,int inetPort) | 該方法用於客戶端,用來鏈接服務器 |
Netty 中全部操做都是異步的,不能當即得知消息是否被正確處理,但能夠過一會等它執行完成或直接註冊一個監聽器,具體實現經過 Future 和 ChannelFuture,它們能夠註冊一個監聽,當操做執行成功或失敗時,監聽會自動觸發註冊的監聽事件。
經常使用方法:
方法 | 含義 |
---|---|
Channel channel() | 返回當前正在進行 I/O 操做的通道 |
ChannelFuture sync() | 等待異步操做執行完畢 |
經常使用的 Channel 類型有:
方法 | 含義 |
---|---|
NioSocketChannel | 異步的客戶端 TCP Socket 鏈接 |
NioServerSocketChannel | 異步的服務端 TCP Socket 鏈接 |
NioDatagramChannel | 異步的 UDP 鏈接 |
NioStcpChannel | 異步的客戶端 Sctp 鏈接 |
NioSctpServerChannel | 異步的服務端 Sctp 鏈接 |
ChannelInboundHandler
: 用於處理 Channel 入站 I/O 事件。ChannelOutBoundHandler
:用於處理 Channel 出站 I/O 操做。適配器:
ChannelInboundHandlerAdapter
:用於處理出站 I/O 操做。ChanneInboundHandlerAdapter
:用於處理入站 I/O 操做。ChannelDuplexHandler
:用於處理入站和出站事件。以客戶端應用程序爲例:若是事件運動方向是客戶端服務器,咱們稱之爲「出站」,即客戶端發送的數據會經過 pipeline 中的一系列 ChannelOutboundHandler,並被這些 Handler 處理,反之稱爲「入站」。
ChannelPipeline 是一個重點:
經常使用方法:
方法 | 含義 |
---|---|
ChannelPipeline addFirst(ChannelHandler... handlers) | 把一個業務處理類,放到鏈表中頭結點的位置 |
ChannelPipeline addLast(ChannelHandler... handlers) | 把一個業務處理類,放到鏈表中尾結點的位置 |
經常使用方法:
方法 | 含義 |
---|---|
ChannelFuture close() | 關閉通道 |
ChannelOutboundInvoker flush() | 刷新 |
ChannelFuture writeAndFlush(Object msg) | 將數據寫入到 ChannelPipeline 中當前 ChannelHandler 的下一個 ChannelHandler 開始處理(出站) |
ChannelOption 參數以下:
ChannelOption.SO_BACKLOG
:對應 TCP/IP 協議 listen 函數中的 backlog 參數,用來初始化服務器可鏈接隊列大小。服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接,多個客戶端來的時候,服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog 參數指定了隊列的大小。ChannelOption.SO_KEEPALIVE
:一直保持鏈接活動狀態。ByteBuf buffer = Unpooled.buffer(10); ByteBuf buf =Unpooled.copiedBuffer("你好", CharsetUtil.UTF_8);
在 Netty 的 buffer 中,讀取 buffer 中的數據不須要經過 flip()方法進行狀態切換,其底層維護了 readerIndex 和 writerIndex
0 ——> readerIndex
:已讀區域。readerIndex ——> writerIndex
:未讀但可讀區域。writerIndex ——> capacity
:可寫區域。public abstract CharSequence getCharSequence(int index, int length, Charset charset)
:的做用是按照某一個範圍進行數據的讀取,index 表示起始位置,length 表示讀取長度,charset 表示字符編碼格式。Server 端
public class Server { private static final int port = 6667; public static void main(String[] args) { run(); } /** * 處理客戶端請求 */ public static void run() { //建立兩個線程組 NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossLoopGroup, workerLoopGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() //增長解碼器 .addLast("decoder", new StringDecoder()) //增長編碼器 .addLast("encoder", new StringEncoder()) //加入自定義業務處理器 .addLast(new ServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerLoopGroup.shutdownGracefully(); bossLoopGroup.shutdownGracefully(); } } }
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 定義一個channel 組,管理全部的channel , GlobalEventExecutor.INSTANCE是全局事件執行器,單例模式 */ private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 鏈接創建調用,將當前channel加入channelGroup * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //提示其餘客戶端當前客戶端已上線 channels.writeAndFlush("[客戶端]" + channel.remoteAddress() + "加入聊天!\n"); channels.add(channel); } /** * 表示channel處於活動狀態,提示上線 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + ":已上線!"); } /** * 非活動狀態提示 離線 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + ":已離線!"); } /** * 斷開鏈接 * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //提示其餘客戶端當前客戶端已斷開鏈接 channels.writeAndFlush("[客戶端]" + channel.remoteAddress() + "斷開鏈接!\n"); } /** * 讀取客戶端消息並轉發 * @param channelHandlerContext * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { Channel channel = channelHandlerContext.channel(); channels.forEach(ch -> { if (channel != ch) { ch.writeAndFlush("[客戶]: " + channel.remoteAddress() + sdf.format(new Date()) +" 說:" + msg + "\n"); } else { ch.writeAndFlush(sdf.format(new Date())+" 你說:" + msg + "\n"); } }); } /** * 異常關閉 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
Client 端
public class Client { private static final String HOST = "127.0.0.1"; private static final int PORT = 6667; public static void main(String[] args) { run(); } public static void run() { NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(clientLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() //增長解碼器 .addLast("decoder", new StringDecoder()) //增長編碼器 .addLast("encoder", new StringEncoder()) .addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); Channel channel = future.channel(); System.out.println("客戶端:" + channel.localAddress() + " 準備就緒"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.nextLine(); //經過channel發送到服務器端 channel.writeAndFlush(msg + "\r\n"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg.trim()); } }
客戶端同用上面的便可。記得端口對應
Server 端
public class Server { public static void main(String[] args) { //建立兩個線程組 NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossLoopGroup, workerLoopGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO))//在bossLoopGroup 增長日誌處理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 加入 IdleStateHandler // 第一個參數 多長時間沒讀 就發送心跳監測包看是否鏈接 // 第二個參數 多長時間沒寫 就發送心跳監測包看是否鏈接 // 第三個參數 多長時間沒有讀寫 就發送心跳監測包看是否鏈接 // 第四個參數 時間單位 //當 觸發後 會傳遞給管道中的下一個handler來處理,調用下一個handler的userEventTriggered pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS)); //加入空閒檢測處理的handler pipeline.addLast(new ServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workerLoopGroup.shutdownGracefully(); bossLoopGroup.shutdownGracefully(); } } }
ServerHandler
public class ServerHandler extends ChannelInboundHandlerAdapter { /** * @param ctx 上下文 * @param evt 事件 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ //將evt轉型 IdleStateEvent event = (IdleStateEvent) evt; SocketAddress socketAddress = ctx.channel().remoteAddress(); switch (event.state()){ case READER_IDLE: System.out.println(socketAddress + "發生讀空閒"); break; case WRITER_IDLE: System.out.println(socketAddress + "發生寫空閒"); break; case ALL_IDLE: System.out.println(socketAddress + "發生讀寫空閒"); break; } } } }
Http 短鏈接和長鏈接
Http 鏈接無狀態
WebSocket 簡介
代碼案例
Server 端
public class Server { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //由於基於http協議,故使用http的編解碼器 pipeline.addLast(new HttpServerCodec()); //過程當中以塊的方式寫,添加 ChunkedWriteHandler 處理器 pipeline.addLast(new ChunkedWriteHandler()); /** * 說明 * 一、http數據在傳輸過程當中是分段的,HttpObjectAggregator 能夠將多個數據段整合起來 * 二、所以,當瀏覽器發送大量數據時,就會發出屢次http請求 * */ pipeline.addLast(new HttpObjectAggregator(8192)); /** * 說明 * 一、對於 WebSocket,它的數據以 幀(Frame)的形式傳遞 * 二、能夠看到 WebSocketFrame 下面有6個子類 * 三、瀏覽器請求時 ws://localhost:7000/xxx 表示請求的uri * 四、WebSocketServerProtocolHandler 會把 http 協議升級爲ws協議 * 即保持長鏈接----------核心功能 * 五、如何升級——經過狀態瑪切換101 */ pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); //自定義的 handler 處理業務邏輯 pipeline.addLast(new TextWebSocketFrameHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
Handler
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { System.out.println("服務器收到消息:" + textWebSocketFrame.text()); //回覆消息 channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("服務器時間:" + LocalDateTime.now() + " " + textWebSocketFrame.text())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded 被調用:" + ctx.channel().id().asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被調用:" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常發生"+cause.getMessage()); ctx.close(); } }
HTML
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <title>Title</title> </head> <body> <script> var socket; //判斷當前瀏覽器是否支持webSocket編程 if (window.WebSocket) { //go on socket = new WebSocket("ws://localhost:7000/hello"); //至關於channelRead0,收到服務器端回送的消息 socket.onmessage = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + ev.data; }; //至關於鏈接開啓 socket.onopen = function (ev) { var rt = document.getElementById("responseText"); rt.value = "鏈接開啓"; }; socket.onclose = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + "鏈接關閉"; }; } else { alert("當前瀏覽器不支持webSocket"); } //發送消息到服務器 function send(message) { if (!window.socket) { //先判斷socket是否建立好了 return; } if (socket.readyState == WebSocket.OPEN) { //經過socket發送消息 socket.send(message); } else { alert("鏈接沒有開啓"); } } </script> <form onsubmit="return false"> <textarea name="message" style="height: 300px; width: 300px"></textarea> <input type="button" value="發送消息" onclick="send(this.form.message.value)" /> <textarea id="responseText" style="height: 300px; width: 300px" ></textarea> <input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''" /> </form> </body> </html>
Netty 自帶的 ObjectEncoder 和 ObjectDecoder 能夠用於實現 POJO 對象或其餘業務對象的編解碼,其底層使用的還是 java 的序列化技術,存在如下問題:
第一步:idea 加入插件 protoc
第二步:加入 maven 依賴
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency>
第三步:編寫 proto 文件
syntax = "proto2"; //版本 option java_outer_classname = "StudentPOJO"; //生成的外部類名稱,同時文件名 //protobuf以message的形式管理數據 message Student{ //會在 studentPOJO 外部類生成一個內部類 Student,它是真正發送的POJO對象 required int32 id = 1; //表示 Student 類中有一個屬性 名字爲id,類型爲 int32(protoType),1表示屬性的序號 required string name = 2; }
根據網上教程安裝 protobuf。生成 StudnetPOJO 文件,這裏就不展現代碼了,比較長。
Server 端
public class Server { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //在pipeline中加入ProtoBufferDecoder //指定對哪種對象進行解碼 pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance())); pipeline.addLast(new ServerHandler()); } }); ChannelFuture cf = serverBootstrap.bind(6668).sync(); //給 cf 添加監聽器,監聽感興趣的事件 cf.addListener((ChannelFutureListener) future -> { if (cf.isSuccess()) { System.out.println("綁定端口 6668 成功"); } else { System.out.println(cf.cause()); } }); cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception { System.out.println("客戶端發送: id = " + msg.getId() + " 名字 = " + msg.getName()); } }
Client 端
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //發送一個 student 對象到服務器 StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1000).setName("Jack").build(); ctx.writeAndFlush(student); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服務器回送消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服務器端地址:" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ClientHandler
public class Client { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //在pipeline中加入ProtoBufferEncoder ChannelPipeline pipeline = ch.pipeline(); //編碼 pipeline.addLast("encoder", new ProtobufEncoder()); pipeline.addLast(new ClientHandler()); } }); System.out.println("客戶端已準備就緒"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventExecutors.shutdownGracefully(); } } }
以入站爲例,對於每一個從入站 Channel 讀取的消息,這個方法會被調用。隨後,他將調用由解碼器所提供的 decode()方法進行解碼,並將已經解碼的字節轉發給 ChannelPipeline 中的下一個 ChannelInboundHandler。
消息入站後,會通過 ChannelPipeline 中的一系列 ChannelHandler 處理,這些 handler 中有 Netty 已經實現的,也有咱們從新實現的自定義 handler,但它們都須要實現 ChannelInboundHandler 接口;即消息入站後所通過的 handler 鏈是由一系列 ChannelInboundHandler 組成的,其中第一個通過的 handler 就是解碼器 Decoder;消息出站與入站相似,但消息出站須要通過一系列 ChannelOutboundHandler 的實現類,最後一個通過的 handler 是編碼器 Encoder。
因爲不知道遠程節點是否會發送一個完整的信息,TCP 可能出現粘包和拆包的問題。ByteToMessageDecoder 的做用就是對入站的數據進行緩衝,直至數據準備好被處理。
ByteToMessageDecoder 示例分析:
public class ToIntgerDecoder extends ByteToMessageDecoder{ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{ if (in.readableBytes() >= 4) { out.add(in.readint()); } } }
在此實例中,假設經過 Socket 發送了 8 字節數據,每次入站從 ByteBuf 中讀取個 4 字節,將其解碼爲一個 int,並加入一個 List 中。當沒有更多的元素能夠被添加到該 List 中時,表明這次發送的數據已發送完成,List 中的全部內容會被髮送給下一個 ChannelInboundHandler。Int 在被添加到 List 中時,會被自動裝箱爲 Intger,調用 readInt()方法前必須驗證所輸入的 ByteBuf 是否有足夠的數據。
代碼示例:
Server 端
public class Server { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()); //自定義初始化類 ChannelFuture future = serverBootstrap.bind(7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
ServerInitializer 自定義初始化類
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //入站的handler解碼 pipeline.addLast(new ByteToLongDecoder()).addLast(new ServerInboundHandler()); } }
ByteToLongDecoder 自定義解碼器
public class ByteToLongDecoder extends ByteToMessageDecoder { /** * @param channelHandlerContext 上下文對象 * @param byteBuf 入站的ByteBuf * @param list List集合,將解碼後的數據傳給下一個Handler * @throws Exception */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // Long 大於 8個字節 if (byteBuf.readableBytes() >= 8) { list.add(byteBuf.readLong()); } } }
ServerInboundHandler 自定義 handler,處理業務
public class ServerInboundHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long aLong) throws Exception { System.out.println("從客戶端讀取:" + aLong); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Client 端
public class Client { public static void main(String[] args) { NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(clientLoopGroup) .channel(NioSocketChannel.class) .handler(new ClientInitializer());//自定義初始化類 ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
ClientInitializer 客戶端自定義初始化類
public class ClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //出站,數據進行編碼 pipeline.addLast(new LongToByteEncoder()).addLast(new ClientHandler()); } }
LongToByteEncoder 編碼器
public class LongToByteEncoder extends MessageToByteEncoder<Long> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Long aLong, ByteBuf byteBuf) throws Exception { System.out.println("開始編碼,msg = " + aLong); byteBuf.writeLong(aLong); } }
ClientHandler 自定義 handler,處理邏輯
public class ClientHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服務器的ip : " + ctx.channel().remoteAddress()); System.out.println("收到服務器數據 : " + msg); } /** * 發送數據 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client發送數據"); ctx.writeAndFlush(12345678L); } }
LineBasedFrameDecoder
:它使用行尾控制字符(\n 或\r\n)做爲分割符來解析數據;DelimiterBasedFrameDecoder
:使用自定義的特殊字符做爲分隔符;HttpObjectDecoder
:一個 HTTP 數據的解碼器;LengthFieldBasedFrameDecoder
:經過指定長度來標識整包信息,這樣就能夠自動的處理粘包和半包信息Server 端
public class Server { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()); //自定義初始化類 ChannelFuture future = serverBootstrap.bind(7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
ServerInitializer
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ServerHandler()); } }
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { byte[] buffer = new byte[buf.readableBytes()]; buf.readBytes(buffer); //將buffer轉換成字符串 String str = new String(buffer, CharsetUtil.UTF_8); System.out.println("服務端接收到數據:" + str); System.out.println("服務端接收次數:" + ++count); ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), CharsetUtil.UTF_8); ctx.writeAndFlush(byteBuf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Client 端
public class Client { public static void main(String[] args) { NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(clientLoopGroup) .channel(NioSocketChannel.class) .handler(new ClientInitializer());//自定義初始化類 ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { clientLoopGroup.shutdownGracefully(); } } }
ClientInitializer
public class ClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ClientHandler()); } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String str = new String(bytes, CharsetUtil.UTF_8); System.out.println("客戶端接收到數據: " + str); System.out.println("客戶端接收次數:" + ++count); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //發送十條數據 for (int i = 0; i < 10; i++) { ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, CharsetUtil.UTF_8); ctx.writeAndFlush(byteBuf); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
能夠看到在第一個客戶端沒有發生問題,啓動第二個客戶端後就發生了拆包問題。
使用自定義協議+編解碼器實現具體功能:
具體代碼
客戶端與服務器主程序與以前相同
MessageProtocol 自定義協議
public class MessageProtocol { private int length; //關鍵 private byte[] context; public int getLength() { return length; } public byte[] getContext() { return context; } public void setLength(int length) { this.length = length; } public void setContext(byte[] context) { this.context = context; }
MessageEncoder 自定義編碼器
public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { System.out.println("MessageEncoder encode方法被調用"); out.writeInt(msg.getLength()); out.writeBytes(msg.getContext()); } }
MessageDecoder.自定義解碼器
public class MessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MessageDecoder decode方法被調用"); //將獲得的二進制字節碼轉換爲 MessageProtocol 數據包 int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); //封裝成MessageProtocol對象,放入out中交給下一個handler處理 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLength(length); messageProtocol.setContext(content); out.add(messageProtocol); } }
在 ServerInitializer 和 ClientInitializer 中增長 addList()編解碼器
ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { //接收數據並處理 int len = msg.getLength(); byte[] context = msg.getContext(); System.out.println("服務端接收到信息以下"); System.out.println("數據長度:"+len); System.out.println("內容:"+new String(context, CharsetUtil.UTF_8)); System.out.println("服務器接收到協議包數量 = "+(++this.count)); //回覆消息 String response = UUID.randomUUID().toString(); int responseLen = response.getBytes("utf-8").length; byte[] responseBytes = response.getBytes("utf-8"); //構建一個協議包 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLength(responseLen); messageProtocol.setContext(responseBytes); ctx.writeAndFlush(messageProtocol); } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //使用客戶端循環發送10條數據 for (int i=0;i<5;i++){ String mes = "今天下雨,出門帶傘"; byte[] content = mes.getBytes(Charset.forName("utf-8")); int length = mes.getBytes(Charset.forName("utf-8")).length; //建立協議包 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLength(length); messageProtocol.setContext(content); ctx.writeAndFlush(messageProtocol); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常消息 = "+cause.getMessage()); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { int len = msg.getLength(); byte[] msgContext = msg.getContext(); System.out.println("客戶端接收的消息以下:"); System.out.println("消息長度 = "+len); System.out.println("消息內容 = "+new String(msgContext, CharsetUtil.UTF_8)); System.out.println("客戶端接收消息的數量 = "+(++this.count)); } }