《一塊兒學netty》

o文章摘自 netty 官網(netty.io)
 
netty 是一個異步的,事件驅動的網絡應用通訊框架,可讓咱們快速編寫可靠,高性能,高可擴展的服務端和客戶端
 
  • 樣例一:discard server(丟棄任何消息的服務端)
package io.netty.example.discard;
 
import io.netty.buffer.ByteBuf;
 
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
1.DiscardServerHandler擴展了ChannelInboundHandlerAdapter,它是ChannelInboundHandler的一
個實現。 ChannelInboundHandler提供了能夠覆蓋的各類事件處理程序方法。 目前,只需擴展
ChannelInboundHandlerAdapter而不是本身實現處理程序接口。
 
2.咱們在這裏覆蓋channelRead()事件處理程序方法。 每當從客戶端接收到新數據時,都會使用收到的
消息調用此方法。 在此示例中,接收消息的類型是ByteBuf。
 
3.要實現DISCARD協議,處理程序必須忽略收到的消息。 ByteBuf是一個引用計數對象( ReferenceCounted ),
必須經過 release()方法顯式釋放。 請記住,處理程序有責任釋放傳遞給處理程序的任何引用計數對象。 一般,
channelRead()處理程序方法的實現方式以下:
 
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
 
4.io,或者hadler在處理數據時,可能會致使netty拋出異常,這時會調用exceptionCaught()方法,在大
多數狀況下,應該記錄捕獲的異常並在此處關閉其關聯的通道,固然,根據具體狀況,也可添加其餘邏輯,
好比發送帶有錯誤代碼的響應消息。
 
  • 至此咱們已經實現了一半discardserver,接下來編寫main方法來啓動server
package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;
 
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
* Discards any incoming data.
*/
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
 
        new DiscardServer(port).run();
    }
}
 
1.NioEventLoopGroup是一個處理I / O操做的多線程事件循環。 Netty爲不一樣類型的傳輸提供各類
EventLoopGroup實現。 咱們在此示例中實現了服務器端應用程序,所以將使用兩個NioEventLoopGroup。 
第一個,一般稱爲「boss」,接受傳入鏈接。 第二個,一般稱爲「worker」,一旦boss接受鏈接並將接受
的鏈接註冊到worker,worker就處理被接受鏈接的io。 NioEventLoopGroup使用多少個線程以及它們如何
映射到建立的Channels取決於EventLoopGroup實現,也能夠經過構造函數進行配置。
 
2.ServerBootstrap是一個設置服務器的輔助類。 
 
3.在這裏,咱們指定使用NioServerSocketChannel類,該類用於實例化新Channel以接受傳入鏈接。
 
4.此處指定的處理程序將始終由新接受的Channel評估(不太懂)。 ChannelInitializer是一個特殊的處理程
序,旨在幫助用戶配置新的Channel。 能夠將不一樣的handler 添加到pipeline 中來完成對消息的複雜處理。
 
5.能夠經過option()方法來給channel 來指定參數
 
6.你注意到option()和childOption()嗎? option()用於接受傳入鏈接的NioServerSocketChannel。
 childOption()用於父ServerChannel接受的Channels,在這種狀況下是NioServerSocketChannel。
 
7.咱們如今準備好了。 剩下的就是綁定到端口並啓動服務器。 在這裏,咱們綁定到機器中全部NIC(網絡
接口卡)的端口8080。 您如今能夠根據須要屢次調用bind()方法(使用不一樣的綁定地址。)
 
  • 以前是把收到的消息直接丟棄,如今咱們把收到的消息寫出去
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
 
1.ChannelHandlerContext對象提供各類操做,使您可以觸發各類I / O事件和操做。 在這裏,咱們調用
write(Object)來逐個寫出接收到的消息。 請注意,咱們沒有release收到的消息,這與咱們在DISCARD
示例中的操做不一樣。 這是由於Netty在寫入線路時會爲您release
 
2. ctx.write(Object)不會將消息寫入線路。 而是存在內部緩存,而後經過 ctx.flush()刷新到線路。 或
者能夠將上述兩步合二爲一, 調用ctx.writeAndFlush(msg)便可
 
  • 接下來寫一個time server
 
它與前面的示例的不一樣之處在於,它發送包含32位整數的消息,而不接收任何請求,並在 發送消息後關閉鏈接
 在此示例中,您將學習如何構造和發送消息,以及在完成時關閉鏈接。
由於咱們將忽略任何接收的數據,可是一旦創建鏈接就發送消息,此次咱們不能使用channelRead()方法。 
相反,咱們應該覆蓋channelActive()方法。 如下是實現:
 
package io.netty.example.time;
 
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
 
    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
 
1.如上所述,當創建鏈接並準備進行io時,將調用channelActive()方法。 讓咱們寫一個32位整數來表
示這個方法中的當前時間。
2.要發送新消息,咱們須要分配一個包含消息的新緩衝區。 咱們要寫一個32位整數,所以咱們須要一個容
量至少爲4個字節的ByteBuf。 經過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator並分
配一個新的緩衝區。
3.flip在哪裏?在NIO中發送消息以前,咱們不習慣調用java.nio.ByteBuffer.flip()嗎? 
下面時java.nio.ByteBuffer.flip方法的英文註釋:
Flips this buffer. The limit is set to the current position and then the position is set 
to zero. If the mark is defined then it is discarded.
個人理解是把limit 設置爲當前下標位置,而後下標歸零,若是定義了mark,則丟棄
 
netty ByteBuf沒有這樣的方法,由於它有兩個指針;一個用於讀操做,另外一個用於寫操做。當您在讀取器索
引未更改時向ByteBuf寫入內容時,寫入器索引會增長。 reader索引和writer索引分別表示消息的開始和結
束位置。
 
另外一點須要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法返回一個
ChannelFuture。 ChannelFuture表示尚 未發生的I / O操做。 這意味着,任何請求的操做可能還沒有執行,
由於全部操做在Netty中都是異步的。 例如,如下代碼可能會在發送消息以前關閉鏈接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
所以,您須要在ChannelFuture完成以後調用close()方法,並在寫入操做完成時通知其偵聽器。 請注意,
close()也可能不會當即關閉鏈接,由於close 也返回ChannelFuture。
 
4.咱們怎麼知道寫請求何時完成,讓後來關閉鏈接呢,能夠添加一個ChannelFutureListener,讓其監聽
channelfuture,當channelfuture完成任務時關閉鏈接,上面的方法中咱們用了一個匿名ChannelFutureListener
更簡單的替代方法是
f.addListener(ChannelFutureListener.CLOSE);
 
  • 接下來寫一個time client 來接受server發回 的消息
server 和 client沒多大不一樣,只是使用了不一樣的 channel 和 bootstrap
package io.netty.example.time;
 
public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
 
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
1.Bootstrap與ServerBootstrap相似,不一樣之處在於它 適用於非服務器通道,例如客戶端或無鏈接通道。
2.只指定一個EventLoopGroup,它將同時用做boss組和worker組。
3. NioSocketChannel用於建立客戶端通道,而不是NioServerSocketChannel。
4.請注意,咱們不像在ServerBootstrap中那樣使用childOption(),由於客戶端S ocketChannel沒有
父服務器
5.咱們應該調用connect()方法而不是bind()方法。
 
  • ChannelHandler實現怎麼樣? 它應該從服務器接收一個32位整數,將其轉換爲人類可讀的格式,
打印翻譯的時間,並關閉鏈接:
package io.netty.example.time;
 
import java.util.Date;
 
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
 
1.在TCP / IP中,Netty將從對等方發送的數據讀入ByteBuf。
 
  • 上面的handler 有時候回發神經,拋出IndexOutOfBoundsException,緣由以下:
 
基於流的傳輸(例如TCP / IP)中,接收的 數據存儲在套接字接收緩衝區中。可是套接字緩衝區中存儲的
不是包隊列,而是字節隊列。 這意味着,即便您將兩條消息做爲兩個獨立的數據包發送,操做系統也不會將
它們視爲兩條消息,而只是一堆字節。 所以,沒法保證您所閱讀的內容正是您的遠程peer所寫的內容。 例如,
假設操做系統的TCP / IP堆棧已收到三個數據包:
 
abc  def   ghi
 
因爲基於流的協議的這種通常屬性,應用程序頗有可能如下面的碎片形式讀取它們
 
ab cdef g hi
 
那麼應該怎麼解決這個問題呢?還記得以前在ChannelInitializer 中添加多個hadler嗎?能夠在pipeline中添加
一個hadler 來專門解決碎片化問題
package io.netty.example.time;
 
public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes( 4)); // (4)
    }
}
1.ByteToMessageDecoder是ChannelInboundHandler的一個實現,能夠很容易地處理碎片問題。
2.每當收到新數據時,ByteToMessageDecoder都會使用內部維護的 累積緩衝區調用decode()方法。
3.若是累積緩衝區中沒有足夠數據,decode()不向out中添加內容。 當收到更多數據時,
ByteToMessageDecoder將再次調用decode()。
4.若是decode()將對象添加到out,則意味着解碼器成功解碼了一條消息。 ByteToMessageDecoder
丟棄累積緩衝區的 讀取 部分。ByteToMessageDecoder將繼續調用decode()方法,直到它不添加任
何內容。
 
下面來看看改版的ChannelInitializer
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});
相關文章
相關標籤/搜索