Netty(二)入門

在上篇《Netty(一)引題》中,分別對AIO,BIO,PIO,NIO進行了簡單的闡述,並寫了簡單的demo。可是這裏說的簡單,我也只能呵呵了,特別是NIO、AIO(我全手打的,好麻煩)。
在開始netty開發TimeServer以前,先回顧下NIO進行服務端開發的步驟:html

  • 1.建立ServerSocketChannel,配置它爲非阻塞;
  • 2.綁定監聽,配置TCP參數,如backlog大小;
  • 3.建立獨立的IO線程,用於輪詢多路複用器Selector;
  • 4.建立Selector,將以前建立的ServerSocketChannel註冊到Selector上,監聽SelectionKey.ACEPT;
  • 5.啓動IO線程,在循環體中執行Selector.select()方法,輪詢就緒的Channel;
  • 6.當輪詢處處於就緒狀態的Channel時,須要對其進行判斷,若是是OP_ACCEPT狀態,說明是新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端;
  • 7.設置新接入的客戶端連接SocketChannel爲非阻塞模式,配置其餘的一些TCP參數;
  • 8.將SocketChannel註冊到Selector,監聽OP_READ操做位;
  • 9.若是輪詢的Channel爲OP_READ,則說明SocketChannel中有新的就緒的數據包須要讀取,則構造ByteBuffer對象,讀取數據包;
  • 10.若是輪詢的Channel爲OP_WRITE,說明還有數據沒有發送完成,須要繼續發送。

一個簡單的NIO程序,須要通過繁瑣的十多步操做才能完成最基本的消息讀取和發送,這也是我學netty的緣由,下面就看看使用netty是如何輕鬆寫服務器的。
在這裏,我使用IDEA 14 + Maven用netty寫上篇中TimeServer的程序。這裏我直接用Maven的pom.xml來直接下載netty的包(Maven是對依賴進行管理,支持自動化的測試、編譯、構建的項目管理工具,具體的Maven請讀者自行百度、google搜索)。java

/* TimeServer */git

 1 public class TimeServer {
 2     public void bind(int port)throws Exception{
 3         /* 配置服務端的NIO線程組 */
 4         // NioEventLoopGroup類 是個線程組,包含一組NIO線程,用於網絡事件的處理
 5         // (實際上它就是Reactor線程組)。
 6         // 建立的2個線程組,1個是服務端接收客戶端的鏈接,另外一個是進行SocketChannel的
 7         // 網絡讀寫
 8         EventLoopGroup bossGroup = new NioEventLoopGroup();
 9         EventLoopGroup WorkerGroup = new NioEventLoopGroup();
10 
11         try {
12             // ServerBootstrap 類,是啓動NIO服務器的輔助啓動類
13             ServerBootstrap b = new ServerBootstrap();
14             b.group(bossGroup,WorkerGroup)
15                     .channel(NioServerSocketChannel.class)
16                     .option(ChannelOption.SO_BACKLOG,1024)
17                     .childHandler(new ChildChannelHandler());
18 
19             // 綁定端口,同步等待成功
20             ChannelFuture f= b.bind(port).sync();
21 
22             // 等待服務端監聽端口關閉
23             f.channel().closeFuture().sync();
24         }finally {
25             // 釋放線程池資源
26             bossGroup.shutdownGracefully();
27             WorkerGroup.shutdownGracefully();
28         }
29     }
30 
31     private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
32         @Override
33         protected  void initChannel(SocketChannel arg0)throws Exception{
34             arg0.pipeline().addLast(new TimeServerHandler());
35         }
36     }
37 
38     public static void main(String[]args)throws Exception{
39         int port = 8080;
40         if(args!=null && args.length>0){
41             try {
42                 port = Integer.valueOf(args[0]);
43             }
44             catch (NumberFormatException ex){}
45         }
46         new TimeServer().bind(port);
47     }
48 }
 1 public class TimeServerHandler extends ChannelHandlerAdapter{
 2     // 用於網絡的讀寫操做
 3     @Override
 4     public void channelRead(ChannelHandlerContext ctx,Object msg)
 5             throws Exception{
 6         ByteBuf buf = (ByteBuf)msg;
 7         byte[]req = new byte[buf.readableBytes()];
 8         buf.readBytes(req);
 9         String body = new String(req,"UTF-8");
10         System.out.println("the time server order : " + body);
11 
12         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(
13                 System.currentTimeMillis()).toString():"BAD ORDER";
14         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
15         ctx.write(resp);
16     }
17 
18     @Override
19     public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
20         ctx.flush();   // 它的做用是把消息發送隊列中的消息寫入SocketChannel中發送給對方
21         // 爲了防止頻繁的喚醒Selector進行消息發送,Netty的write方法,並不直接將消息寫入SocketChannel中
22         // 調用write方法只是把待發送的消息發到緩衝區中,再調用flush,將發送緩衝區中的消息
23         // 所有寫到SocketChannel中。
24     }
25 
26     @Override
27     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
28         ctx.close();
29     }
30 }

 

/* TimeClient */github

 1 public class TimeClient {
 2     public void connect(String host,int port)throws Exception{
 3         // 配置服務端的NIO線程組
 4         EventLoopGroup group = new NioEventLoopGroup();
 5 
 6         try {
 7             // Bootstrap 類,是啓動NIO服務器的輔助啓動類
 8             Bootstrap b = new Bootstrap();
 9             b.group(group).channel(NioSocketChannel.class)
10                     .option(ChannelOption.TCP_NODELAY,true)
11                     .handler(new ChannelInitializer<SocketChannel>() {
12                         @Override
13                         public void initChannel(SocketChannel ch)
14                                 throws Exception{
15                             ch.pipeline().addLast(new TimeClientHandler());
16                         }
17                     });
18 
19             // 發起異步鏈接操做
20             ChannelFuture f= b.connect(host,port).sync();
21 
22             // 等待客服端鏈路關閉
23             f.channel().closeFuture().sync();
24         }finally {
25             group.shutdownGracefully();
26         }
27     }
28 
29     public static void main(String[]args)throws Exception{
30         int port = 8080;
31         if(args!=null && args.length>0){
32             try {
33                 port = Integer.valueOf(args[0]);
34             }
35             catch (NumberFormatException ex){}
36         }
37         new TimeClient().connect("127.0.0.1",port);
38     }
39 }
 1 public class TimeClientHandler extends ChannelHandlerAdapter{
 2 
 3     // 寫日誌
 4     private static final Logger logger =
 5             Logger.getLogger(TimeClientHandler.class.getName());
 6 
 7     private final ByteBuf firstMessage;
 8 
 9     public TimeClientHandler(){
10         byte[] req = "QUERY TIME ORDER".getBytes();
11         firstMessage = Unpooled.buffer(req.length);
12         firstMessage.writeBytes(req);
13     }
14 
15     @Override
16     public void channelRead(ChannelHandlerContext ctx,Object msg)
17             throws Exception{
18         ByteBuf buf = (ByteBuf)msg;
19         byte[]req = new byte[buf.readableBytes()];
20         buf.readBytes(req);
21         String body = new String(req,"UTF-8");
22         System.out.println("Now is : " + body);
23     }
24 
25     @Override
26     public void channelActive(ChannelHandlerContext ctx){
27         // 當客戶端和服務端創建tcp成功以後,Netty的NIO線程會調用channelActive
28         // 發送查詢時間的指令給服務端。
29         // 調用ChannelHandlerContext的writeAndFlush方法,將請求消息發送給服務端
30         // 當服務端應答時,channelRead方法被調用
31         ctx.writeAndFlush(firstMessage);
32     }
33 
34     @Override
35     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
36         logger.warning("message from:"+cause.getMessage());
37         ctx.close();
38     }
39 }

 

本例子沒有考慮讀半包的處理,對於功能演示和測試,本例子沒問題,可是若是進行性能或者壓力測試,就不能正常工做了。在下一節會弄正確處理半包消息的例子。面試

項目在源碼在src/main/java/Netty/下,分爲客戶端和服務端。服務器

源碼下載:GitHub地址:https://github.com/orange1438/Netty_Course網絡

題外話:雖然文章全是我純手打,沒任何複製,可是文章大多數內容來自《Netty權威指南》,我也是順便學習的。以前我作C++服務端,由於狗血的面試C++,結果公司系統竟然是java的,無耐我所在的重慶,C++少得可憐,因此只有在公司裏學java了。固然,有epoll,select,事件驅動,TCP/IP概念的小夥伴來講,學這個netty,仍是挺簡單的。異步

 
做者:orange1438
出處:http://www.cnblogs.com/orange1438/
本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。
相關文章
相關標籤/搜索