<!-- more -->java
Nowadays we use general purpose applications or libraries to communicate with each other. For example, we often use an HTTP client library to retrieve information from a web server and to invoke a remote procedure call via web services. However, a general purpose protocol or its implementation sometimes does not scale very well. It is like how we don't use a general purpose HTTP server to exchange huge files, e-mail messages, and near-realtime messages such as financial information and multiplayer game data. What's required is a highly optimized protocol implementation that is dedicated to a special purpose. For example, you might want to implement an HTTP server that is optimized for AJAX-based chat application, media streaming, or large file transfer. You could even want to design and implement a whole new protocol that is precisely tailored to your need. Another inevitable case is when you have to deal with a legacy proprietary protocol to ensure the interoperability with an old system. What matters in this case is how quickly we can implement that protocol while not sacrificing the stability and performance of the resulting application.
這是netty的官方介紹,大概意思就是:
咱們常常但願咱們的應用可以和其它應用互相通訊。例如,咱們常用http請求去查詢信息或者使用rpc調用webservice,可是對於這種特定的協議(http,ftp等)來講,是不易於專門針對
本身應用程序進行擴展的。比方說咱們不會使用http協議去傳輸大文件,郵件,即時通信(金融信息),這須要對現有協議作出較大的優化!這樣咱們就可使用netty定製屬於你本身的協議!git
這裏借用知乎上一個回答:程序員
做爲一個學Java的,若是沒有研究過Netty,那麼你對Java語言的使用和理解僅僅停留在表面水平,會點SSH,寫幾個MVC,訪問數據庫和緩存,這些只是初等Java程序員乾的事。若是你要進階,想了解Java服務器的深層高階知識,Netty絕對是一個必需要過的門檻。有了Netty,你能夠實現本身的HTTP服務器,FTP服務器,UDP服務器,RPC服務器,WebSocket服務器,Redis的Proxy服務器,MySQL的Proxy服務器等等。若是你想知道Nginx是怎麼寫出來的,若是你想知道Tomcat和Jetty,Dubbo是如何實現的,若是你也想實現一個簡單的Redis服務器,那都應該好好理解一下Netty,它們高性能的原理都是相似的。
while ture events = takeEvents(fds) // 獲取事件,若是沒有事件,線程就休眠 for event in events { if event.isAcceptable { doAccept() // 新連接來了 } elif event.isReadable { request = doRead() // 讀消息 if request.isComplete() { doProcess() } } elif event.isWriteable { doWrite() // 寫消息 } } }
NIO的流程大體就是上面的僞代碼描述的過程,跟實際真實的代碼有較多差別,不過對於初學者,這樣理解也是足夠了。Netty是創建在NIO基礎之上,Netty在NIO之上又提供了更高層次的抽象。在Netty裏面,Accept鏈接可使用單獨的線程池去處理,讀寫操做又是另外的線程池來處理。Accept鏈接和讀寫操做也可使用同一個線程池來進行處理。而請求處理邏輯既可使用單獨的線程池進行處理,也能夠跟放在讀寫線程一塊處理。線程池中的每個線程都是NIO線程。用戶能夠根據實際狀況進行組裝,構造出知足系統需求的併發模型。Netty提供了內置的經常使用編解碼器,包括行編解碼器[一行一個請求],前綴長度編解碼器[前N個字節定義請求的字節長度],可重放解碼器[記錄半包消息的狀態],HTTP編解碼器,WebSocket消息編解碼器等等Netty提供了一些列生命週期回調接口,當一個完整的請求到達時,當一個鏈接關閉時,當一個鏈接創建時,用戶都會收到回調事件,而後進行邏輯處理。Netty能夠同時管理多個端口,可使用NIO客戶端模型,這些對於RPC服務是頗有必要的。Netty除了能夠處理TCP Socket以外,還能夠處理UDP Socket。在消息讀寫過程當中,須要大量使用ByteBuffer,Netty對ByteBuffer在性能和使用的便捷性上都進行了優化和抽象。總之,Netty是Java程序員進階的必備神奇。若是你知其然,還想知其因此然,必定要好好研究下Netty。若是你以爲Java枯燥無謂,Netty則是從新開啓你對Java興趣大門的鑰匙。
總結:程序員水平進階的利器!github
note: 對於本例中除了很是重要的核心類會講解外,其餘類不會過多講解,本章只作入門,其它章節會重點講解!
咱們已經知道了netty的做用(靈活優化定製你本身的協議),以及爲何要學習netty。那接下來咱們就一步一步來定製本身的協議最後完成聊天室!web
既然咱們取名print協議,那就是打印的意思:服務端接受客服端的信息而且打印!
首先咱們編寫一個ChannelInboundHandlerAdapter,用於處理接收到的消息,咱們首先分析下這個類的做用,繼承關係以下:
它的做用簡單歸納就是:用於處理 I/O事件的處理器,因此本例咱們天然是用它來處理消息,因而乎有了以下類:PrintServerHandler:數據庫
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) ByteBuf byteBuf = (ByteBuf) msg; System.out.println(byteBuf.toString(Charset.forName("utf-8"))); ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
收到消息後打印,接着繼續編寫一個啓動類,用於啓動一個開啓咱們本身協議的服務,PrintServerApp:緩存
public class EchoServerApp { private int port; public EchoServerApp(int port) { this.port = port; } public void run() throws Exception { NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(); NioEventLoopGroup workLoopGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossLoopGroup, workLoopGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossLoopGroup.shutdownGracefully(); workLoopGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new EchoServerApp(8080).run(); } }
啓動。而後咱們使用win自帶的telnet工具來測試(控制面板-》程序和控制-》開啓或關閉window功能,勾選telnet)。打開cmd,輸入服務器
telnet localhost 8080
測試成功,咱們完成了第一個demo,實現了本身的print協議。接下來咱們把客戶端也換成netty編寫。目的:啓動客戶端,獲取服務端時間,叫time協議。架構
首先同上面同樣,寫一個TimeServerHandler:併發
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf timeBuf = ctx.alloc().buffer(); timeBuf.writeBytes(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()).getBytes()); ChannelFuture channelFuture = ctx.writeAndFlush(timeBuf); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { assert channelFuture == future; // ctx.close(); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
啓動類同上,接下來,編寫客戶端TimeClientHandler:
public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf byteBuf = (ByteBuf) msg; int length = byteBuf.readableBytes(); byte[] buff = new byte[1024]; byteBuf.readBytes(buff, 0, length); System.out.println("current time: " + new String(buff, 0, length)); ctx.close(); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
分別啓動服務端,客戶端。
測試結果如圖,客戶端啓動後拿到了服務端的時間,這樣咱們就實現了本身的time protocol,接下來繼續擴展,編寫一個客戶端與服務端通訊的聊天室:
首先,客戶端與服務端通訊的信息咱們抽象出一個對象,Message以及工具類:
@Data @NoArgsConstructor @AllArgsConstructor public class Message { private String username; private Date sentTime; private String msg; }
public class Utils { public static String encodeMsg(Message message) { return message.getUsername() + "~" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message.getSentTime())) + "~" + message.getMsg(); } public static String formatDateTime(Date time) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time); } public static Date parseDateTime(String time) { try { return new SimpleDateFormat("yyyy-MM-dd Hh:mm:ss").parse(time); } catch (ParseException e) { return null; } } public static void printMsg(Message msg) { System.out.println("================================================================================================="); System.out.println(" " + Utils.formatDateTime(msg.getSentTime()) + " "); System.out.println(msg.getUsername() + ": " + msg.getMsg()); System.out.println("================================================================================================="); } }
三個屬性分別表明用戶名,發送時間,消息內容,接着編寫一個用於處理輸入消息的handler,用於將byte消息轉換成Message,ServerTransferMsgHandler:
public class ServerTransferMsgHandler extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { String totalMsg = in.readCharSequence(in.readableBytes(), Charset.forName("utf-8")).toString(); String[] content = totalMsg.split("~"); out.add(new Message(content[0], Utils.parseDateTime(content[1]), content[2])); } }
接着,編寫一個處理接收消息的Handler,用於打印客戶端發送過來的消息,ServerMsgHandler:
public class ServerMsgHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("jsbintask-client進入聊天室。"); Message message = new Message(Constants.SERVER, new Date(), "Hello, I'm jsbintask-server side."); ByteBuf buffer = ctx.alloc().buffer(); String content = Utils.encodeMsg(message); buffer.writeBytes(content.getBytes()); ctx.writeAndFlush(buffer); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg1) throws Exception { try { Message msg = (Message) msg1; Utils.printMsg(msg); Scanner scanner = new Scanner(System.in); System.out.print("jsbintask-server, please input msg: "); String reply = scanner.nextLine(); Message message = new Message(Constants.SERVER, new Date(), reply); ctx.writeAndFlush(message); } finally { ReferenceCountUtil.release(msg1); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
知道注意的是,channelActive方法,在客戶端連接的時候,率先給客戶端發送了一條消息,最後,在編寫一個用戶將服務端Message轉成Byte消息的handler,MessageEncoder:
public class MessageEncoder extends MessageToByteEncoder<Message> { @Override protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception { ByteBuf buffer = ctx.alloc().buffer(); String content = Utils.encodeMsg(message); buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8)); ctx.writeAndFlush(buffer); } }
最後,編寫server端啓動類,ChatroomServerApp:
public class ChatroomServerApp { public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new MessageEncoder(), new ServerTransferMsgHandler(), new ServerMsgHandler()); } }) .option(ChannelOption.SO_BACKLOG, 1024 * 10) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
啓動Server,繼續編寫ChatroomClient。
同server端同樣,client的關鍵也是handler,ClientMsgHandler以下:
public class ClientMsgHandler extends SimpleChannelInboundHandler<Message> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } @Override protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception { try { Utils.printMsg(msg); Scanner scanner = new Scanner(System.in); System.out.print("jsbintask-client, please input msg: "); String reply = scanner.nextLine(); Message message = new Message(Constants.CLIENT, new Date(), reply); ByteBuf buffer = ctx.alloc().buffer(); String content = message.getUsername() + "~" + Utils.formatDateTime(message.getSentTime()) + "~" + message.getMsg(); buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8)); ctx.writeAndFlush(buffer); } finally { ReferenceCountUtil.release(msg); } } }
接着,一樣有將byte轉換成Message的轉換器,CliengMsgHandler:
public class ClientTransferMsgHandler extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { byte[] buff = new byte[2024]; int length = in.readableBytes(); in.readBytes(buff, 0, length); String totalMsg = new String(buff, 0, length, StandardCharsets.UTF_8); String[] content = totalMsg.split("~"); out.add(new Message(content[0], Utils.parseDateTime(content[1]), content[2])); } }
最後,啓動類ChatroomClientApp:
public class ChatroomClientApp { public static void main(String[] args) throws Exception { NioEventLoopGroup workLoopGroup = new NioEventLoopGroup(); try { Bootstrap clientBootstrap = new Bootstrap(); clientBootstrap.group(workLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientTransferMsgHandler(), new ClientMsgHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = clientBootstrap.connect("localhost", 8888).sync(); channelFuture.channel().closeFuture().sync(); } finally { workLoopGroup.shutdownGracefully(); } } }
一樣啓動client,觀察控制檯。首先,server端提示client進入了聊天室,而且客戶端看到了server端發送過來的」招呼「信息:
這樣就表明咱們的連接創建完畢,接着,客戶端,服務端相互發送消息:
如圖,這樣,咱們的聊天室也就編寫成功了,完整demo以下:
本章,咱們開啓了學習netty的大門,首先介紹了netty,爲何要學netty,而且經過三個案例一步一步實現了聊天室,成功踏入了netty的大門,下一章,咱們就來學習一下netty的架構!
例子源碼:https://github.com/jsbintask22/netty-learning.git,歡迎fork,star學習修改。
本文原創地址:https://jsbintask.cn/2019/01/30/netty/netty-chatroom/,轉載請註明出處。若是你以爲本文對你有用,歡迎關注,分享!