Netty4.x用戶指導(1)3個HelloWorld小例子

題記
html

最近對netty有了興趣,如今官方推薦版本是netty4.*,可是縱觀網絡,大部分都是關於netty3.x的知識。java

最好的學習,莫過於經過官方文檔進行學習,系統,透徹,權威,缺點是英文。本文,算作本身學習netty的第一篇,整體思路與User guide for 4.x基本一致,本篇文章不是嚴格意義的翻譯文章。開始了...web

1.前言編程

1.1 問題bootstrap

現 在,咱們使用通用的應用程序和程序庫,進行互相交流。例如,咱們常用HTTP client庫從web服務器上獲取信息,經過web services調用遠程接口。然而,通用的協議或實現,有時候不能很好的擴展伸縮。就像咱們不能使用通用協議進行部分信息的交換,如:huge files,e-mail,實時信息。咱們須要高度優化的協議實現,來完成一些特殊目的。好比,你想實現一款專門的HTTP服務器,以支持基於AJAX的聊天應用,流媒體,大文件傳輸。你須要設計和實現一個完整的新協議,不可避免須要處理遺留協議,在不影響性能和穩定的狀況下,實現新協議速度能有多塊?api

1.2 解決方案
緩存

Netty 致力於提供異步,基於事件驅動的網絡應用框架,是一款進行快速開發高性能,可伸縮的協議服務器和客戶端工具。換句話說,Netty是一個快速和容易開發的NIO客戶端,服務端網絡框架,它簡化和使用流式方式處理網絡編程,如TCP,UDP。
服務器

"快速和容易「,並不表明netty存在可維護性和性能問題。它很完美的提供FTP,SMTP,HTTP,二進制和基於文本的協議支持。有的用戶可能已經發現了擁有一樣有點的其餘網絡應用框架。你可能想問:netty和他們的區別?這個問題很差回答,Netty被設計成提供最合適的API和實現,會讓你的生活更簡單。網絡

2.開始
多線程

本節使用一些簡單的例子讓你快速的感知netty的核心結構。閱讀本節以後,你能夠熟練掌握netty,能夠寫一個client和server。

準備

JDK 1.6+

最新版本的netty:下載地址

maven依賴

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>${netty.version}</version>
</dependency>


2.1寫一個Discard服務

最簡單的協議,不是'hello world',而是DISCARD。這個協議拋棄接收的數據,沒有響應。

協議實現,惟一須要作的一件事就是無視全部接收到的數據,讓咱們開始吧。

直接上Handler的實現,它處理來自netty的I/O事件。

package io.netty.examples.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
        /*
        //也能夠這樣
         try {        // Do something with msg
              } finally {
                      ReferenceCountUtil.release(msg);
            }        
        */
        
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}


1)DiscardServerHandler繼承了ChannelInboundHandlerAdapter

實現了ChannelInboundHandler接口。ChannelInboundHandler提供了多個可重寫的事件處理方法。如今,繼承了ChannelInboundHandlerAdapter,而不用本身實現handler接口。

2)重寫了channelRead(),這個方法在接收到新信息時被調用。信息類型是ByteBuf

3)實現Discard協議,ByteBuf是reference-counted對象,必須顯示的調用release(),進行釋放。須要記住handler的責任包括釋聽任意的reference-counted對象。

4)exceptionCaught(),當發生異常時調用,I/O異常或Handler處理異常。通常狀況下,在這裏能夠記錄異常日誌,和返回錯誤碼。

到了這裏,完成了一半,接下來編寫main(),啓動Server

package io.netty.examples.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.examples.discard.time.TimeServerHandler;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new TimeServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是一個多線程的處理I/O操做Event Loop(這是異步機制特色) Netty 提供了多個 EventLoopGroup,支持多種傳輸方式。在這個例子中,咱們使用了2個NioEventLoopGroup


    第一個,叫"boss",接收進入的鏈接,第二個常常叫「worker",一旦boss接收了鏈接並註冊鏈接到這個worker,worker就會處理這個鏈接。多少個線程被使用?,EventLoopGroup擁有多少個Channel?均可以經過構造函數配置

  2. ServerBootstrap是創建server的幫助類。你可使用Channel直接建立server,注意,這個建立過程很是的冗長,大部分狀況,你不須要直接建立。

  3. 指定 NioServerSocketChannel類,當接收新接入鏈接時,被用在Channel的初始化。

  4. 這個handler,初始化Channel時調用。ChannelInitializer是一個專門用於配置新的Channel的Handler,通常爲Channel配置ChannelPipeline。當業務複雜時,會添加更多的handler到pipeline.

  5. 你能夠設置影響Channel實現的參數,好比keepAlive..

  6. option()  影響的是接收進入的鏈接 的NioServerSocketChannel ;childOption()影響的是來自父 ServerChannel分發的Channel, 在本例中是 NioServerSocketChannel

  7. 保證工做準備好了


經過telnet進行測試。

輸入telnet 127.0.0.1 8080

wKioL1dhJzahlRuaAAAJeL7rMjk054.png

因爲是Discard協議,一沒響應,二服務端也沒輸出。經過測試,只能確認是服務正常啓動了。

調整下Handler邏輯,修改channelRead()方法。

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
        try {
           while (in.isReadable()) { // (1)
             System.out.print((char) in.readByte());
            System.out.flush();
             }
    } finally {
            ReferenceCountUtil.release(msg); // (2)
    }
}

修改以後,在使用telnet測試。你在命令行中輸入什麼,在服務端就能看到什麼!(只能是英文)


2.2寫一個Echo服務


做爲一個服務,沒有返回響應信息,明顯是不合格的。接下來實現Echo協議,返回響應信息。與前面的Discard服務實現惟一的不一樣點就是Handler,須要回寫接收到的信息,而不是打印輸出。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }

1)ChannelHandlerContext提供多個方法,觸發I/O事件。在這裏,不須要像Discard同樣釋放 接收的信息。

2)ctx.write(Object)不能保證徹底寫入,底層存在緩存,須要經過ctx.flush()刷新,保證徹底寫入。


完成。

2.3寫一個Time服務

時間協議見 TIME protocol。和前面2個協議不一樣,服務將發送一個32位的integer值。不須要接收信息,一旦信息發出,將關閉鏈接。在這個例子中,將學到如何構造和發送信息,並在完成時關閉鏈接。

因爲咱們不須要接收信息,因此不須要channelRead()方法,而是使用channelActive()。

package io.netty.example.time;public class TimeServerHandler extends ChannelInboundHandlerAdapter {    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {            @Override
            public void operationComplete(ChannelFuture future) {                assert f == future;
                ctx.close();
            }
        }); // (4)
    }    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

1)channelActive()在鏈接創建和準備完成時調用。

2) 發送一個新消息,須要分配一個buff.未來寫入一個4位的integer值。獲取當前的 ByteBufAllocator,能夠經過ChannelHandlerContext.alloc()獲取。

3)注意沒有java.nio.ByteBuffer.flip(),(nio 讀寫切換時使用),這是由於netty重寫了Buffer,維護了2個指示器,分別用來(reader index)讀取和( writer index )寫入。

注意:ChannelHandlerContext.write() (and writeAndFlush())返回的ChannelFuture 。ChannelFuture 表明一個還沒有發生的I/O操做。這一個是異步操做,會觸發通知listeners 。這兒的意思是當ChannelFuture完成時,纔會調用close()。


最終版的Server Handler

package io.netty.examples.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

接下來實現Time客戶端

與Echo和Discard協議不一樣,人不能很好的將32位的integer,轉換爲能夠理解的日期數據。經過學習本節,能夠學習如何寫一個Client。這和前面的EchoServer,DiscardServer實現有很大的不一樣。

package io.netty.examples.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = Integer.parseInt("8080");
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap is similar to ServerBootstrap except that it's for non-server channels such as a client-side or connectionless channel.

  2. 若是隻使用一個 EventLoopGroup,它會被當成boss group 和 worker group.  boss worker不會再client使用。

  3. 代替NioServerSocketChannelNioSocketChannel 是一個 客戶端的Channel.

  4. 注意沒有使用 childOption() ,由於客戶端沒有上級。

  5. 調用connect(),而不是 bind()

下面是對應的Handler

package io.netty.examples.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}


  1. In TCP/IP, Netty reads the data sent from a peer into a ByteBuf.

測試過程:略。

相關文章
相關標籤/搜索