第十六章:從EventLoop取消註冊和從新註冊

本章介紹java

EventLoopbootstrap

從EventLoop註冊和取消註冊網絡

在Netty中使用舊的Socket和Channel框架

Netty提供了一個簡單的方法來鏈接Socket/Channel,這是在Netty以外建立並轉移他們的責任到Netty。這容許你將遺留的集成框架以無縫方式一步一步遷移到Netty;Netty還容許取消註冊的通道來中止處理IO,這能夠暫停程序處理並釋放資源。異步

這些功能在某些狀況或某種程度上可能不是很是有用,但使用這些特性能夠解決一些困難的問題。舉個例子,有一個很是受歡迎的社交網絡,其用戶增加很是快,系統程序須要處理每秒幾千個交互或消息,若是用戶持續增加,系統將會處理每秒數以萬計的交互;這很使人興奮,但隨着用戶數的增加,系統將消耗大量的內存和CPU而致使性能低下;此時最須要作的就是改進他們,而且不要花太多的錢在硬件設備上。這種狀況下,系統必須保持功能正常能處理日益增加的數據量,此時,註冊/註銷事件循環就派上用場了。ide

經過容許外部Socket/Channel來註冊和註銷,Netty可以以這樣的方式改進舊系統的缺陷,全部的Netty程序均可以經過一種有效精巧的方式整合到現有系統,本章將重點講解Netty是如何整合。oop

16.1 註冊和取消註冊的Channel和Socket性能

前面章節講過,每一個通道須要註冊到一個EventLoop來處理IO或事件,這是在引導過程當中自動完成。下圖顯示了他們的關係:this

上圖只是顯示了他們關係的一部分,通道關閉時,還須要將註冊到EventLoop中的Socket/Channel註銷以釋放資源。netty

有時不得不處理java.nio.channels.SocketChannel或其餘java.nio.channes.Channel實現,這多是遺留程序或框架的一些緣由所致。咱們可使用Netty來包裝預先建立的java.nio.channels.Channel,而後再註冊到EventLoop。咱們可使用Netty的全部特性,同時還能重用現有的東西。下面代碼顯示了此功能:

//nio  
java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open();  
//netty  
SocketChannel ch = new NioSocketChannel(mySocket);  
EventLoopGroup group = new NioEventLoopGroup();  
//register channel  
ChannelFuture registerFuture = group.register(ch);  
//de-register channel  
ChannelFuture deregisterFuture = ch.deregister();

Netty也適用於包裝OIO,看下面代碼:

//oio  
Socket mySocket = new Socket("www.baidu.com", 80);  
//netty  
SocketChannel ch = new OioSocketChannel(mySocket);  
EventLoopGroup group = new OioEventLoopGroup();  
//register channel  
ChannelFuture registerFuture = group.register(ch);  
//de-register channel  
ChannelFuture deregisterFuture = ch.deregister();

只有2個重點以下:

    使用Netty包裝已建立的Socket或Channel必須使用與之對應的實現,如Socket是OIO,則使用Netty的OioSocketChannel;SocketChannel是NIO,則使用NioSocketChannel。

    EventLoop.register(...)和Channel.deregister(...)都是非阻塞異步的,也就是說它們可能不會理解執行完成,可能稍後完成。它們返回ChannelFuture,咱們在須要進一步操做或確認完成操做時能夠添加一個ChannelFutureLister或在ChannelFuture上同步等待至完成;選擇哪種方式看實際需求,通常建議使用ChannelFutureLister,應避免阻塞。

16.2 掛起IO處理

在一些狀況下可能須要中止一個指定通道的處理操做,好比程序耗盡內存、崩潰、失去一些消息,此時,咱們能夠中止處理事件的通道來清理系統資源,以保持程序穩定繼續處理後續消息。若這樣作,最好的方式就是從EventLoop取消註冊的通道,這能夠有效阻止通道再處理任何事件。若須要被取消的通道再次處理事件,則只須要將該通道從新註冊到EventLooop便可。看下圖:

看下面代碼:

EventLoopGroup group = new NioEventLoopGroup();  
Bootstrap bootstrap = new Bootstrap();  
bootstrap.group(group).channel(NioSocketChannel.class)  
        .handler(new SimpleChannelInboundHandler<ByteBuf>() {  
            @Override  
            protected void channelRead0(ChannelHandlerContext ctx,  
                    ByteBuf msg) throws Exception {  
                //remove this ChannelHandler and de-register  
                ctx.pipeline().remove(this);  
                ctx.deregister();  
            }  
        });  
ChannelFuture future = bootstrap.connect(  
        new InetSocketAddress("www.baidu.com", 80)).sync();  
//....  
Channel channel = future.channel();  
//re-register channel and add ChannelFutureLister  
group.register(channel).addListener(new ChannelFutureListener() {  
    @Override  
    public void operationComplete(ChannelFuture future) throws Exception {  
        if(future.isSuccess()){  
            System.out.println("Channel registered");  
        }else{  
            System.out.println("register channel on EventLoop fail");  
            future.cause().printStackTrace();  
        }  
    }  
});

16.3 遷移通道到另外一個事件循環

另外一個取消註冊和註冊一個Channel的用例是將一個活躍的Channel移到另外一個EventLoop,有下面一些緣由可能致使須要這麼作:

當前EventLoop太忙碌,須要將Channel移到一個不是很忙碌的EventLoop;

終止EventLoop釋放資源同時保持活躍Channel能夠繼續使用;

遷移Channel到一個執行級別較低的非關鍵業務的EventLoop中。

下圖顯示遷移Channel到另外一個EventLoop:

看下面代碼:

EventLoopGroup group = new NioEventLoopGroup();  
final EventLoopGroup group2 = new NioEventLoopGroup();  
Bootstrap b = new Bootstrap();  
b.group(group).channel(NioSocketChannel.class)  
        .handler(new SimpleChannelInboundHandler<ByteBuf>() {  
            @Override  
            protected void channelRead0(ChannelHandlerContext ctx,  
                    ByteBuf msg) throws Exception {  
                // remove this channel handler and de-register  
                ctx.pipeline().remove(this);  
                ChannelFuture f = ctx.deregister();  
                // add ChannelFutureListener  
                f.addListener(new ChannelFutureListener() {  
                    @Override  
                    public void operationComplete(ChannelFuture future)  
                            throws Exception {  
                        // migrate this handler register to group2  
                        group2.register(future.channel());  
                    }  
                });  
            }  
        });  
ChannelFuture future = b.connect("www.baidu.com", 80);  
future.addListener(new ChannelFutureListener() {  
    @Override  
    public void operationComplete(ChannelFuture future)  
            throws Exception {  
        if (future.isSuccess()) {  
            System.out.println("connection established");  
        } else {  
            System.out.println("connection attempt failed");  
            future.cause().printStackTrace();  
        }  
    }  
});
相關文章
相關標籤/搜索