Netty 是一個異步的,事件驅動的網絡通訊框架,用於高性能的基於協議的客戶端和服務端的開發。javascript
異步指的是會當即返回,並不知道到底發送過去沒有,成功沒有,通常都會使用監聽器來監聽返回。java
事件驅動是指開發者只須要關注事件對應的回調方法便可,好比 channel active,inactive,read 等等。web
網絡通訊框架就不用解釋了,不少你很是熟悉的組件都使用了 netty,好比 spark,dubbo 等等。編程
第一個簡單的例子,使用 Netty 實現一個 http 服務器,客戶端調用一個沒有參數的方法,服務端返回一個 hello world。bootstrap
Netty 裏面大量的代碼都是對線程的處理和 IO 的異步的操做。瀏覽器
package com.paul; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class Server { public static void main(String[] args) throws InterruptedException { //定義兩個線程組,事件循環組,能夠類比與 Tomcat 就是死循環,不斷接收客戶端的鏈接 // boss 線程組不斷從客戶端接受鏈接,但不處理,由 worker 線程組對鏈接進行真正的處理 // 一個線程組其實也能完成,推薦使用兩個 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 服務端啓動器,能夠輕鬆的啓動服務端的 channel ServerBootstrap serverBootstrap = new ServerBootstrap(); //group 方法有兩個,一個接收一個參數,另外一個接收兩個參數 // childhandler 是咱們本身寫的請求處理器 serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class) .childHandler(new ServerInitializer()); //綁定端口 ChannelFuture future = serverBootstrap.bind(8011).sync(); //channel 關閉的監聽 future.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.paul; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面能夠有不少 handler,一層層過濾的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); //HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的組合,編碼和解碼的 h handler pipeline.addLast("httpServerCodec", new HttpServerCodec()); pipeline.addLast("handler", new ServerHandler()); } }
package com.paul; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if(httpObject instanceof HttpRequest) { ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //單純的調用 write 只會放到緩存區,不會真的發送 channelHandlerContext.writeAndFlush(response); } } }
咱們在 SimpleChannelInboundHandler 裏分析一下,先看它繼承的 ChannelInboundHandlerAdapter 裏面的事件回調方法,包括通道註冊,解除註冊,Active,InActive等等。緩存
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); }
執行順序爲 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。服務器
Netty 自己並非遵循 servlet 規範的。Http 是基於請求和響應的無狀態協議。Http 1.1 是有 keep-alived 參數的,若是3秒沒有返回,則服務端主動關閉瞭解,Http 1.0 則是請求完成直接返回。websocket
Netty 的鏈接會被一直保持,咱們須要本身去處理這個功能。網絡
在服務端發送完畢數據後,能夠在服務端關閉 Channel。
ctx.channel.close();
對於 Http 編程來講,咱們實現了服務端就能夠了,客戶端徹底可使用瀏覽器或者 CURL 工具來充當。可是對於 Socket 編程來講,客戶端也得咱們本身實現。
服務器端:
Server 類於上面 Http 服務器那個同樣,在 ServerInitoalizer 有一些變化
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面能夠有不少 handler,一層層過濾的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); // TCP 粘包 拆包 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); pipeline.addLast(new LengthFieldPrepender(4)); // 字符串編碼,解碼 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ServerHandler()); } }
public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()+","+msg); ctx.channel().writeAndFlush("from server:" + UUID.randomUUID()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客戶端:
public class Client { public static void main(String[] args) throws InterruptedException { //客戶端不須要兩個 group,只須要一個就夠了,直接鏈接服務端發送數據就能夠了 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); //服務器端既可使用 handler 也可使用 childhandler, 客戶端通常使用 handler //對於 服務端,handler 是針對 bossgroup的,childhandler 是針對 workergorup 的 bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ClientInitializer()); ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync(); channelFuture.channel().closeFuture().sync(); }finally { eventLoopGroup.shutdownGracefully(); } } }
public class ClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道,管道里面能夠有不少 handler,一層層過濾的柑橘 ChannelPipeline pipeline = socketChannel.pipeline(); // TCP 粘包 拆包 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); pipeline.addLast(new LengthFieldPrepender(4)); // 字符串編碼,解碼 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ClientHandler()); } }
public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()+","+msg); System.out.println("client output:"+ msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().writeAndFlush("23123"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Server 端:
public class ServerHandler extends SimpleChannelInboundHandler<String> { //定義 channel group 來管理全部 channel private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "加入\n"); channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "離開\n"); //這個 channel 會被自動從 channelGroup 裏移除 } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + "上線"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + "離開"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Client 端:
BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for(;;){ channel.writeAndFlush(br.readLine() + "\r\n"); }
集羣之間各個節點的通訊,主從節點之間須要進行數據同步,每當主節點的數據發生變化時,經過異步的方式將數據同步到從節點,同步方式能夠用日誌等等,所以主從節點之間不是實時一致性而是最終一致性。
節點與節點之間如何進行通訊那?這種主從模式是須要互相之間有長鏈接的,這樣來肯定對方還活着,實現方式是互相之間定時發送心跳數據包。若是發送幾回後對方仍是沒有響應的話,就能夠認爲對方已經掛掉了。
回到客戶端與服務端的模式,有人可能會想,客戶端斷開鏈接後服務端的 handlerRemoved 等方法不是能感知嗎?還要心跳幹什麼哪?
真實狀況其實很是複雜,好比手機客戶端和服務端進行一個長鏈接,客戶端沒有退出應用,客戶端開了飛行模型,或者強制關機,此時雙方是感知不到鏈接已經斷掉了,或者說須要很是長的時間才能感知到,這是咱們不想看到的,這時就須要心跳了。
來看一個示例:
其餘的代碼仍是和上面的同樣,咱們就不列出來了,直接進入主題,看不一樣的地方:
服務端
// Netty 爲了支持心跳的 IdleStateHandler,空閒狀態監測處理器。 pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));
來看看 IdleStateHandler 的說明
/* * Triggers an IdleStateEvent when a Channel has not performed read, write, or both * operation for a while * 當一個 channel 一斷時間沒有進行 read,write 就觸發一個 IdleStateEvent */ public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); //三個參數分別爲多長時間沒進行讀,寫或者讀寫操做則觸發 event。 }
觸發 event 後咱們編寫這個 event 對應的處理器。
public class MyHandler extends ChannelInboundHandlerAdapter{ //觸發某個事件後這個方法就會被調用 //一個 channelhandlerContext 上下文對象,另外一個是事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; String eventType = null; switch(event.state()){ case READER_IDLE: eventType = "讀空閒"; case WRITER_IDLE: eventType = "寫空閒"; case ALL_IDLE: eventType = "讀寫空閒"; } }else{ //繼續將事件向下一個 handler 傳遞 ctx. } } }
WebSocket 是一種規範,是 HTML5 規範的一部分,主要是解決 Http 協議自己存在的問題。能夠實現瀏覽器和服務端的長鏈接,鏈接頭信息只在創建鏈接時發送一次。是在 Http 協議之上構建的,好比請求鏈接實際上是一個 Http 請求,只不過裏面加了一些 WebSocket 信息。也能夠用在非瀏覽器場合,好比 app 上。
Http 是一種無狀態的基於請求和響應的協議,意思是必定是客戶端想服務端發送一個請求,服務端給客戶端一個響應。Http 1.0 在服務端給客戶端響應後鏈接就斷了。Http 1.1 增長可 keep-alive,服務端能夠和客戶端在短期以內保持一個鏈接,某個事件以內服務端和客戶端能夠複用這個連接。在這種狀況下,網頁聊天就是實現不了的,服務端的數據推送是沒法實現的。
之前有一些假的長鏈接技術,好比輪詢,缺點和明顯,這裏就不細說了。
Http 2.0 實現了長鏈接,可是這不在咱們討論範圍以內。
針對服務端,Tomcat 新版本,Spring,和 Netty 都實現了對 Websocket 的支持。
使用 Netty 對 WebSocket 的支持來實現長鏈接
其餘的部分仍是同樣的,先來看服務端的 WebSocketChannelInitializer。
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{ //須要支持 websocket,咱們在 initChannel 是作一點改動 @Override protected void initChannel(SocketChannel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); //由於 websocket 是基於 http 的,因此要加入 http 相應的編解碼器 pipeline.addLast(new HttpServerCodec()); //以塊的方式進行寫的處理器 pipeline.addLast(new ChunkedWriteHandler()); // 進行 http 聚合的處理器,將 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者 // FullHttpResponse //HttpObjectAggregator 在基於 netty 的 http 編程使用的很是多,粘包拆包。 pipeline.addLast(new HttpObjectAggregator(8192)); // 針對 websocket 的類,完成 websocket 構建的全部繁重工做,負責握手,以及心跳(close,ping, // pong)的處理, websocket 經過 frame 幀來傳遞數據。 // BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame, // PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。 // /ws 是 context_path,websocket 協議標準,ws://server:port/context_path pipeline.addLast(new WebSocketServerProcotolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler()); } }
// websocket 協議須要用幀來傳遞參數 public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{ System.out.println("收到消息:"+ msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器返回")); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception{ System.out.println("handlerAdded" + ctx.channel().id.asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{ System.out.println("handlerRemoved" + ctx.channel().id.asLongText()); } }
客戶端咱們直接經過瀏覽器的原聲 JS 來寫
<script type="text/javascript"> var socket; if(window.WebSocket){ socket = new WebSocket("ws://localhost:8899/ws"); socket.onmessage = function(event){ alert(event.data); } socket.onopen = function(event){ alert("鏈接開啓"); } socket.onclose = function(event){ alert("鏈接關閉"); } }else{ alert("瀏覽器不支持 WebSocket"); } function send(message){ if(!window.WebSocket){ return; } if(socket.readyState == WebSocket.OPEN){ socket.send(message); } } </script>
咱們在瀏覽器中經過 F12 看看 Http 協議升級爲 WebSocket 協議的過程。