Netty高級

Netty快速入門java

什麼Netty

 Netty 是一個基於 JAVA NIO 類庫的異步通訊框架,它的架構特色是:異步非阻塞、基於事件驅動、高性能、高可靠性和高可定製性。算法

Netty應用場景

1.分佈式開源框架中dubbo、ZookeeperRocketMQ底層rpc通信使用就是netty編程

2.遊戲開發,底層使用netty通信json

爲何選擇netty

在本小節,咱們總結下爲何不建議開發者直接使用JDKNIO類庫進行開發的緣由:bootstrap

1)      NIO的類庫和API繁雜,使用麻煩,你須要熟練掌握SelectorServerSocketChannelSocketChannelByteBuffer等;數組

2)      須要具有其它的額外技能作鋪墊,例如熟悉Java多線程編程,由於NIO編程涉及到Reactor模式,你必須對多線程和網路編程很是熟悉,才能編寫出高質量的NIO程序;緩存

3)      可靠性能力補齊,工做量和難度都很是大。例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處 n理等等,NIO編程的特色是功能開發相對容易,可是可靠性能力補齊工做量和難度都很是大;安全

4)      JDK NIOBUG,例如臭名昭著的epoll bug,它會致使Selector空輪詢,最終致使CPU 100%。官方聲稱在JDK1.6版本的update18修復了該問題,可是直到JDK1.7版本該問題仍舊存在,只不過該bug發生機率下降了一些而已,它並無被根本解決。該BUG以及與該BUG相關的問題單以下:服務器

 

Netty服務器

 

class ServerHandler extends SimpleChannelHandler {
    
    /**
     * 通道關閉的時候觸發
     */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
    }

    /**
     * 必須是鏈接已經創建,關閉通道的時候纔會觸發.
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("channelDisconnected");
    }

    /**
     * 捕獲異常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
        System.out.println("exceptionCaught");

    }

    /**
     * 接受消息
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.messageReceived(ctx, e);
//        System.out.println("messageReceived");
        System.out.println("服務器端收到客戶端消息:"+e.getMessage());
        //回覆內容
        ctx.getChannel().write("好的");
    }

}
// netty 服務器端
public class NettyServer {

    public static void main(String[] args) {
        // 建立服務類對象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 建立兩個線程池 分別爲監聽監聽端口 ,nio監聽
        ExecutorService boos = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();
        // 設置工程 並把兩個線程池加入中
        serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, worker));
        // 設置管道工廠
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                //將數據轉換爲string類型.
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("serverHandler", new ServerHandler());
                return pipeline;
            }
        });
        // 綁定端口號
        serverBootstrap.bind(new InetSocketAddress(9090));
        System.out.println("netty server啓動....");
    }
}

 

 

Netty客戶端

package com.itmayiedu;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
class ClientHandler extends SimpleChannelHandler {

    
    /**
     * 通道關閉的時候觸發
     */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
    }

    /**
     * 必須是鏈接已經創建,關閉通道的時候纔會觸發.
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("channelDisconnected");
    }

    /**
     * 捕獲異常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
        System.out.println("exceptionCaught");

    }

    /**
     * 接受消息
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.messageReceived(ctx, e);
//        System.out.println("messageReceived");
        System.out.println("服務器端向客戶端回覆內容:"+e.getMessage());
        //回覆內容
//        ctx.getChannel().write("好的");
    }

}
public class NettyClient {

    public static void main(String[] args) {
        System.out.println("netty client啓動...");
        // 建立客戶端類
        ClientBootstrap clientBootstrap = new ClientBootstrap();
        // 線程池
        ExecutorService boos = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();
        clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, worker));
        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // 將數據轉換爲string類型.
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("clientHandler", new ClientHandler());
                return pipeline;

            }
        });
        //鏈接服務端
        ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));
        Channel channel = connect.getChannel();
        System.out.println("client start");
        Scanner scanner=    new Scanner(System.in);
        while (true) {
            System.out.println("請輸輸入內容...");
            channel.write(scanner.next());
        }
    }
    
    

}


Maven座標網絡

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.3.0.Final</version>
</dependency>


N
etty5.0用法

建立服務器

class ServerHandler extends ChannelHandlerAdapter {
    /**
     * 當通道被調用,執行該方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接收數據
        String value = (String) msg;
        System.out.println("Server msg:" + value);
        // 回覆給客戶端 「您好!」
        String res = "好的...";
        ctx.writeAndFlush(Unpooled.copiedBuffer(res.getBytes()));
    }

}

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("服務器端已經啓動....");
        // 1.建立2個線程,一個負責接收客戶端鏈接, 一個負責進行 傳輸數據
        NioEventLoopGroup pGroup = new NioEventLoopGroup();
        NioEventLoopGroup cGroup = new NioEventLoopGroup();
        // 2. 建立服務器輔助類
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                // 3.設置緩衝區與發送區大小
                .option(ChannelOption.SO_SNDBUF, 32 * 1024).option(ChannelOption.SO_RCVBUF, 32 * 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new StringDecoder());
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });
        ChannelFuture cf = b.bind(8080).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    
    }

}


建立客戶端

class ClientHandler extends ChannelHandlerAdapter {

    /**
     * 當通道被調用,執行該方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接收數據
        String value = (String) msg;
        System.out.println("client msg:" + value);
    }

    
}

public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("客戶端已經啓動....");
        // 建立負責接收客戶端鏈接
        NioEventLoopGroup pGroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
         cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes()));
         cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes()));
        // 等待客戶端端口號關閉
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();

    }

}


Maven座標

 

<dependencies>
        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.3.19.GA</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.3.18.GA</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

 


TCP、拆包問題解決方案

什麼是粘包/拆包

   一個完整的業務可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這個就是TCP的拆包和封包問題。

下面能夠看一張圖,是客戶端向服務端發送包:

 

1. 第一種狀況,Data1Data2都分開發送到了Server端,沒有產生粘包和拆包的狀況。

2. 第二種狀況,Data1Data2數據粘在了一塊兒,打成了一個大的包發送到Server端,這個狀況就是粘包。

3. 第三種狀況,Data2被分離成Data2_1Data2_2,而且Data2_1Data1以前到達了服務端,這種狀況就產生了拆包。

因爲網絡的複雜性,可能數據會被分離成N多個複雜的拆包/粘包的狀況,因此在作TCP服務器的時候就須要首先解決拆包/

解決辦法

     消息定長,報文大小固定長度,不夠空格補全,發送和接收方遵循相同的約定,這樣即便粘包了經過接收方編程實現獲取定長報文也能區分。

sc.pipeline().addLast(new FixedLengthFrameDecoder(10));

包尾添加特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字符做爲報文分隔符,接收方經過特殊分隔符切分報文區分。

ByteBuf buf = Unpooled.copiedBuffer("_mayi".getBytes());

sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));

將消息分爲消息頭和消息體,消息頭中包含表示信息的總長度(或者消息體長度)的字段

 

序列化協議自定義序列化協議

序列化定義

序列化(serialization)就是將對象序列化爲二進制形式(字節數組),通常也將序列化稱爲編碼(Encode),主要用於網絡傳輸、數據持久化等;

反序列化(deserialization)則是將從網絡、磁盤等讀取的字節數組還原成原始對象,以便後續業務的進行,通常也將反序列化稱爲解碼(Decode),主要用於網絡傳輸對象的解碼,以便完成遠程調用。

序列化協議鼻祖

我知道的第一種序列化協議就是Java默認提供的序列化機制,須要序列化的Java對象只須要實現 Serializable / Externalizable 接口並生成序列化ID,這個類就可以經過 ObjectInput ObjectOutput 序列化和反序列化,若對Java默認的序列化協議不瞭解,或是遺忘了,請參考:序列化詳解

可是Java默認提供的序列化有不少問題,主要有如下幾個缺點:

沒法跨語言:我認爲這對於Java序列化的發展是致命的失誤,由於Java序列化後的字節數組,其它語言沒法進行反序列化。;

序列化後的碼流太大::相對於目前主流的序列化協議,Java序列化後的碼流太大;

序列化的性能差:因爲Java序列化採用同步阻塞IO,相對於目前主流的序列化協議,它的效率很是差。

影響序列化性能的關鍵因素

序列化後的碼流大小(網絡帶寬的佔用);

序列化的性能(CPU資源佔用);

是否支持跨語言(異構系統的對接和開發語言切換)。

幾種流行的序列化協議比較

XML

1)定義:

XMLExtensible Markup Language)是一種經常使用的序列化和反序列化協議, 它歷史悠久,從1998年的1.0版本被普遍使用至今。

2)優勢

人機可讀性好

可指定元素或特性的名稱

3)缺點

序列化數據只包含數據自己以及類的結構,不包括類型標識和程序集信息。

類必須有一個將由 XmlSerializer 序列化的默認構造函數。

只能序列化公共屬性和字段

不能序列化方法

文件龐大,文件格式複雜,傳輸佔帶寬

4)使用場景

當作配置文件存儲數據

實時數據轉換

JSON

1)定義:

JSON(JavaScript Object Notation, JS 對象標記) 是一種輕量級的數據交換格式。它基於 ECMAScript (w3c制定的js規範)的一個子集, JSON採用與編程語言無關的文本格式,可是也使用了類C語言(包括CC++C#JavaJavaScriptPerlPython等)的習慣,簡潔和清晰的層次結構使得 JSON 成爲理想的數據交換語言。

2)優勢

先後兼容性高

數據格式比較簡單,易於讀寫

序列化後數據較小,可擴展性好,兼容性好

XML相比,其協議比較簡單,解析速度比較快

3)缺點

數據的描述性比XML

不適合性能要求爲ms級別的狀況

額外空間開銷比較大

4)適用場景(可替代XML)

跨防火牆訪問

可調式性要求高的狀況

基於Web browserAjax請求

傳輸數據量相對小,實時性要求相對低(例如秒級別)的服務

Fastjson

1)定義

Fastjson是一個Java語言編寫的高性能功能完善的JSON庫。它採用一種假定有序快速匹配的算法,把JSON Parse的性能提高到極致。

2)優勢

接口簡單易用

目前java語言中最快的json

3)缺點

過於注重快,而偏離了標準及功能性

代碼質量不高,文檔不全

4)適用場景

協議交互

Web輸出

Android客戶端

Thrift

1)定義:

Thrift並不只僅是序列化協議,而是一個RPC框架。它可讓你選擇客戶端與服務端之間傳輸通訊協議的類別,即文本(text)和二進制(binary)傳輸協議, 爲節約帶寬,提供傳輸效率,通常狀況下使用二進制類型的傳輸協議。

2)優勢

序列化後的體積小, 速度快

支持多種語言和豐富的數據類型

對於數據字段的增刪具備較強的兼容性

支持二進制壓縮編碼

3)缺點

使用者較少

跨防火牆訪問時,不安全

不具備可讀性,調試代碼時相對困難

不能與其餘傳輸層協議共同使用(例如HTTP

沒法支持向持久層直接讀寫數據,即不適合作數據持久化序列化協議

4)適用場景

分佈式系統的RPC解決方案

Avro

1)定義:

Avro屬於Apache Hadoop的一個子項目。 Avro提供兩種序列化格式:JSON格式或者Binary格式。Binary格式在空間開銷和解析性能方面能夠和Protobuf媲美,Avro的產生解決了JSON的冗長和沒有IDL的問題

2)優勢

支持豐富的數據類型

簡單的動態語言結合功能

具備自我描述屬性

提升了數據解析速度

快速可壓縮的二進制數據形式

能夠實現遠程過程調用RPC

支持跨編程語言實現

3)缺點

對於習慣於靜態類型語言的用戶不直觀

4)適用場景

Hadoop中作HivePigMapReduce的持久化數據格式

Protobuf

1)定義

protocol buffers 由谷歌開源而來,在谷歌內部久經考驗。它將數據結構以.proto文件進行描述,經過代碼生成工具能夠生成對應數據結構的POJO對象和Protobuf相關的方法和屬性。

2)優勢

序列化後碼流小,性能高

結構化數據存儲格式(XML JSON等)

經過標識字段的順序,能夠實現協議的前向兼容

結構化的文檔更容易管理和維護

3)缺點

須要依賴於工具生成代碼

支持的語言相對較少,官方只支持Java C++ Python

4)適用場景

對性能要求高的RPC調用

具備良好的跨防火牆的訪問屬性

適合應用層對象的持久化

其它

protostuff 基於protobuf協議,但不須要配置proto文件,直接導包即

Jboss marshaling 能夠直接序列化java類, 無須實java.io.Serializable接口

Message pack 一個高效的二進制序列化格式

Hessian 採用二進制協議的輕量級remoting onhttp工具

kryo 基於protobuf協議,只支持java語言,須要註冊(Registration),而後序列化(Output),反序列化(Input

性能對比圖解

時間

 

空間

 

分析上圖知:

XML序列化(Xstream)不管在性能和簡潔性上比較差。

ThriftProtobuf相比在時空開銷方面都有必定的劣勢。

ProtobufAvro在兩方面表現都很是優越。

選型建議

不一樣的場景適用的序列化協議:

對於公司間的系統調用,若是性能要求在100ms以上的服務,基於XMLSOAP協議是一個值得考慮的方案。

基於Web browserAjax,以及Mobile app與服務端之間的通信,JSON協議是首選。對於性能要求不過高,或者以動態類型語言爲主,或者傳輸數據載荷很小的的運用場景,JSON也是很是不錯的選擇。

對於調試環境比較惡劣的場景,採用JSONXML可以極大的提升調試效率,下降系統開發成本。

當對性能和簡潔性有極高要求的場景,ProtobufThriftAvro之間具備必定的競爭關係。

對於T級別的數據的持久化應用場景,ProtobufAvro是首要選擇。若是持久化後的數據存儲在Hadoop子項目裏,Avro會是更好的選擇。

因爲Avro的設計理念偏向於動態類型語言,對於動態語言爲主的應用場景,Avro是更好的選擇。

對於持久層非Hadoop項目,以靜態類型語言爲主的應用場景,Protobuf會更符合靜態類型語言工程師的開發習慣。

若是須要提供一個完整的RPC解決方案,Thrift是一個好的選擇。

若是序列化以後須要支持不一樣的傳輸層協議,或者須要跨防火牆訪問的高性能場景,Protobuf能夠優先考慮。

Marshalling編碼

public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling解碼器MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }

}
相關文章
相關標籤/搜索