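Netty權威指南 (The Definitive Guide to Netty), 2nd Edition

Chapter 1  The Evolution of Java I/O

1.I/O fundamentals

  1.Overview of the Linux network I/O models

    1.Blocking I/O model: the most commonly used I/O model. By default, all file operations are blocking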

 

    2.Non-blocking I/O model: when recvfrom goes from the application layer to the kernel, the application polls to check whether data has arrived

 

 

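    3.I/O multiplexing model

    4.Signal-driven I/O model

    5.Asynchronous I/O: tell the kernel to start an operation and have the kernel notify us once the entire operation has completed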
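
    The polling behaviour of the non-blocking model can be sketched in Java. This is only an illustration under stated assumptions: it uses a java.nio.channels.Pipe as a stand-in for a socket so that it runs without a network, and the class name NonBlockingReadDemo and the helper pollUntilData are made up for this example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

final class NonBlockingReadDemo {

    /** Polls a non-blocking channel; returns how many polls came back empty before data (or EOF) showed up. */
    static int pollUntilData(Pipe.SourceChannel source, ByteBuffer dst, int maxPolls)
            throws IOException, InterruptedException {
        int emptyPolls = 0;
        while (emptyPolls < maxPolls) {
            int n = source.read(dst);   // non-blocking: returns 0 at once when nothing is ready
            if (n != 0) {               // n > 0 means data arrived, n < 0 means end of stream
                return emptyPolls;
            }
            emptyPolls++;
            Thread.sleep(10);           // the application is free to do other work between polls
        }
        return emptyPolls;
    }

    public static void main(String[] args) throws Exception {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);

        ByteBuffer dst = ByteBuffer.allocate(64);
        // Nothing has been written yet, so the read returns immediately instead of blocking.
        System.out.println("first read returned " + pipe.source().read(dst));

        pipe.sink().write(ByteBuffer.wrap("QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8)));
        pollUntilData(pipe.source(), dst, 100);
        dst.flip();
        System.out.println("received: " + StandardCharsets.UTF_8.decode(dst));
    }
}
```

    In blocking mode the first read would not return until data arrived; here the caller decides how often to poll.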

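  2.I/O multiplexing

    1.Use cases: a server needs to handle multiple sockets in the listening state or multiple sockets in the connected state at the same time; a server needs to handle sockets for multiple network protocols at the same time

    2.Improvements in epoll

      1.The number of socket descriptors (FDs) a single process can open is not limited

      2.I/O efficiency does not degrade linearly as the number of FDs grows

      3.Uses mmap to speed up message passing between the kernel and user space

      4.The epoll API is simpler

2.The evolution of Java I/O

  1.A brief history of Java I/O

Chapter 2  Getting Started with NIO

1.Traditional BIO programming

  1.BIO communication model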

 

  2.Source analysis of the TimeServer built on synchronous blocking I/O

    1.TimeServer

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// set the listening port
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Server started, port=" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();// block until a client connects
                new Thread(new TimeServerHandler(socket)).start();// one new thread per connection
            }
        } finally {
            if (serverSocket != null) {
                System.out.println("Server closed");
                serverSocket.close();
                serverSocket = null;
            }
        }
    }
}

    2.TimeServerHandler

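import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class TimeServerHandler implements Runnable {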
    private Socket socket;

    public TimeServerHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String currentTime = null;
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null) {
                    break;
                }
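                System.out.println("Received client request: " + body);
                if ("QUERY TIME ORDER".equalsIgnoreCase(body)) {
                    currentTime = String.valueOf(System.currentTimeMillis());
                } else {
                    currentTime = "BAD ORDER";
                }
                out.println(currentTime);// send the response to the client
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // release resources on every exit path, not only when an exception is thrown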
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (out != null) {
                out.close();
                out = null;
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

    3.NpeCheck

public class NpeCheck {
    public static boolean checkArray(String[] args) {
        return args != null && args.length > 0;
    }
}

  3.Source analysis of the TimeClient built on synchronous blocking I/O

 

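import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// port of the server to connect to
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }

        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket("127.0.0.1", port);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println("QUERY TIME ORDER");// send the request to the server
            System.out.println("Request sent to the server");
            String resp = in.readLine();// read the server's response
            System.out.println("Current time=" + resp);
        } catch (Exception e) {
            e.printStackTrace();// report connection or I/O failures instead of swallowing them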
        } finally {
            if (out != null) {
                out.close();
                out = null;
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                in = null;
            }

            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

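2.Pseudo-asynchronous I/O programming

  1.Pseudo-asynchronous I/O model diagram

    When a new client connects, its socket is wrapped in a Task and submitted to a back-end thread pool for processing. The JDK thread pool maintains a message queue and N active threads that process the tasks in the queue.

  2.Source analysis of the TimeServer built on pseudo-asynchronous I/O

    1.TimeServer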

 

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// set the listening port
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }

        ServerSocket server = null;
        try {
            server = new ServerSocket(port);
            System.out.println("Server started, port=" + port);
            Socket socket = null;
            TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50, 100);// create the I/O task thread pool
            while (true) {
                socket = server.accept();// block until a client connects
                singleExecutor.execute(new TimeServerHandler(socket));// hand the connection to the pool instead of starting a new thread
            }
        } finally {
            if (server != null) {
                System.out.println("Server closed");
                server.close();
                server = null;
            }
        }
    }
}

 

    2.TimeServerHandlerExecutePool (thread pool)

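import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimeServerHandlerExecutePool {
    private ExecutorService executor;

    // create the thread pool: core size = number of CPUs, with a bounded maximum size and a bounded queue
    public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
        executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
    }

    // submit a task for execution
    public void execute(Runnable task) {
        executor.execute(task);
    }
}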

 

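  3.Drawbacks of pseudo-asynchronous I/O: a cascading failure caused by the peer taking too long to respond

    1.The server is slow: a response that normally takes 10 ms takes 60 s

    2.A pseudo-asynchronous I/O thread is reading the response from the faulty service node; because reading the input stream is blocking, the thread is blocked synchronously for 60 s

    3.If all available threads are blocked by the faulty server, all subsequent I/O messages wait in the queue

    4.Because the thread pool is backed by a blocking queue, once the queue is full, subsequent attempts to enqueue are blocked (with ThreadPoolExecutor's default policy, execute() rejects the task with a RejectedExecutionException instead)

    5.Because the front end has only one Acceptor thread accepting client connections, once it is stuck behind the thread pool's synchronous blocking queue, new client requests are refused and clients see a large number of connection timeouts

    6.Because almost all connections time out, callers conclude that the system has crashed and can no longer accept new requests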
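
    Steps 3 and 4 can be reproduced in miniature. The sketch below is an illustration only: the class PoolSaturationDemo and the method acceptedBeforeRejection are made up for this example, the pool sizes are arbitrary, and each task simply waits on a latch to stand in for a handler stuck reading from a slow peer. It uses the same ThreadPoolExecutor plus ArrayBlockingQueue combination as TimeServerHandlerExecutePool.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSaturationDemo {

    /** Submits stuck tasks until the pool refuses one; returns how many were accepted. */
    static int acceptedBeforeRejection(int maxThreads, int queueSize) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 120L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize));
        int accepted = 0;
        try {
            while (true) {
                // each task stands in for a handler blocked on a slow peer
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // all threads are busy and the queue is full: the default AbortPolicy throws
            return accepted;
        } finally {
            release.countDown();                        // unblock the stuck tasks
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 2 threads + 3 queue slots: the 6th submission is the first one refused
        System.out.println("accepted before rejection: " + acceptedBeforeRejection(2, 3));
    }
}
```

    Whether the acceptor blocks or fails at this point depends on the rejection policy; either way, no new connection is served until a stuck handler returns.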

 

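3.NIO programming

  1.Overview of the NIO class library

    1.Buffer: holds data that is about to be written or has just been read. It is essentially a byte array that also provides structured access to the data and tracks information such as the read/write position

    2.Channel: bidirectional; can be used for reading, writing, or both at once. SelectableChannel is used for network I/O and FileChannel for file operations

    3.Selector (multiplexer): reports the channels that are ready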
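
    The read/write position that a Buffer tracks is easiest to see by printing it around flip(). A minimal sketch, with the class name BufferFlipDemo and its helpers invented for this example:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BufferFlipDemo {

    /** Describes the three indices a Buffer keeps track of. */
    static String state(ByteBuffer buf) {
        return "position=" + buf.position() + " limit=" + buf.limit() + " capacity=" + buf.capacity();
    }

    /** Copies the readable bytes out of an already flipped buffer and decodes them as UTF-8. */
    static String drain(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        System.out.println(state(buf));   // position=0 limit=1024 capacity=1024

        buf.put("QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8));
        System.out.println(state(buf));   // position=16 limit=1024 capacity=1024

        buf.flip();                       // limit = old position, position = 0: ready for reading
        System.out.println(state(buf));   // position=0 limit=16 capacity=1024

        System.out.println(drain(buf));   // QUERY TIME ORDER
        buf.clear();                      // reset the indices so the buffer can be filled again
    }
}
```

    Forgetting flip() before reading is the classic mistake: remaining() would then cover the empty tail of the buffer instead of the bytes just written.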

  2.NIO server sequence diagram

  3.Source analysis of the TimeServer built on NIO

    1.TimeServer

import java.io.IOException;

public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// set the listening port
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }

        // create the multiplexer task, which polls the Selector and can handle many concurrent client connections
        MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
        new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
    }
}

    2.MultiplexerTimeServer (multiplexer)

 

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel serverChannel;

    private volatile boolean stop;

    public MultiplexerTimeServer(int port) {
        try {
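            selector = Selector.open();// 3.1 create the multiplexer used by the Reactor thread
            serverChannel = ServerSocketChannel.open();// 1. open the ServerSocketChannel that listens for client connections; it is the parent channel of all client connections
            serverChannel.configureBlocking(false);// 2.2 set the channel to non-blocking mode
            serverChannel.socket().bind(new InetSocketAddress(port), 1024);// 2.1 bind the listening port with a backlog of 1024
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);// 4. register the ServerSocketChannel with the Reactor thread's selector and listen for ACCEPT events
            System.out.println("Server started, port=" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);// exit if setup fails, for example when the port is already in use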
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // 5. the multiplexer polls for ready keys inside the loop of the run method
                selector.select(1000);// wake up at least once per second
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // closing the selector invalidates all remaining keys and deregisters their channels (note: Selector.close() does not itself close those channels)
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

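    // handle a ready key: either a new connection or readable data
    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // handle a newly arrived connection request
            if (key.isAcceptable()) {
                // accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                // 6. the multiplexer detected a new client; accept() returns the connection once the TCP three-way handshake has completed
                SocketChannel sc = ssc.accept();
                // 7. set the client channel to non-blocking mode
                sc.configureBlocking(false);
                // 8. register the new client connection with the Reactor thread's selector and listen for read events
                sc.register(selector, SelectionKey.OP_READ);
            }

            if (key.isReadable()) {
                // read data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                // 9. read the client request into the buffer without blocking
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {// bytes were read; decode them
                    readBuffer.flip();// switch the buffer from writing to reading
                    // 10. decode the ByteBuffer; a full implementation would reset the position on a half-packet and keep reading,
                    // then wrap each successfully decoded message in a Task and hand it to a business thread pool
                    byte[] bytes = new byte[readBuffer.remaining()];// array sized to the readable bytes
                    readBuffer.get(bytes);// copy the bytes out of the buffer
                    String body = new String(bytes, "UTF-8");// build the request body
                    System.out.println("Received request=" + body);
                    String currentTime;
                    if ("QUERY TIME ORDER".equalsIgnoreCase(body)) {
                        currentTime = String.valueOf(System.currentTimeMillis());
                    } else {
                        currentTime = "BAD ORDER";
                    }
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {// the peer closed the connection; close the SocketChannel and release resources
                    key.cancel();
                    sc.close();
                } else {
                    // zero bytes read is a normal case; ignore it
                }
            }
        }
    }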

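    // send the response to the client without blocking
    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (StringUtils.isNotBlank(response)) {
            byte[] bytes = response.getBytes("UTF-8");// encode with the same charset that is used for decoding
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);// allocate a ByteBuffer sized to the byte array
            writeBuffer.put(bytes);// copy the byte array into the buffer
            writeBuffer.flip();// switch the buffer from writing to reading
            // 11. encode the response into a ByteBuffer and call SocketChannel's non-blocking write
            // if the TCP send buffer is full the write is partial (a "half-packet" write); a full implementation would register OP_WRITE and keep writing until the whole message is in the TCP buffer
            channel.write(writeBuffer);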
        }
    }
}
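
The comment in doWrite notes that a partial ("half-packet") write is possible but the listing does not handle it. One way to handle it is sketched below, as an illustration rather than the book's implementation: PendingWriteQueue, ThrottledChannel and PartialWriteDemo are names invented here, and ThrottledChannel is a hypothetical stand-in for a socket whose send buffer accepts only a few bytes per call, so the example runs without a network.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;

final class PendingWriteQueue {
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();

    void enqueue(ByteBuffer buf) {
        pending.add(buf);
    }

    /**
     * Writes as much as the channel accepts right now.
     * Returns true when everything queued has been written; false means the
     * caller should keep OP_WRITE set on the key and call flush again later.
     */
    boolean flush(WritableByteChannel channel) throws IOException {
        ByteBuffer head;
        while ((head = pending.peek()) != null) {
            channel.write(head);
            if (head.hasRemaining()) {
                return false;           // send buffer full: stop and wait for writability
            }
            pending.remove();
        }
        return true;
    }
}

/** Hypothetical stand-in for a socket that takes at most maxBytesPerWrite bytes per call. */
final class ThrottledChannel implements WritableByteChannel {
    private final int maxBytesPerWrite;
    final StringBuilder received = new StringBuilder();

    ThrottledChannel(int maxBytesPerWrite) {
        this.maxBytesPerWrite = maxBytesPerWrite;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(maxBytesPerWrite, src.remaining());
        for (int i = 0; i < n; i++) {
            received.append((char) src.get());   // ASCII payload only, enough for the demo
        }
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

final class PartialWriteDemo {
    public static void main(String[] args) throws IOException {
        PendingWriteQueue queue = new PendingWriteQueue();
        queue.enqueue(ByteBuffer.wrap("1700000000000".getBytes(StandardCharsets.US_ASCII)));
        ThrottledChannel channel = new ThrottledChannel(4);
        int rounds = 1;
        while (!queue.flush(channel)) {
            rounds++;                   // in a selector loop this is the next OP_WRITE event
        }
        System.out.println(rounds + " rounds, received " + channel.received);
    }
}
```

In the selector loop, a false result from flush would translate into key.interestOps(key.interestOps() | SelectionKey.OP_WRITE), and a true result into clearing that bit again so the selector does not spin on an always-writable socket.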

  4.NIO client sequence diagram

 

  5.Source analysis of the TimeClient built on NIO

    1.TimeClient

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// port of the server to connect to
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
    }
}

    2.TimeClientHandle

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
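            selector = Selector.open();// 6.1 create the multiplexer used by the Reactor thread
            socketChannel = SocketChannel.open();// 1. open the SocketChannel (the local address is bound implicitly on connect)
            socketChannel.configureBlocking(false);// 2. set the SocketChannel to non-blocking mode; TCP parameters for the connection can also be set here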
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                // 7. the multiplexer polls for ready keys inside the loop of the run method
                selector.select(1000);
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeySet.iterator();
                SelectionKey key;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // closing the selector invalidates all remaining keys and deregisters their channels (note: Selector.close() does not itself close those channels)
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // handle a ready key: connection completion or the server's response
    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // check whether the connection succeeded
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {// 8. handle the connect event
                if (sc.finishConnect()) {// 9. check the connection result
                    sc.register(selector, SelectionKey.OP_READ);// 10. on success, register for read events with the selector
                    doWrite(sc);
                } else {
                    System.exit(1);// connection failed; exit the process
                }
            }
            if (key.isReadable()) {
                // 11. read the server's response into the buffer without blocking
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readByte = sc.read(readBuffer);
                if (readByte > 0) {
                    // 12. decode the ByteBuffer; a full implementation would reset the position on a half-packet and keep reading,
                    // then wrap each successfully decoded message in a Task and hand it to a business thread pool
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Time received=" + body);
                    this.stop = true;
                } else if (readByte < 0) {
                    // the peer closed the connection
                    key.cancel();
                    sc.close();
                } else {
                    // zero bytes read is a normal case; ignore it
                }
            }
        }
    }

    private void doConnect() throws IOException {
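        // if the connection completes immediately, register for reads and send the request; otherwise wait for the connect event
        if (socketChannel.connect(new InetSocketAddress(host, port))) {// 3. connect to the server without blocking
            socketChannel.register(selector, SelectionKey.OP_READ);// 4. register for read events with the selector
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);// 5. register for connect events with the Reactor thread's selector and wait for the server's TCP ACK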
        }
    }

    // send the request to the server without blocking
    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);// 13. encode the request into a ByteBuffer and call SocketChannel's non-blocking write
        if (!writeBuffer.hasRemaining()) {// check whether the whole message was sent
            System.out.println("Request sent to the server");
        }
    }
}

4.AIO programming

  1.Source analysis of the TimeServer built on AIO

    1.TimeServer

 

import com.example.demo.util.NpeCheck;

import java.io.IOException;

public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// set the listening port
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }

        // create the asynchronous time server handler and run it on its own thread
        AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
        new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
    }
}

 

    2.AsyncTimeServerHandler

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

public class AsyncTimeServerHandler implements Runnable {
    private int port;

    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    public AsyncTimeServerHandler(int port) {
        this.port = port;
        try {
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
            asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("Server started, port=" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        latch = new CountDownLatch(1);// keeps this thread blocked until the pending operations finish, so the server does not exit
        doAccept();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doAccept() {
        asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());// accept client connections asynchronously
    }
}

    3.AcceptCompletionHandler

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

// handler that receives the notification when an accept completes
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {

    @Override
    public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
        attachment.asynchronousServerSocketChannel.accept(attachment, this);// keep accepting further client connections
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        result.read(buffer, buffer, new ReadCompletionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.latch.countDown();
    }
}

 

  2.Source analysis of the TimeClient built on AIO

    1.TimeClient

 

import com.example.demo.util.NpeCheck;

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// port of the server to connect to
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClient-001").start();// run the asynchronous client handler on its own thread
    }
}

 

    2.AsyncTimeClientHandler

 

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch latch;

    public AsyncTimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            client = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
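        latch = new CountDownLatch(1);// prevents the thread from exiting before the asynchronous operations finish
        // start the asynchronous connect; the attachment is passed back to the callback, and the handler is the callback interface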
        client.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {// not fully sent yet; keep writing asynchronously
                    client.write(buffer, buffer, this);
                } else {// fully sent; read the response asynchronously
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String body;
                            try {
                                body = new String(bytes, "UTF-8");
                                System.out.println("Time received=" + body);
                                latch.countDown();
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                client.close();// close the connection
                            } catch (IOException e) {
                                // ignore
                            } finally {
                                latch.countDown();// let the waiting thread finish even if close() fails
                            }
                        }
                    });
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    client.close();
                } catch (IOException e) {
                    // ignore
                } finally {
                    latch.countDown();// let the waiting thread finish even if close() fails
                }
            }
        });
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        try {
            client.close();
        } catch (IOException e) {
            // ignore
        } finally {
            latch.countDown();// let the waiting thread finish even if close() fails
        }
    }
}

 

    3.ReadCompletionHandler

 

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel channel;// used to read half-packet messages and to send the response

    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try {
            String req = new String(body, "UTF-8");
            System.out.println("Received request: " + req);
            String currentTime;
            if ("QUERY TIME ORDER".equalsIgnoreCase(req)) {
                currentTime = String.valueOf(System.currentTimeMillis());
            } else {
                currentTime = "BAD ORDER";
            }
            doWrite(currentTime);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    private void doWrite(String currentTime) {
        if (StringUtils.isNotBlank(currentTime)) {
            byte[] bytes = (currentTime).getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    // if the response was not fully sent, keep sending
                    if (buffer.hasRemaining()) {
                        channel.write(buffer, buffer, this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        // ignore
                    }
                }
            });
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            // ignore
        }
    }
}

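  3.Results of running the AIO version of the time server

    An asynchronous SocketChannel is a passive object: we do not need to create a dedicated I/O thread to handle reads and writes as in NIO programming. For both AsynchronousServerSocketChannel and AsynchronousSocketChannel, a thread pool inside the JDK invokes the callbacks and drives the read and write operations, which makes AIO programming simpler than NIO programming.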
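
    The callback-driven style can be observed in isolation. The sketch below is an illustration under stated assumptions: it uses AsynchronousFileChannel on a temporary file instead of a socket channel so that it runs without a network (the CompletionHandler contract is the same), and the class name AioCallbackDemo and method readAsync are invented for this example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class AioCallbackDemo {

    /** Starts an asynchronous read and returns at once; the future is completed from the callback. */
    static CompletableFuture<String> readAsync(Path file) throws IOException {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        CompletableFuture<String> result = new CompletableFuture<>();
        channel.read(buffer, 0L, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachment) {
                // typically runs on a thread from the channel's thread pool, not on the caller's thread
                System.out.println("callback thread: " + Thread.currentThread().getName());
                attachment.flip();
                String text = StandardCharsets.UTF_8.decode(attachment).toString();
                closeQuietly(channel);
                result.complete(text);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeQuietly(channel);
                result.completeExceptionally(exc);
            }
        });
        return result;
    }

    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing useful to do in a demo
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("aio-demo", ".txt");
        Files.write(file, "QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8));
        try {
            CompletableFuture<String> pending = readAsync(file);
            System.out.println("caller thread: " + Thread.currentThread().getName());
            System.out.println("read: " + pending.get(5, TimeUnit.SECONDS));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

    The caller never loops over a selector; it only supplies the handler and, as in the time server listings, needs some way (here a future, there a CountDownLatch) to keep the program alive until the callback has run.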

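5.Comparison of the four I/O types

  1.Clarifying the concepts

    1.Asynchronous non-blocking I/O: NIO is not asynchronous non-blocking I/O in the strict sense; it is non-blocking I/O

    2.Selector (multiplexer)

    3.Pseudo-asynchronous I/O: implemented by using a thread pool as a buffer

  2.Comparison of the different I/O models

6.Reasons for choosing Netty

  1.Why not program against native Java NIO

  2.Why choose Netty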

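Chapter 3  Getting Started with Netty

3.1 Setting up the Netty development environment

  3.1.1 Downloading the Netty package

  3.1.2 Setting up a Netty application project

3.2 Netty server development

  3.2.1 Steps

 

3.3 Netty client development

  3.3.1 TimeClient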

 

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public void connect(int port, String host) throws Exception {
        // configure the client NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {// handles network I/O events
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
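            // start the asynchronous connect and wait for it to complete
            ChannelFuture f=b.connect(host,port).sync();

            // wait until the client channel is closed
            f.channel().closeFuture().sync();
        }finally {
            // shut down gracefully and release the NIO thread group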
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);// port of the server to connect to
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new TimeClient().connect(port,"127.0.0.1");
    }
}

 

  3.3.2 TimeClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.logging.Logger;

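// ChannelHandlerAdapter with channelActive/channelRead is the Netty 5.0 alpha API; on Netty 4.x, extend ChannelInboundHandlerAdapter instead
public class TimeClientHandler extends ChannelHandlerAdapter {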

    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());

    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);// send the request once the connection is established
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;// read the response message
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Current time=" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // log the error and release resources
        logger.warning("Exception=" + cause.getMessage());
        ctx.close();
    }
}

 

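3.4 Running and debugging

  3.4.1 Running the server and the client

  3.4.2 Packaging and deployment

Chapter 4  Solving the TCP Sticky-Packet and Packet-Splitting Problem

 

4.1 TCP sticky packets and packet splitting

  A complete message may be split by TCP into several packets for sending, and several small messages may also be coalesced into one larger packet

  4.1.1 Description of the TCP sticky-packet/packet-splitting problem

    1.Diagram

    2.Analysis

 

  4.1.2 Why TCP sticky packets and packet splitting happen

  4.1.3 Strategies for solving the sticky-packet problem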
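
    One common strategy is to end every message with a line break and let the receiver reassemble lines from whatever chunks TCP delivers. The sketch below shows that idea in plain Java, without Netty. It is an illustration only: LineFrameAccumulator is a name invented here, and unlike a production decoder it enforces no maximum line length.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LineFrameAccumulator {
    // bytes of a line whose terminating '\n' has not arrived yet
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream();

    /** Feeds the bytes of one TCP read; returns every line completed by it, without the line ending. */
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = partial.toByteArray();
                partial.reset();
                int len = line.length;
                if (len > 0 && line[len - 1] == '\r') {
                    len--;              // accept "\r\n" as well as "\n"
                }
                frames.add(new String(line, 0, len, StandardCharsets.UTF_8));
            } else {
                partial.write(b);
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        LineFrameAccumulator acc = new LineFrameAccumulator();
        // a split message: the first read carries only part of a request
        System.out.println(acc.feed("QUERY TIME ORD".getBytes(StandardCharsets.UTF_8)));
        // a sticky read: the rest of request 1, all of request 2, the start of request 3
        System.out.println(acc.feed("ER\nQUERY TIME ORDER\nQUE".getBytes(StandardCharsets.UTF_8)));
        // the remainder of request 3, terminated with "\r\n"
        System.out.println(acc.feed("RY TIME ORDER\r\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

    However the byte stream is cut up, the handler only ever sees whole requests, which is the property the time server needs.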

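4.2 A case of broken behavior when TCP sticky packets are not handled

  4.2.1 Changes to TimeServer

  4.2.2 Changes to TimeClient

  4.2.3 Results

4.3 Using LineBasedFrameDecoder to solve the TCP sticky-packet problem

  4.3.1 A TimeServer that handles TCP sticky packets

    1.TimeServer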

 

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeServer {
    public void bind(int port) throws Exception {
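        // configure the server NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();// thread group that accepts client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();// thread group that performs network reads and writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)// set the channel type
                    .option(ChannelOption.SO_BACKLOG, 1024)// set TCP parameters
                    .childHandler(new ChildChannelHandler());// handles network I/O events (logging, message encoding and decoding)
            ChannelFuture f = b.bind(port).sync();// bind the port and wait synchronously for it to succeed

            f.channel().closeFuture().sync();// wait until the server's listening channel is closed before returning
        } finally {
            // shut down gracefully and release the thread pool resources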
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new TimeServer().bind(port);
    }
}

 

    2. TimeServerHandler

 

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;

public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg; // the request, with its line terminator already stripped by LineBasedFrameDecoder
        System.out.println("Received request: " + body + "; count=" + ++counter);
        String currentTime;
        if ("QUERY TIME ORDER".equals(body)) {
            currentTime = new Date(System.currentTimeMillis()).toString();
        } else {
            currentTime = "Invalid parameter";
        }
        currentTime += System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

 

  4.3.2 TimeClient with sticky-packet support

    1. TimeClient

 

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClient {
    public void connect(int port, String host) throws Exception {
        // Configure the client-side NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() { // handles network I/O events
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();

            // Wait for the client channel to close
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.valueOf(args[0]); // port of the server to connect to
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new TimeClient().connect(port,"127.0.0.1");
    }
}

 

    2. TimeClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.logging.Logger;

public class TimeClientHandler extends ChannelHandlerAdapter {

    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Current time=" + body + "; count=" + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the failure, then release resources by closing the channel
        logger.warning("Exception=" + cause.getMessage());
        ctx.close();
    }
}

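  4.3.3 Running the time server with sticky-packet support

  4.3.4 How LineBasedFrameDecoder and StringDecoder work

Chapter 5  Using the delimiter and fixed-length decoders

5.1 Developing with DelimiterBasedFrameDecoder

  DelimiterBasedFrameDecoder automatically decodes messages whose end is marked in the byte stream by a delimiter.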
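  To make the idea concrete, here is a minimal plain-JDK sketch of delimiter-based framing. It is not Netty's implementation: the class `DelimiterFramer`, its `feed` method, and the use of `IllegalStateException` for an over-long frame are assumptions made for illustration. It buffers incoming bytes, emits every complete frame with the delimiter stripped, and keeps any trailing partial frame for the next read.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DelimiterFramer {
    private final byte[] delimiter;
    private final int maxFrameLength;
    // Bytes received so far that do not yet end in a delimiter.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    public DelimiterFramer(String delimiter, int maxFrameLength) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
        this.maxFrameLength = maxFrameLength;
    }

    /** Appends one network read and returns every complete frame, delimiter stripped. */
    public List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] data = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = indexOf(data, start)) >= 0) {
            checkLength(end - start);
            frames.add(new String(data, start, end - start, StandardCharsets.UTF_8));
            start = end + delimiter.length;
        }
        // Guard against a stream that never sends the delimiter.
        checkLength(data.length - start);
        pending.reset();
        pending.write(data, start, data.length - start);
        return frames;
    }

    private void checkLength(int length) {
        if (length > maxFrameLength) {
            throw new IllegalStateException("frame exceeds " + maxFrameLength + " bytes");
        }
    }

    /** Returns the index of the first delimiter at or after 'from', or -1 if there is none. */
    private int indexOf(byte[] data, int from) {
        for (int i = from; i + delimiter.length <= data.length; i++) {
            int j = 0;
            while (j < delimiter.length && data[i + j] == delimiter[j]) {
                j++;
            }
            if (j == delimiter.length) {
                return i;
            }
        }
        return -1;
    }
}
```

  With a framer created as `new DelimiterFramer("$_", 1024)`, feeding the bytes of `hello netty.$_hel` yields the single frame `hello netty.`; the remaining `hel` stays buffered until later reads complete it with a `$_`.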

  5.1.1 DelimiterBasedFrameDecoder server

    1. EchoServer

 

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class EchoServer {
    public void bind(int port) throws Exception {
        // Configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // performs network reads and writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // channel type
                    .option(ChannelOption.SO_BACKLOG, 1024) // TCP parameter
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); // buffer holding the delimiter
                            // 1024 is the maximum length of a single message. If no delimiter has been found
                            // once that length is reached, the decoder throws an exception. This reliability
                            // guard stops a malformed stream with a missing delimiter from exhausting memory.
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync(); // bind the port and wait synchronously for success

            f.channel().closeFuture().sync(); // wait until the server's listening channel closes before returning to main
        } finally {
            // Shut down gracefully and release the thread-pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new EchoServer().bind(port);
    }
}

 

    2. EchoServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoServerHandler extends ChannelHandlerAdapter {

    int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
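        String body = (String) msg; // the request, with its "$_" delimiter already stripped by DelimiterBasedFrameDecoder
        System.out.println("Received request: " + body + "; count=" + ++counter);
        body += "$_"; // append the delimiter so the client's decoder can frame the reply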
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(echo);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 

  5.1.2 DelimiterBasedFrameDecoder client

    1. EchoClient

 

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class EchoClient {
    public void connect(int port, String host) throws Exception {
        // Configure the client-side NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() { // handles network I/O events
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); // buffer holding the delimiter
                            // 1024 is the maximum length of a single message. If no delimiter has been found
                            // once that length is reached, the decoder throws an exception. This reliability
                            // guard stops a malformed stream with a missing delimiter from exhausting memory.
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();

            // Wait for the client channel to close
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.valueOf(args[0]); // port of the server to connect to
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new EchoClient().connect(port,"127.0.0.1");
    }
}

 

    2. EchoClientHandler

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoClientHandler extends ChannelHandlerAdapter {

    private int counter;

    static final String ECHO_REQ = "hello netty.$_";


    public EchoClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Received message=" + body + "; count=" + ++counter);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

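  5.1.3 Running the DelimiterBasedFrameDecoder server and client

5.2 Developing with FixedLengthFrameDecoder

  The fixed-length decoder automatically decodes messages by a specified length, so the application does not need to handle sticky or split packets itself.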
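  The rule is simple enough to sketch with the plain JDK. The class `FixedLengthFramer` and its `feed` method below are hypothetical names for illustration and do not reproduce Netty's implementation. The sketch buffers bytes and emits a frame each time a full `frameLength` bytes are available, regardless of how the reads were split.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FixedLengthFramer {
    private final int frameLength;
    // Bytes received so far that do not yet fill a whole frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    public FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Appends one network read and returns every complete fixed-length frame. */
    public List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] data = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int start = 0;
        while (data.length - start >= frameLength) {
            frames.add(new String(data, start, frameLength, StandardCharsets.US_ASCII));
            start += frameLength;
        }
        // Keep the incomplete tail (a "half packet") for the next read.
        pending.reset();
        pending.write(data, start, data.length - start);
        return frames;
    }
}
```

  With a frame length of 5, feeding `abcdefg` yields `abcde` and buffers `fg`; feeding `hij` afterwards completes the second frame `fghij`.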

  5.2.1 FixedLengthFrameDecoder server

    1. EchoServer

 

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class EchoServer {
    public void bind(int port) throws Exception {
        // Configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // performs network reads and writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // channel type
                    .option(ChannelOption.SO_BACKLOG, 1024) // TCP parameter
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync(); // bind the port and wait synchronously for success

            f.channel().closeFuture().sync(); // wait until the server's listening channel closes before returning to main
        } finally {
            // Shut down gracefully and release the thread-pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new EchoServer().bind(port);
    }
}

 

    2. EchoServerHandler

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Received client request: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

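  5.2.2 Testing the EchoServer with the telnet command line

Chapter 6  Encoding and decoding techniques

6.1 Drawbacks of Java serialization

  6.1.1 No cross-language support

  6.1.2 The serialized byte stream is too large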
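  A quick way to see the size difference is to encode the same object twice, once with JDK serialization and once with a hand-written binary layout. The sketch below is illustrative only: the `User` class, its fields, and the layout (4-byte name length, UTF-8 name bytes, 4-byte id) are assumptions, not a format taken from the book.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializedSizeDemo {

    /** Hypothetical POJO used only for this comparison. */
    public static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int id;

        public User(String name, int id) {
            this.name = name;
            this.id = id;
        }
    }

    /** Size in bytes of the object encoded with java.io.ObjectOutputStream. */
    public static int jdkSize(User user) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(user);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Size in bytes of a hand-written layout: name length, name bytes, id. */
    public static int manualSize(User user) {
        byte[] name = user.name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(user.id);
        buffer.flip();
        return buffer.remaining();
    }

    public static void main(String[] args) {
        User user = new User("Welcome to Netty", 100);
        System.out.println("JDK serialization: " + jdkSize(user) + " bytes");
        System.out.println("Manual binary encoding: " + manualSize(user) + " bytes");
    }
}
```

  The manual layout needs 24 bytes for this object. The JDK stream is several times larger because it also carries a stream header and a description of the class and its fields.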

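  6.1.3 Serialization performance is too low

6.2 Mainstream encoding/decoding frameworks in the industry

  6.2.1 Google Protobuf

  6.2.2 Facebook Thrift

    Suited to static data exchange and to building general-purpose tools for large-scale data exchange and storage.

  6.2.3 JBoss Marshalling

Chapter 7  MessagePack encoding and decoding

7.1 Introduction to MessagePack

  7.1.1 MessagePack multi-language support

  7.1.2 Introduction to the MessagePack Java API

    1. Dependency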

 

    compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12'

 

    2. Usage

 

    public static void main(String[] args) throws IOException {
        List<String> list=new ArrayList<>();
        list.add("a");
        list.add("b");
        list.add("c");
        MessagePack messagePack=new MessagePack();
        byte[] data = messagePack.write(list);
        List<String> read = messagePack.read(data, Templates.tList(Templates.TString));
        for(String s:read){
            System.out.println(s);
        }
    }

 

  7.1.3 Downloading the MessagePack development package

7.2 Developing the MessagePack encoder and decoder

  7.2.1 Developing the MessagePack encoder

    1. MsgpackEncoder

 

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;

// Encoder
public class MsgpackEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        MessagePack messagePack = new MessagePack();
        byte[] write = messagePack.write(msg); // encode the POJO into a byte array
        out.writeBytes(write);
    }
}

 

  7.2.2 Developing the MessagePack decoder

    1. MsgpackDecoder

 

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.util.List;

// Decoder
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final byte[] array;
        final int length = msg.readableBytes();
        array = new byte[length];
        msg.getBytes(msg.readerIndex(), array, 0, length); // copy out the bytes to decode
        MessagePack messagePack = new MessagePack();
        out.add(messagePack.read(array)); // deserialize into an Object and add it to the decoded-message list
    }
}

 

  7.2.3 Functional test

7.3 Sticky-packet/half-packet support

  1. EchoClient

 

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

public class EchoClient {
    private final String host;
    private final int port;
    private final int sendNumber;

    public EchoClient(String host, int port, int sendNumber) {
        this.host = host;
        this.port = port;
        this.sendNumber = sendNumber;
    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                            ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                            ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
                            ch.pipeline().addLast(new EchoClientHandler(sendNumber));
                        }
                    });
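            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();

            // Wait for the client channel to close
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group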
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.valueOf(args[0]); // port of the server to connect to
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new EchoClient("127.0.0.1",port,100).run();
    }
}
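
  The client pipeline above pairs `LengthFieldPrepender(2)` on the outbound side with `LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)` on the inbound side: each message is sent with a 2-byte length header, and the decoder waits for that many payload bytes before passing the message on with the header stripped. The following plain-JDK sketch illustrates that framing rule under those same settings. `LengthPrefixFraming`, `prepend`, and `feed` are hypothetical names, and the code is not Netty's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class LengthPrefixFraming {
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Prepends a 2-byte big-endian length that counts only the payload. */
    public static byte[] prepend(byte[] payload) {
        if (payload.length > 65535) {
            throw new IllegalArgumentException("payload too long for a 2-byte length field");
        }
        return ByteBuffer.allocate(2 + payload.length)
                .putShort((short) payload.length)
                .put(payload)
                .array();
    }

    /** Appends one network read and returns every complete payload, header stripped. */
    public List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer data = ByteBuffer.wrap(pending.toByteArray());
        List<byte[]> frames = new ArrayList<>();
        while (data.remaining() >= 2) {
            // Peek at the unsigned 16-bit length without consuming it.
            int length = data.getShort(data.position()) & 0xFFFF;
            if (data.remaining() < 2 + length) {
                break; // half packet: wait for more bytes
            }
            data.position(data.position() + 2); // strip the 2-byte header
            byte[] frame = new byte[length];
            data.get(frame);
            frames.add(frame);
        }
        // Keep whatever is left over for the next read.
        pending.reset();
        pending.write(data.array(), data.position(), data.remaining());
        return frames;
    }
}
```

  Because the length travels with every message, the receiver can reassemble MessagePack payloads correctly even when several arrive in one read or one arrives across several reads.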

 

  2. EchoClientHandler

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoClientHandler extends ChannelHandlerAdapter {
    private final int sendNumber;

    public EchoClientHandler(int sendNumber) {
        this.sendNumber = sendNumber;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        UserInfo[] userInfos = UserInfo();
        for (UserInfo userInfo : userInfos) {
            ctx.write(userInfo);
        }
        ctx.flush();
    }

    private UserInfo[] UserInfo() {
        UserInfo[] userInfos = new UserInfo[sendNumber];
        UserInfo userInfo;
        System.out.println(sendNumber);
        for (int i = 0; i < sendNumber; i++) {
            userInfo = new UserInfo();
            userInfo.setAge(i);
            userInfo.setName("---->" + i);
            userInfos[i] = userInfo;
            System.out.println(i);
        }
        return userInfos;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Client received message = " + msg);
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}

  3. EchoServer

import com.example.demo.util.NpeCheck;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class EchoServer {
    public void bind(int port) throws Exception {
        // Configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // performs network reads and writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // channel type
                    .option(ChannelOption.SO_BACKLOG, 1024) // TCP parameter
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                            ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                            ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync(); // bind the port and wait synchronously for success

            f.channel().closeFuture().sync(); // wait until the server's listening channel closes before returning to main
        } finally {
            // Shut down gracefully and release the thread-pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // fall back to the default port
            }
        }
        new EchoServer().bind(port);
    }
}

  4. EchoServerHandler

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Received client request: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

Chapter 8  Google Protobuf encoding and decoding

8.1 Getting started with Protobuf

  8.1.1 Setting up the Protobuf development environment

  8.1.2 Protobuf encoding and decoding
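
  The parsing constructor in the generated SubscribeReqProto listing that follows switches on the tag values 8, 18, 26 and 34. Those numbers come from the Protobuf wire format, where a tag is the field number shifted left by three bits, combined with a 3-bit wire type. The small sketch below recomputes them for the four fields of `SubscribeReq`. The class `ProtobufTagDemo` and the helper `makeTag` are hypothetical names and are not part of the generated code.

```java
public class ProtobufTagDemo {
    public static final int WIRETYPE_VARINT = 0;           // int32, int64, bool, enum
    public static final int WIRETYPE_LENGTH_DELIMITED = 2; // string, bytes, embedded messages

    /** Combines a field number and a wire type into a Protobuf tag. */
    public static int makeTag(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    public static void main(String[] args) {
        System.out.println(makeTag(1, WIRETYPE_VARINT));           // subReqID (int32): prints 8
        System.out.println(makeTag(2, WIRETYPE_LENGTH_DELIMITED)); // userName (string): prints 18
        System.out.println(makeTag(3, WIRETYPE_LENGTH_DELIMITED)); // productName (string): prints 26
        System.out.println(makeTag(4, WIRETYPE_LENGTH_DELIMITED)); // address (repeated string): prints 34
    }
}
```

  A tag of 0 never identifies a real field, which is why the generated parser treats it as the end of the message.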

    1. SubscribeReqProto

 

public final class SubscribeReqProto {
  private SubscribeReqProto() {}
  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistry registry) {
  }
  public interface SubscribeReqOrBuilder
      extends com.google.protobuf.MessageOrBuilder {

    // required int32 subReqID = 1;
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    boolean hasSubReqID();
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    int getSubReqID();

    // required string userName = 2;
    /**
     * <code>required string userName = 2;</code>
     */
    boolean hasUserName();
    /**
     * <code>required string userName = 2;</code>
     */
    String getUserName();
    /**
     * <code>required string userName = 2;</code>
     */
    com.google.protobuf.ByteString
        getUserNameBytes();

    // required string productName = 3;
    /**
     * <code>required string productName = 3;</code>
     */
    boolean hasProductName();
    /**
     * <code>required string productName = 3;</code>
     */
    String getProductName();
    /**
     * <code>required string productName = 3;</code>
     */
    com.google.protobuf.ByteString
        getProductNameBytes();

    // repeated string address = 4;
    /**
     * <code>repeated string address = 4;</code>
     */
    java.util.List<String>
    getAddressList();
    /**
     * <code>repeated string address = 4;</code>
     */
    int getAddressCount();
    /**
     * <code>repeated string address = 4;</code>
     */
    String getAddress(int index);
    /**
     * <code>repeated string address = 4;</code>
     */
    com.google.protobuf.ByteString
        getAddressBytes(int index);
  }
  /**
   * Protobuf type {@code netty.SubscribeReq}
   */
  public static final class SubscribeReq extends
      com.google.protobuf.GeneratedMessage
      implements SubscribeReqOrBuilder {
    // Use SubscribeReq.newBuilder() to construct.
    private SubscribeReq(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
      super(builder);
      this.unknownFields = builder.getUnknownFields();
    }
    private SubscribeReq(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }

    private static final SubscribeReq defaultInstance;
    public static SubscribeReq getDefaultInstance() {
      return defaultInstance;
    }

    public SubscribeReq getDefaultInstanceForType() {
      return defaultInstance;
    }

    private final com.google.protobuf.UnknownFieldSet unknownFields;
    @Override
    public final com.google.protobuf.UnknownFieldSet
        getUnknownFields() {
      return this.unknownFields;
    }
    private SubscribeReq(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      initFields();
      int mutable_bitField0_ = 0;
      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
          com.google.protobuf.UnknownFieldSet.newBuilder();
      try {
        boolean done = false;
        while (!done) {
          int tag = input.readTag();
          switch (tag) {
            case 0:
              done = true;
              break;
            default: {
              if (!parseUnknownField(input, unknownFields,
                                     extensionRegistry, tag)) {
                done = true;
              }
              break;
            }
            case 8: {
              bitField0_ |= 0x00000001;
              subReqID_ = input.readInt32();
              break;
            }
            case 18: {
              bitField0_ |= 0x00000002;
              userName_ = input.readBytes();
              break;
            }
            case 26: {
              bitField0_ |= 0x00000004;
              productName_ = input.readBytes();
              break;
            }
            case 34: {
              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
                address_ = new com.google.protobuf.LazyStringArrayList();
                mutable_bitField0_ |= 0x00000008;
              }
              address_.add(input.readBytes());
              break;
            }
          }
        }
      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
        throw e.setUnfinishedMessage(this);
      } catch (java.io.IOException e) {
        throw new com.google.protobuf.InvalidProtocolBufferException(
            e.getMessage()).setUnfinishedMessage(this);
      } finally {
        if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
          address_ = new com.google.protobuf.UnmodifiableLazyStringList(address_);
        }
        this.unknownFields = unknownFields.build();
        makeExtensionsImmutable();
      }
    }
    public static final com.google.protobuf.Descriptors.Descriptor
        getDescriptor() {
      return SubscribeReqProto.internal_static_netty_SubscribeReq_descriptor;
    }

    protected FieldAccessorTable
        internalGetFieldAccessorTable() {
      return SubscribeReqProto.internal_static_netty_SubscribeReq_fieldAccessorTable
          .ensureFieldAccessorsInitialized(
              SubscribeReq.class, Builder.class);
    }

    public static com.google.protobuf.Parser<SubscribeReq> PARSER =
        new com.google.protobuf.AbstractParser<SubscribeReq>() {
      public SubscribeReq parsePartialFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws com.google.protobuf.InvalidProtocolBufferException {
        return new SubscribeReq(input, extensionRegistry);
      }
    };

    @Override
    public com.google.protobuf.Parser<SubscribeReq> getParserForType() {
      return PARSER;
    }

    private int bitField0_;
    // required int32 subReqID = 1;
    public static final int SUBREQID_FIELD_NUMBER = 1;
    private int subReqID_;
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    public boolean hasSubReqID() {
      return ((bitField0_ & 0x00000001) == 0x00000001);
    }
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    public int getSubReqID() {
      return subReqID_;
    }

    // required string userName = 2;
    public static final int USERNAME_FIELD_NUMBER = 2;
    private Object userName_;
    /**
     * <code>required string userName = 2;</code>
     */
    public boolean hasUserName() {
      return ((bitField0_ & 0x00000002) == 0x00000002);
    }
    /**
     * <code>required string userName = 2;</code>
     */
    public String getUserName() {
      Object ref = userName_;
      if (ref instanceof String) {
        return (String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        String s = bs.toStringUtf8();
        if (bs.isValidUtf8()) {
          userName_ = s;
        }
        return s;
      }
    }
    /**
     * <code>required string userName = 2;</code>
     */
    public com.google.protobuf.ByteString
        getUserNameBytes() {
      Object ref = userName_;
      if (ref instanceof String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (String) ref);
        userName_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    // required string productName = 3;
    public static final int PRODUCTNAME_FIELD_NUMBER = 3;
    private Object productName_;
    /**
     * <code>required string productName = 3;</code>
     */
    public boolean hasProductName() {
      return ((bitField0_ & 0x00000004) == 0x00000004);
    }
    /**
     * <code>required string productName = 3;</code>
     */
    public String getProductName() {
      Object ref = productName_;
      if (ref instanceof String) {
        return (String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        String s = bs.toStringUtf8();
        if (bs.isValidUtf8()) {
          productName_ = s;
        }
        return s;
      }
    }
    /**
     * <code>required string productName = 3;</code>
     */
    public com.google.protobuf.ByteString
        getProductNameBytes() {
      Object ref = productName_;
      if (ref instanceof String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (String) ref);
        productName_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    // repeated string address = 4;
    public static final int ADDRESS_FIELD_NUMBER = 4;
    private com.google.protobuf.LazyStringList address_;
    /**
     * <code>repeated string address = 4;</code>
     */
    public java.util.List<String>
        getAddressList() {
      return address_;
    }
    /**
     * <code>repeated string address = 4;</code>
     */
    public int getAddressCount() {
      return address_.size();
    }
    /**
     * <code>repeated string address = 4;</code>
     */
    public String getAddress(int index) {
      return address_.get(index);
    }
    /**
     * <code>repeated string address = 4;</code>
     */
    public com.google.protobuf.ByteString
        getAddressBytes(int index) {
      return address_.getByteString(index);
    }

    private void initFields() {
      subReqID_ = 0;
      userName_ = "";
      productName_ = "";
      address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
    }
    private byte memoizedIsInitialized = -1;
    public final boolean isInitialized() {
      byte isInitialized = memoizedIsInitialized;
      if (isInitialized != -1) return isInitialized == 1;

      if (!hasSubReqID()) {
        memoizedIsInitialized = 0;
        return false;
      }
      if (!hasUserName()) {
        memoizedIsInitialized = 0;
        return false;
      }
      if (!hasProductName()) {
        memoizedIsInitialized = 0;
        return false;
      }
      memoizedIsInitialized = 1;
      return true;
    }

    public void writeTo(com.google.protobuf.CodedOutputStream output)
                        throws java.io.IOException {
      getSerializedSize();
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        output.writeInt32(1, subReqID_);
      }
      if (((bitField0_ & 0x00000002) == 0x00000002)) {
        output.writeBytes(2, getUserNameBytes());
      }
      if (((bitField0_ & 0x00000004) == 0x00000004)) {
        output.writeBytes(3, getProductNameBytes());
      }
      for (int i = 0; i < address_.size(); i++) {
        output.writeBytes(4, address_.getByteString(i));
      }
      getUnknownFields().writeTo(output);
    }

    private int memoizedSerializedSize = -1;
    public int getSerializedSize() {
      int size = memoizedSerializedSize;
      if (size != -1) return size;

      size = 0;
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        size += com.google.protobuf.CodedOutputStream
          .computeInt32Size(1, subReqID_);
      }
      if (((bitField0_ & 0x00000002) == 0x00000002)) {
        size += com.google.protobuf.CodedOutputStream
          .computeBytesSize(2, getUserNameBytes());
      }
      if (((bitField0_ & 0x00000004) == 0x00000004)) {
        size += com.google.protobuf.CodedOutputStream
          .computeBytesSize(3, getProductNameBytes());
      }
      {
        int dataSize = 0;
        for (int i = 0; i < address_.size(); i++) {
          dataSize += com.google.protobuf.CodedOutputStream
            .computeBytesSizeNoTag(address_.getByteString(i));
        }
        size += dataSize;
        size += 1 * getAddressList().size();
      }
      size += getUnknownFields().getSerializedSize();
      memoizedSerializedSize = size;
      return size;
    }

    private static final long serialVersionUID = 0L;
    @Override
    protected Object writeReplace()
        throws java.io.ObjectStreamException {
      return super.writeReplace();
    }

    public static SubscribeReq parseFrom(
        com.google.protobuf.ByteString data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static SubscribeReq parseFrom(
        com.google.protobuf.ByteString data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static SubscribeReq parseFrom(byte[] data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static SubscribeReq parseFrom(
        byte[] data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static SubscribeReq parseFrom(java.io.InputStream input)
        throws java.io.IOException {
      return PARSER.parseFrom(input);
    }
    public static SubscribeReq parseFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return PARSER.parseFrom(input, extensionRegistry);
    }
    public static SubscribeReq parseDelimitedFrom(java.io.InputStream input)
        throws java.io.IOException {
      return PARSER.parseDelimitedFrom(input);
    }
    public static SubscribeReq parseDelimitedFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return PARSER.parseDelimitedFrom(input, extensionRegistry);
    }
    public static SubscribeReq parseFrom(
        com.google.protobuf.CodedInputStream input)
        throws java.io.IOException {
      return PARSER.parseFrom(input);
    }
    public static SubscribeReq parseFrom(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return PARSER.parseFrom(input, extensionRegistry);
    }

    public static Builder newBuilder() { return Builder.create(); }
    public Builder newBuilderForType() { return newBuilder(); }
    public static Builder newBuilder(SubscribeReq prototype) {
      return newBuilder().mergeFrom(prototype);
    }
    public Builder toBuilder() { return newBuilder(this); }

    @Override
    protected Builder newBuilderForType(
        BuilderParent parent) {
      Builder builder = new Builder(parent);
      return builder;
    }
    /**
     * Protobuf type {@code netty.SubscribeReq}
     */
    public static final class Builder extends
        com.google.protobuf.GeneratedMessage.Builder<Builder>
       implements SubscribeReqOrBuilder {
      public static final com.google.protobuf.Descriptors.Descriptor
          getDescriptor() {
        return SubscribeReqProto.internal_static_netty_SubscribeReq_descriptor;
      }

      protected FieldAccessorTable
          internalGetFieldAccessorTable() {
        return SubscribeReqProto.internal_static_netty_SubscribeReq_fieldAccessorTable
            .ensureFieldAccessorsInitialized(
                SubscribeReq.class, Builder.class);
      }

      // Construct using com.example.demo.protobuf.SubscribeReqProto.SubscribeReq.newBuilder()
      private Builder() {
        maybeForceBuilderInitialization();
      }

      private Builder(
          BuilderParent parent) {
        super(parent);
        maybeForceBuilderInitialization();
      }
      private void maybeForceBuilderInitialization() {
        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
        }
      }
      private static Builder create() {
        return new Builder();
      }

      public Builder clear() {
        super.clear();
        subReqID_ = 0;
        bitField0_ = (bitField0_ & ~0x00000001);
        userName_ = "";
        bitField0_ = (bitField0_ & ~0x00000002);
        productName_ = "";
        bitField0_ = (bitField0_ & ~0x00000004);
        address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
        bitField0_ = (bitField0_ & ~0x00000008);
        return this;
      }

      public Builder clone() {
        return create().mergeFrom(buildPartial());
      }

      public com.google.protobuf.Descriptors.Descriptor
          getDescriptorForType() {
        return SubscribeReqProto.internal_static_netty_SubscribeReq_descriptor;
      }

      public SubscribeReq getDefaultInstanceForType() {
        return SubscribeReq.getDefaultInstance();
      }

      public SubscribeReq build() {
        SubscribeReq result = buildPartial();
        if (!result.isInitialized()) {
          throw newUninitializedMessageException(result);
        }
        return result;
      }

      public SubscribeReq buildPartial() {
        SubscribeReq result = new SubscribeReq(this);
        int from_bitField0_ = bitField0_;
        int to_bitField0_ = 0;
        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
          to_bitField0_ |= 0x00000001;
        }
        result.subReqID_ = subReqID_;
        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
          to_bitField0_ |= 0x00000002;
        }
        result.userName_ = userName_;
        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
          to_bitField0_ |= 0x00000004;
        }
        result.productName_ = productName_;
        if (((bitField0_ & 0x00000008) == 0x00000008)) {
          address_ = new com.google.protobuf.UnmodifiableLazyStringList(
              address_);
          bitField0_ = (bitField0_ & ~0x00000008);
        }
        result.address_ = address_;
        result.bitField0_ = to_bitField0_;
        onBuilt();
        return result;
      }

      public Builder mergeFrom(com.google.protobuf.Message other) {
        if (other instanceof SubscribeReq) {
          return mergeFrom((SubscribeReq)other);
        } else {
          super.mergeFrom(other);
          return this;
        }
      }

      public Builder mergeFrom(SubscribeReq other) {
        if (other == SubscribeReq.getDefaultInstance()) return this;
        if (other.hasSubReqID()) {
          setSubReqID(other.getSubReqID());
        }
        if (other.hasUserName()) {
          bitField0_ |= 0x00000002;
          userName_ = other.userName_;
          onChanged();
        }
        if (other.hasProductName()) {
          bitField0_ |= 0x00000004;
          productName_ = other.productName_;
          onChanged();
        }
        if (!other.address_.isEmpty()) {
          if (address_.isEmpty()) {
            address_ = other.address_;
            bitField0_ = (bitField0_ & ~0x00000008);
          } else {
            ensureAddressIsMutable();
            address_.addAll(other.address_);
          }
          onChanged();
        }
        this.mergeUnknownFields(other.getUnknownFields());
        return this;
      }

      public final boolean isInitialized() {
        if (!hasSubReqID()) {
          
          return false;
        }
        if (!hasUserName()) {
          
          return false;
        }
        if (!hasProductName()) {
          
          return false;
        }
        return true;
      }

      public Builder mergeFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws java.io.IOException {
        SubscribeReq parsedMessage = null;
        try {
          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
          parsedMessage = (SubscribeReq) e.getUnfinishedMessage();
          throw e;
        } finally {
          if (parsedMessage != null) {
            mergeFrom(parsedMessage);
          }
        }
        return this;
      }
      private int bitField0_;

      // required int32 subReqID = 1;
      private int subReqID_ ;
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public boolean hasSubReqID() {
        return ((bitField0_ & 0x00000001) == 0x00000001);
      }
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public int getSubReqID() {
        return subReqID_;
      }
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public Builder setSubReqID(int value) {
        bitField0_ |= 0x00000001;
        subReqID_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public Builder clearSubReqID() {
        bitField0_ = (bitField0_ & ~0x00000001);
        subReqID_ = 0;
        onChanged();
        return this;
      }

      // required string userName = 2;
      private Object userName_ = "";
      /**
       * <code>required string userName = 2;</code>
       */
      public boolean hasUserName() {
        return ((bitField0_ & 0x00000002) == 0x00000002);
      }
      /**
       * <code>required string userName = 2;</code>
       */
      public String getUserName() {
        Object ref = userName_;
        if (!(ref instanceof String)) {
          String s = ((com.google.protobuf.ByteString) ref)
              .toStringUtf8();
          userName_ = s;
          return s;
        } else {
          return (String) ref;
        }
      }
      /**
       * <code>required string userName = 2;</code>
       */
      public com.google.protobuf.ByteString
          getUserNameBytes() {
        Object ref = userName_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (String) ref);
          userName_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>required string userName = 2;</code>
       */
      public Builder setUserName(
          String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000002;
        userName_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>required string userName = 2;</code>
       */
      public Builder clearUserName() {
        bitField0_ = (bitField0_ & ~0x00000002);
        userName_ = getDefaultInstance().getUserName();
        onChanged();
        return this;
      }
      /**
       * <code>required string userName = 2;</code>
       */
      public Builder setUserNameBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000002;
        userName_ = value;
        onChanged();
        return this;
      }

      // required string productName = 3;
      private Object productName_ = "";
      /**
       * <code>required string productName = 3;</code>
       */
      public boolean hasProductName() {
        return ((bitField0_ & 0x00000004) == 0x00000004);
      }
      /**
       * <code>required string productName = 3;</code>
       */
      public String getProductName() {
        Object ref = productName_;
        if (!(ref instanceof String)) {
          String s = ((com.google.protobuf.ByteString) ref)
              .toStringUtf8();
          productName_ = s;
          return s;
        } else {
          return (String) ref;
        }
      }
      /**
       * <code>required string productName = 3;</code>
       */
      public com.google.protobuf.ByteString
          getProductNameBytes() {
        Object ref = productName_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (String) ref);
          productName_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>required string productName = 3;</code>
       */
      public Builder setProductName(
          String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000004;
        productName_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>required string productName = 3;</code>
       */
      public Builder clearProductName() {
        bitField0_ = (bitField0_ & ~0x00000004);
        productName_ = getDefaultInstance().getProductName();
        onChanged();
        return this;
      }
      /**
       * <code>required string productName = 3;</code>
       */
      public Builder setProductNameBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000004;
        productName_ = value;
        onChanged();
        return this;
      }

      // repeated string address = 4;
      private com.google.protobuf.LazyStringList address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
      private void ensureAddressIsMutable() {
        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
          address_ = new com.google.protobuf.LazyStringArrayList(address_);
          bitField0_ |= 0x00000008;
        }
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public java.util.List<String>
          getAddressList() {
        return java.util.Collections.unmodifiableList(address_);
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public int getAddressCount() {
        return address_.size();
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public String getAddress(int index) {
        return address_.get(index);
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public com.google.protobuf.ByteString
          getAddressBytes(int index) {
        return address_.getByteString(index);
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public Builder setAddress(
          int index, String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        ensureAddressIsMutable();
        address_.set(index, value);
        onChanged();
        return this;
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public Builder addAddress(
          String value) {
        if (value == null) {
          throw new NullPointerException();
        }
        ensureAddressIsMutable();
        address_.add(value);
        onChanged();
        return this;
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public Builder addAllAddress(
          Iterable<String> values) {
        ensureAddressIsMutable();
        super.addAll(values, address_);
        onChanged();
        return this;
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public Builder clearAddress() {
        address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
        bitField0_ = (bitField0_ & ~0x00000008);
        onChanged();
        return this;
      }
      /**
       * <code>repeated string address = 4;</code>
       */
      public Builder addAddressBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
          throw new NullPointerException();
        }
        ensureAddressIsMutable();
        address_.add(value);
        onChanged();
        return this;
      }

      // @@protoc_insertion_point(builder_scope:netty.SubscribeReq)
    }

    static {
      defaultInstance = new SubscribeReq(true);
      defaultInstance.initFields();
    }

    // @@protoc_insertion_point(class_scope:netty.SubscribeReq)
  }

  private static com.google.protobuf.Descriptors.Descriptor
    internal_static_netty_SubscribeReq_descriptor;
  private static
    com.google.protobuf.GeneratedMessage.FieldAccessorTable
      internal_static_netty_SubscribeReq_fieldAccessorTable;

  public static com.google.protobuf.Descriptors.FileDescriptor
      getDescriptor() {
    return descriptor;
  }
  private static com.google.protobuf.Descriptors.FileDescriptor
      descriptor;
  static {
    String[] descriptorData = {
      "\n\022SubscribeReq.proto\022\005netty\"X\n\014Subscribe" +
      "Req\022\020\n\010subReqID\030\001 \002(\005\022\020\n\010userName\030\002 \002(\t\022" +
      "\023\n\013productName\030\003 \002(\t\022\017\n\007address\030\004 \003(\tB.\n" +
      "\031com.example.demo.protobufB\021SubscribeReq" +
      "Proto"
    };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
        public com.google.protobuf.ExtensionRegistry assignDescriptors(
            com.google.protobuf.Descriptors.FileDescriptor root) {
          descriptor = root;
          internal_static_netty_SubscribeReq_descriptor =
            getDescriptor().getMessageTypes().get(0);
          internal_static_netty_SubscribeReq_fieldAccessorTable = new
            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
              internal_static_netty_SubscribeReq_descriptor,
              new String[] { "SubReqID", "UserName", "ProductName", "Address", });
          return null;
        }
      };
    com.google.protobuf.Descriptors.FileDescriptor
      .internalBuildGeneratedFileFrom(descriptorData,
        new com.google.protobuf.Descriptors.FileDescriptor[] {
        }, assigner);
  }

  // @@protoc_insertion_point(outer_class_scope)
}
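
The generated classes rely on two conventions that are easy to miss when reading the listing: presence of each required field is tracked as one bit in `bitField0_` (the `0x00000001`, `0x00000002`, `0x00000004` masks), and the parsing constructor switches on numeric wire tags (for example `case 8`, `case 16` and `case 26` in `SubscribeResp`). The following is a minimal stdlib-only sketch of both ideas. It is not the protobuf runtime; the class `WireFormatSketch` and its method names are hypothetical and exist only for illustration.

```java
/** Illustration only: hypothetical helper, not part of com.google.protobuf. */
final class WireFormatSketch {
    // Wire types defined by the protobuf encoding.
    static final int WIRETYPE_VARINT = 0;            // int32 and other varint types
    static final int WIRETYPE_LENGTH_DELIMITED = 2;  // string, bytes, nested messages

    // Presence masks, mirroring the constants used with bitField0_ in SubscribeReq.
    static final int HAS_SUB_REQ_ID = 0x00000001;
    static final int HAS_USER_NAME = 0x00000002;
    static final int HAS_PRODUCT_NAME = 0x00000004;

    /** A tag is the field number shifted left by three bits, OR-ed with the wire type. */
    static int makeTag(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    static int fieldNumberOf(int tag) {
        return tag >>> 3;
    }

    static int wireTypeOf(int tag) {
        return tag & 0x7;
    }

    /** Same test as hasSubReqID() and friends: true only if the mask bit is set. */
    static boolean has(int bitField, int mask) {
        return (bitField & mask) == mask;
    }

    /** Mirrors isInitialized(): every required field must have its bit set. */
    static boolean allRequiredSet(int bitField) {
        int required = HAS_SUB_REQ_ID | HAS_USER_NAME | HAS_PRODUCT_NAME;
        return (bitField & required) == required;
    }

    public static void main(String[] args) {
        // SubscribeResp fields: subReqID = 1 (int32), respCode = 2 (int32), desc = 3 (string).
        System.out.println(makeTag(1, WIRETYPE_VARINT));           // 8
        System.out.println(makeTag(2, WIRETYPE_VARINT));           // 16
        System.out.println(makeTag(3, WIRETYPE_LENGTH_DELIMITED)); // 26

        int bitField = 0;
        bitField |= HAS_SUB_REQ_ID;      // what setSubReqID() does to bitField0_
        bitField |= HAS_PRODUCT_NAME;    // what setProductName() does
        System.out.println(has(bitField, HAS_USER_NAME));  // false
        System.out.println(allRequiredSet(bitField));      // false
        bitField |= HAS_USER_NAME;
        System.out.println(allRequiredSet(bitField));      // true
        bitField &= ~HAS_SUB_REQ_ID;     // what clearSubReqID() does
        System.out.println(has(bitField, HAS_SUB_REQ_ID)); // false
    }
}
```

By the same rule the repeated `address` field (number 4, string) gets tag 34, which fits in a single byte. That is why `getSerializedSize()` adds `1 * getAddressList().size()` for the tags on top of the per-element data size.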


    2.SubscribeRespProto


public final class SubscribeRespProto {
  private SubscribeRespProto() {}
  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistry registry) {
  }
  public interface SubscribeRespOrBuilder
      extends com.google.protobuf.MessageOrBuilder {

    // required int32 subReqID = 1;
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    boolean hasSubReqID();
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    int getSubReqID();

    // required int32 respCode = 2;
    /**
     * <code>required int32 respCode = 2;</code>
     */
    boolean hasRespCode();
    /**
     * <code>required int32 respCode = 2;</code>
     */
    int getRespCode();

    // required string desc = 3;
    /**
     * <code>required string desc = 3;</code>
     */
    boolean hasDesc();
    /**
     * <code>required string desc = 3;</code>
     */
    String getDesc();
    /**
     * <code>required string desc = 3;</code>
     */
    com.google.protobuf.ByteString
        getDescBytes();
  }
  /**
   * Protobuf type {@code netty.SubscribeResp}
   */
  public static final class SubscribeResp extends
      com.google.protobuf.GeneratedMessage
      implements SubscribeRespOrBuilder {
    // Use SubscribeResp.newBuilder() to construct.
    private SubscribeResp(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
      super(builder);
      this.unknownFields = builder.getUnknownFields();
    }
    private SubscribeResp(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }

    private static final SubscribeResp defaultInstance;
    public static SubscribeResp getDefaultInstance() {
      return defaultInstance;
    }

    public SubscribeResp getDefaultInstanceForType() {
      return defaultInstance;
    }

    private final com.google.protobuf.UnknownFieldSet unknownFields;
    @Override
    public final com.google.protobuf.UnknownFieldSet
        getUnknownFields() {
      return this.unknownFields;
    }
    private SubscribeResp(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      initFields();
      int mutable_bitField0_ = 0;
      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
          com.google.protobuf.UnknownFieldSet.newBuilder();
      try {
        boolean done = false;
        while (!done) {
          int tag = input.readTag();
          switch (tag) {
            case 0:
              done = true;
              break;
            default: {
              if (!parseUnknownField(input, unknownFields,
                                     extensionRegistry, tag)) {
                done = true;
              }
              break;
            }
            case 8: {
              bitField0_ |= 0x00000001;
              subReqID_ = input.readInt32();
              break;
            }
            case 16: {
              bitField0_ |= 0x00000002;
              respCode_ = input.readInt32();
              break;
            }
            case 26: {
              bitField0_ |= 0x00000004;
              desc_ = input.readBytes();
              break;
            }
          }
        }
      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
        throw e.setUnfinishedMessage(this);
      } catch (java.io.IOException e) {
        throw new com.google.protobuf.InvalidProtocolBufferException(
            e.getMessage()).setUnfinishedMessage(this);
      } finally {
        this.unknownFields = unknownFields.build();
        makeExtensionsImmutable();
      }
    }
    public static final com.google.protobuf.Descriptors.Descriptor
        getDescriptor() {
      return SubscribeRespProto.internal_static_netty_SubscribeResp_descriptor;
    }

    protected FieldAccessorTable
        internalGetFieldAccessorTable() {
      return SubscribeRespProto.internal_static_netty_SubscribeResp_fieldAccessorTable
          .ensureFieldAccessorsInitialized(
              SubscribeResp.class, Builder.class);
    }

    public static com.google.protobuf.Parser<SubscribeResp> PARSER =
        new com.google.protobuf.AbstractParser<SubscribeResp>() {
      public SubscribeResp parsePartialFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws com.google.protobuf.InvalidProtocolBufferException {
        return new SubscribeResp(input, extensionRegistry);
      }
    };

    @Override
    public com.google.protobuf.Parser<SubscribeResp> getParserForType() {
      return PARSER;
    }

    private int bitField0_;
    // required int32 subReqID = 1;
    public static final int SUBREQID_FIELD_NUMBER = 1;
    private int subReqID_;
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    public boolean hasSubReqID() {
      return ((bitField0_ & 0x00000001) == 0x00000001);
    }
    /**
     * <code>required int32 subReqID = 1;</code>
     */
    public int getSubReqID() {
      return subReqID_;
    }

    // required int32 respCode = 2;
    public static final int RESPCODE_FIELD_NUMBER = 2;
    private int respCode_;
    /**
     * <code>required int32 respCode = 2;</code>
     */
    public boolean hasRespCode() {
      return ((bitField0_ & 0x00000002) == 0x00000002);
    }
    /**
     * <code>required int32 respCode = 2;</code>
     */
    public int getRespCode() {
      return respCode_;
    }

    // required string desc = 3;
    public static final int DESC_FIELD_NUMBER = 3;
    private Object desc_;
    /**
     * <code>required string desc = 3;</code>
     */
    public boolean hasDesc() {
      return ((bitField0_ & 0x00000004) == 0x00000004);
    }
    /**
     * <code>required string desc = 3;</code>
     */
    public String getDesc() {
      Object ref = desc_;
      if (ref instanceof String) {
        return (String) ref;
      } else {
        com.google.protobuf.ByteString bs = 
            (com.google.protobuf.ByteString) ref;
        String s = bs.toStringUtf8();
        if (bs.isValidUtf8()) {
          desc_ = s;
        }
        return s;
      }
    }
    /**
     * <code>required string desc = 3;</code>
     */
    public com.google.protobuf.ByteString
        getDescBytes() {
      Object ref = desc_;
      if (ref instanceof String) {
        com.google.protobuf.ByteString b = 
            com.google.protobuf.ByteString.copyFromUtf8(
                (String) ref);
        desc_ = b;
        return b;
      } else {
        return (com.google.protobuf.ByteString) ref;
      }
    }

    private void initFields() {
      subReqID_ = 0;
      respCode_ = 0;
      desc_ = "";
    }
    private byte memoizedIsInitialized = -1;
    public final boolean isInitialized() {
      byte isInitialized = memoizedIsInitialized;
      if (isInitialized != -1) return isInitialized == 1;

      if (!hasSubReqID()) {
        memoizedIsInitialized = 0;
        return false;
      }
      if (!hasRespCode()) {
        memoizedIsInitialized = 0;
        return false;
      }
      if (!hasDesc()) {
        memoizedIsInitialized = 0;
        return false;
      }
      memoizedIsInitialized = 1;
      return true;
    }

    public void writeTo(com.google.protobuf.CodedOutputStream output)
                        throws java.io.IOException {
      getSerializedSize();
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        output.writeInt32(1, subReqID_);
      }
      if (((bitField0_ & 0x00000002) == 0x00000002)) {
        output.writeInt32(2, respCode_);
      }
      if (((bitField0_ & 0x00000004) == 0x00000004)) {
        output.writeBytes(3, getDescBytes());
      }
      getUnknownFields().writeTo(output);
    }

    private int memoizedSerializedSize = -1;
    public int getSerializedSize() {
      int size = memoizedSerializedSize;
      if (size != -1) return size;

      size = 0;
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        size += com.google.protobuf.CodedOutputStream
          .computeInt32Size(1, subReqID_);
      }
      if (((bitField0_ & 0x00000002) == 0x00000002)) {
        size += com.google.protobuf.CodedOutputStream
          .computeInt32Size(2, respCode_);
      }
      if (((bitField0_ & 0x00000004) == 0x00000004)) {
        size += com.google.protobuf.CodedOutputStream
          .computeBytesSize(3, getDescBytes());
      }
      size += getUnknownFields().getSerializedSize();
      memoizedSerializedSize = size;
      return size;
    }

    private static final long serialVersionUID = 0L;
    @Override
    protected Object writeReplace()
        throws java.io.ObjectStreamException {
      return super.writeReplace();
    }

    public static SubscribeResp parseFrom(
        com.google.protobuf.ByteString data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static SubscribeResp parseFrom(
        com.google.protobuf.ByteString data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static SubscribeResp parseFrom(byte[] data)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data);
    }
    public static SubscribeResp parseFrom(
        byte[] data,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws com.google.protobuf.InvalidProtocolBufferException {
      return PARSER.parseFrom(data, extensionRegistry);
    }
    public static SubscribeResp parseFrom(java.io.InputStream input)
        throws java.io.IOException {
      return PARSER.parseFrom(input);
    }
    public static SubscribeResp parseFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return PARSER.parseFrom(input, extensionRegistry);
    }
    public static SubscribeResp parseDelimitedFrom(java.io.InputStream input)
        throws java.io.IOException {
      return PARSER.parseDelimitedFrom(input);
    }
    public static SubscribeResp parseDelimitedFrom(
        java.io.InputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return PARSER.parseDelimitedFrom(input, extensionRegistry);
    }
    public static SubscribeResp parseFrom(
        com.google.protobuf.CodedInputStream input)
        throws java.io.IOException {
      return PARSER.parseFrom(input);
    }
    public static SubscribeResp parseFrom(
        com.google.protobuf.CodedInputStream input,
        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
        throws java.io.IOException {
      return PARSER.parseFrom(input, extensionRegistry);
    }

    public static Builder newBuilder() { return Builder.create(); }
    public Builder newBuilderForType() { return newBuilder(); }
    public static Builder newBuilder(SubscribeResp prototype) {
      return newBuilder().mergeFrom(prototype);
    }
    public Builder toBuilder() { return newBuilder(this); }

    @Override
    protected Builder newBuilderForType(
        BuilderParent parent) {
      Builder builder = new Builder(parent);
      return builder;
    }
    /**
     * Protobuf type {@code netty.SubscribeResp}
     */
    public static final class Builder extends
        com.google.protobuf.GeneratedMessage.Builder<Builder>
       implements SubscribeRespOrBuilder {
      public static final com.google.protobuf.Descriptors.Descriptor
          getDescriptor() {
        return SubscribeRespProto.internal_static_netty_SubscribeResp_descriptor;
      }

      protected FieldAccessorTable
          internalGetFieldAccessorTable() {
        return SubscribeRespProto.internal_static_netty_SubscribeResp_fieldAccessorTable
            .ensureFieldAccessorsInitialized(
                SubscribeResp.class, Builder.class);
      }

      // Construct using com.example.demo.protobuf.SubscribeRespProto.SubscribeResp.newBuilder()
      private Builder() {
        maybeForceBuilderInitialization();
      }

      private Builder(
          BuilderParent parent) {
        super(parent);
        maybeForceBuilderInitialization();
      }
      private void maybeForceBuilderInitialization() {
        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
        }
      }
      private static Builder create() {
        return new Builder();
      }

      public Builder clear() {
        super.clear();
        subReqID_ = 0;
        bitField0_ = (bitField0_ & ~0x00000001);
        respCode_ = 0;
        bitField0_ = (bitField0_ & ~0x00000002);
        desc_ = "";
        bitField0_ = (bitField0_ & ~0x00000004);
        return this;
      }

      public Builder clone() {
        return create().mergeFrom(buildPartial());
      }

      public com.google.protobuf.Descriptors.Descriptor
          getDescriptorForType() {
        return SubscribeRespProto.internal_static_netty_SubscribeResp_descriptor;
      }

      public SubscribeResp getDefaultInstanceForType() {
        return SubscribeResp.getDefaultInstance();
      }

      public SubscribeResp build() {
        SubscribeResp result = buildPartial();
        if (!result.isInitialized()) {
          throw newUninitializedMessageException(result);
        }
        return result;
      }

      public SubscribeResp buildPartial() {
        SubscribeResp result = new SubscribeResp(this);
        int from_bitField0_ = bitField0_;
        int to_bitField0_ = 0;
        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
          to_bitField0_ |= 0x00000001;
        }
        result.subReqID_ = subReqID_;
        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
          to_bitField0_ |= 0x00000002;
        }
        result.respCode_ = respCode_;
        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
          to_bitField0_ |= 0x00000004;
        }
        result.desc_ = desc_;
        result.bitField0_ = to_bitField0_;
        onBuilt();
        return result;
      }

      public Builder mergeFrom(com.google.protobuf.Message other) {
        if (other instanceof SubscribeResp) {
          return mergeFrom((SubscribeResp)other);
        } else {
          super.mergeFrom(other);
          return this;
        }
      }

      public Builder mergeFrom(SubscribeResp other) {
        if (other == SubscribeResp.getDefaultInstance()) return this;
        if (other.hasSubReqID()) {
          setSubReqID(other.getSubReqID());
        }
        if (other.hasRespCode()) {
          setRespCode(other.getRespCode());
        }
        if (other.hasDesc()) {
          bitField0_ |= 0x00000004;
          desc_ = other.desc_;
          onChanged();
        }
        this.mergeUnknownFields(other.getUnknownFields());
        return this;
      }

      public final boolean isInitialized() {
        if (!hasSubReqID()) {
          
          return false;
        }
        if (!hasRespCode()) {
          
          return false;
        }
        if (!hasDesc()) {
          
          return false;
        }
        return true;
      }

      public Builder mergeFrom(
          com.google.protobuf.CodedInputStream input,
          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
          throws java.io.IOException {
        SubscribeResp parsedMessage = null;
        try {
          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
          parsedMessage = (SubscribeResp) e.getUnfinishedMessage();
          throw e;
        } finally {
          if (parsedMessage != null) {
            mergeFrom(parsedMessage);
          }
        }
        return this;
      }
      private int bitField0_;

      // required int32 subReqID = 1;
      private int subReqID_ ;
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public boolean hasSubReqID() {
        return ((bitField0_ & 0x00000001) == 0x00000001);
      }
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public int getSubReqID() {
        return subReqID_;
      }
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public Builder setSubReqID(int value) {
        bitField0_ |= 0x00000001;
        subReqID_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>required int32 subReqID = 1;</code>
       */
      public Builder clearSubReqID() {
        bitField0_ = (bitField0_ & ~0x00000001);
        subReqID_ = 0;
        onChanged();
        return this;
      }

      // required int32 respCode = 2;
      private int respCode_ ;
      /**
       * <code>required int32 respCode = 2;</code>
       */
      public boolean hasRespCode() {
        return ((bitField0_ & 0x00000002) == 0x00000002);
      }
      /**
       * <code>required int32 respCode = 2;</code>
       */
      public int getRespCode() {
        return respCode_;
      }
      /**
       * <code>required int32 respCode = 2;</code>
       */
      public Builder setRespCode(int value) {
        bitField0_ |= 0x00000002;
        respCode_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>required int32 respCode = 2;</code>
       */
      public Builder clearRespCode() {
        bitField0_ = (bitField0_ & ~0x00000002);
        respCode_ = 0;
        onChanged();
        return this;
      }

      // required string desc = 3;
      private Object desc_ = "";
      /**
       * <code>required string desc = 3;</code>
       */
      public boolean hasDesc() {
        return ((bitField0_ & 0x00000004) == 0x00000004);
      }
      /**
       * <code>required string desc = 3;</code>
       */
      public String getDesc() {
        Object ref = desc_;
        if (!(ref instanceof String)) {
          String s = ((com.google.protobuf.ByteString) ref)
              .toStringUtf8();
          desc_ = s;
          return s;
        } else {
          return (String) ref;
        }
      }
      /**
       * <code>required string desc = 3;</code>
       */
      public com.google.protobuf.ByteString
          getDescBytes() {
        Object ref = desc_;
        if (ref instanceof String) {
          com.google.protobuf.ByteString b = 
              com.google.protobuf.ByteString.copyFromUtf8(
                  (String) ref);
          desc_ = b;
          return b;
        } else {
          return (com.google.protobuf.ByteString) ref;
        }
      }
      /**
       * <code>required string desc = 3;</code>
       */
      public Builder setDesc(
          String value) {
        if (value == null) {
    throw new NullPointerException();
  }
  bitField0_ |= 0x00000004;
        desc_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>required string desc = 3;</code>
       */
      public Builder clearDesc() {
        bitField0_ = (bitField0_ & ~0x00000004);
        desc_ = getDefaultInstance().getDesc();
        onChanged();
        return this;
      }
      /**
       * <code>required string desc = 3;</code>
       */
      public Builder setDescBytes(
          com.google.protobuf.ByteString value) {
        if (value == null) {
    throw new NullPointerException();
  }
  bitField0_ |= 0x00000004;
        desc_ = value;
        onChanged();
        return this;
      }

      // @@protoc_insertion_point(builder_scope:netty.SubscribeResp)
    }

    static {
      defaultInstance = new SubscribeResp(true);
      defaultInstance.initFields();
    }

    // @@protoc_insertion_point(class_scope:netty.SubscribeResp)
  }

  private static com.google.protobuf.Descriptors.Descriptor
    internal_static_netty_SubscribeResp_descriptor;
  private static
    com.google.protobuf.GeneratedMessage.FieldAccessorTable
      internal_static_netty_SubscribeResp_fieldAccessorTable;

  public static com.google.protobuf.Descriptors.FileDescriptor
      getDescriptor() {
    return descriptor;
  }
  private static com.google.protobuf.Descriptors.FileDescriptor
      descriptor;
  static {
    String[] descriptorData = {
      "\n\023SubscribeResp.proto\022\005netty\"A\n\rSubscrib" +
      "eResp\022\020\n\010subReqID\030\001 \002(\005\022\020\n\010respCode\030\002 \002(" +
      "\005\022\014\n\004desc\030\003 \002(\tB/\n\031com.example.demo.prot" +
      "obufB\022SubscribeRespProto"
    };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
        public com.google.protobuf.ExtensionRegistry assignDescriptors(
            com.google.protobuf.Descriptors.FileDescriptor root) {
          descriptor = root;
          internal_static_netty_SubscribeResp_descriptor =
            getDescriptor().getMessageTypes().get(0);
          internal_static_netty_SubscribeResp_fieldAccessorTable = new
            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
              internal_static_netty_SubscribeResp_descriptor,
              new String[] { "SubReqID", "RespCode", "Desc", });
          return null;
        }
      };
    com.google.protobuf.Descriptors.FileDescriptor
      .internalBuildGeneratedFileFrom(descriptorData,
        new com.google.protobuf.Descriptors.FileDescriptor[] {
        }, assigner);
  }

  // @@protoc_insertion_point(outer_class_scope)
}
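
The generated getSerializedSize() and descriptor above imply a simple wire layout for SubscribeResp: each field is written as a tag (field number shifted left by 3, OR-ed with the wire type) followed by its value. The sketch below hand-encodes the three fields with only the JDK to make that layout visible. It is an illustration, not the protobuf library's code: the class name WireFormatSketch and both method names are made up for this note, and the varint helper only handles non-negative values.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hand-rolled illustration of the protobuf wire format (hypothetical helper, not library code). */
final class WireFormatSketch {

    /** Writes a non-negative int as a base-128 varint, lowest 7 bits first. */
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // high bit set: more bytes follow
            value >>>= 7;
        }
        out.write(value); // last byte, high bit clear
    }

    /** Encodes subReqID (field 1), respCode (field 2) and desc (field 3) in field order. */
    static byte[] encodeSubscribeResp(int subReqID, int respCode, String desc) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, (1 << 3) | 0); // tag: field 1, wire type 0 (varint)
        writeVarint(out, subReqID);
        writeVarint(out, (2 << 3) | 0); // tag: field 2, wire type 0 (varint)
        writeVarint(out, respCode);
        byte[] utf8 = desc.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (3 << 3) | 2); // tag: field 3, wire type 2 (length-delimited)
        writeVarint(out, utf8.length);  // byte length of the string
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }
}
```

For subReqID=1, respCode=0, desc="ok" this yields the 8 bytes 08 01 10 00 1A 02 6F 6B, which matches what getSerializedSize() would add up: 2 + 2 + 4.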

 

    3.TestSubscribeReqProto

 

import com.example.demo.protobuf.SubscribeReqProto;
import com.google.protobuf.InvalidProtocolBufferException;

import java.util.ArrayList;
import java.util.List;

public class TestSubscribeReqProto {
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray(); // encode: serialize the message to a byte array
    }

    private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body); // decode: parse the byte array back into a message
    }

    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();

        // set the fields
        builder.setSubReqID(1);
        builder.setUserName("plxz");
        builder.setProductName("netty");

        List<String> address = new ArrayList<>();
        address.add("GuangZhou xiaomanyao");
        address.add("BeiJing tiananmen");
        address.add("ChengDu chunxilu");
        builder.addAllAddress(address); // copy the whole list into the repeated address field
        return builder.build();
    }

    public static void main(String[] args) throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq req = createSubscribeReq();
        System.out.println("Before decoding : " + req.toString());
        SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
        System.out.println("After decoding : " + req2.toString());
        System.out.println("Comparison result : " + req2.equals(req));
    }
}

 

  8.1.3 Running the Protobuf example

8.2 Netty Protobuf server development

  8.2.1 Protobuf version of the book-ordering server

    1.SubReqServer

 

import com.example.demo.protobuf.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer {
    public void bind(int port) throws Exception {
        // configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts incoming client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles network reads and writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // channel implementation
                    .option(ChannelOption.SO_BACKLOG, 1024) // TCP accept backlog
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // splits the byte stream into whole frames (half-packet handling)
                            ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); // target message type; does not handle half packets itself
                            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            ch.pipeline().addLast(new ProtobufEncoder());
                            ch.pipeline().addLast(new SubReqServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync(); // bind the port and wait for the bind to succeed

            f.channel().closeFuture().sync(); // block until the server channel closes, then return
        } finally {
            // graceful shutdown: release the thread pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // keep the default port
            }
        }
        new SubReqServer().bind(port);
    }
}
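
The pipeline above relies on a length prefix to cut the TCP byte stream into whole messages: ProtobufVarint32LengthFieldPrepender writes the body length as a varint32 in front of each message, and ProtobufVarint32FrameDecoder waits until that many bytes have arrived before passing a frame on. The sketch below reproduces that idea on a plain java.nio.ByteBuffer so the half-packet case can be seen without Netty. It is not Netty's implementation; the class Varint32FramingSketch and its two methods are hypothetical names chosen for this note.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** JDK-only illustration of varint32 length-prefixed framing (hypothetical helper). */
final class Varint32FramingSketch {

    /** Prepends the body length as a base-128 varint. */
    static byte[] frame(byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = body.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80); // high bit set: more length bytes follow
            len >>>= 7;
        }
        out.write(len);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    /**
     * Extracts every complete frame. On return the buffer position sits at the
     * start of the first incomplete frame, so the caller can append more bytes
     * and try again.
     */
    static List<byte[]> extractFrames(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            in.mark(); // remember where this frame starts
            int length = 0;
            int shift = 0;
            boolean haveLength = false;
            while (in.hasRemaining() && shift < 35) {
                byte b = in.get();
                length |= (b & 0x7F) << shift;
                shift += 7;
                if ((b & 0x80) == 0) {
                    haveLength = true;
                    break;
                }
            }
            if (!haveLength) {
                if (shift >= 35) {
                    throw new IllegalArgumentException("length prefix longer than 5 bytes");
                }
                in.reset(); // length prefix itself is incomplete
                return frames;
            }
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length");
            }
            if (in.remaining() < length) {
                in.reset(); // half packet: body not fully received yet
                return frames;
            }
            byte[] body = new byte[length];
            in.get(body);
            frames.add(body);
        }
    }
}
```

Feeding the decoder two framed messages with the last byte missing returns only the first message and rewinds to the start of the second, which is the behaviour the "half packet" comments in the pipeline refer to.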

 

    2.SubReqServerHandler

 

import com.example.demo.protobuf.SubscribeReqProto;
import com.example.demo.protobuf.SubscribeRespProto;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg; // already decoded by ProtobufDecoder, so it can be used directly
        if ("plxz".equalsIgnoreCase(req.getUserName())) { // check the user name
            System.out.println("Received client request : " + req.toString());
            // ProtobufEncoder encodes the response automatically; no manual encoding is needed
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    private Object resp(int subReqID) {
        SubscribeRespProto.SubscribeResp.Builder builder=SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode(0);
        builder.setDesc("netty buy success");
        return builder.build();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

 

  8.2.2 Protobuf version of the book-ordering client

    1.SubReqClient

 

import com.example.demo.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class SubReqClient {
    public void connect(String host, int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // handles half packets; ProtobufDecoder is added after it
                            ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); // target message type to decode into
                            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            ch.pipeline().addLast(new ProtobufEncoder());
                            ch.pipeline().addLast(new SubReqClientHandler());
                        }
                    });
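            // start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();

            // wait for the client channel to close
            f.channel().closeFuture().sync();
        } finally {
            // graceful shutdown: release the NIO thread group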
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]); // server port to connect to
            } catch (NumberFormatException e) {
                // keep the default port
            }
        }
        new SubReqClient().connect("127.0.0.1", port);
    }
}

 

 

    2.SubReqClientHandler

 

import com.example.demo.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.ArrayList;
import java.util.List;

public class SubReqClientHandler extends ChannelHandlerAdapter {
    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    private SubscribeReqProto.SubscribeReq subReq(int i) {
        SubscribeReqProto.SubscribeReq.Builder builder=SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("plxz");
        builder.setProductName("netty");
        List<String> address = new ArrayList<>();
        address.add("GuangZhou xiaomanyao");
        address.add("BeiJing tiananmen");
        address.add("ChengDu chunxilu");
        builder.addAllAddress(address); // copy the whole list into the repeated address field
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Received server response = " + msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

 

  8.2.3 Functional test of the Protobuf book-ordering program

8.3 Notes on using Protobuf

 

Chapter 9  JBoss Marshalling codec

9.1 Preparing the Marshalling development environment

9.2 Netty Marshalling server development

  1.SubReqServer

 

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer {
    public void bind(int port) throws Exception {
        // configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts incoming client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles network reads and writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // channel implementation
                    .option(ChannelOption.SO_BACKLOG, 1024) // TCP accept backlog
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // Marshalling decoder
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // Marshalling encoder
                            ch.pipeline().addLast(new SubReqServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync(); // bind the port and wait for the bind to succeed

            f.channel().closeFuture().sync(); // block until the server channel closes, then return
        } finally {
            // graceful shutdown: release the thread pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // keep the default port
            }
        }
        new SubReqServer().bind(port);
    }
}

 

  2.MarshallingCodeCFactory

 

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public class MarshallingCodeCFactory {
    // decoder
    public static MarshallingDecoder buildMarshallingDecoder() {
        // get the MarshallerFactory; "serial" selects the Java-serialization factory
        final MarshallerFactory marshallingCodeCFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration(); // create the MarshallingConfiguration
        configuration.setVersion(5); // protocol version
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallingCodeCFactory, configuration);
        // 1024 is the maximum length in bytes of a single serialized message
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    // encoder
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallingCodeCFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallingCodeCFactory, configuration);
        // serializes POJOs that implement Serializable into a byte array
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
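
The factory above asks for the "serial" protocol, which the comments describe as the Java-serialization flavour, and caps a decoded message at 1024 bytes. As a rough mental model only, the sketch below does the same round trip with the JDK's own ObjectOutputStream and ObjectInputStream instead of JBoss Marshalling: the POJO must implement Serializable, and the receiving side rejects anything larger than a configured maximum. The class SerialRoundTripSketch, the OrderRequest POJO and the maxObjectSize parameter are hypothetical names for this note, not part of Netty or JBoss Marshalling.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** JDK-only stand-in for a serialize/deserialize codec pair (hypothetical helper). */
final class SerialRoundTripSketch {

    /** Example POJO; it must implement Serializable to be written at all. */
    static final class OrderRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        final int subReqID;
        final String userName;

        OrderRequest(int subReqID, String userName) {
            this.subReqID = subReqID;
            this.userName = userName;
        }
    }

    /** Encoder side: POJO to byte array. */
    static byte[] serialize(Serializable pojo) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(pojo);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decoder side: refuse oversized input, then rebuild the object. */
    static Object deserialize(byte[] data, int maxObjectSize) {
        if (data.length > maxObjectSize) {
            throw new IllegalArgumentException(
                    "message of " + data.length + " bytes exceeds limit " + maxObjectSize);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If requests in a real application can grow beyond the limit (long address lists, for example), the 1024 passed to MarshallingDecoder would need to be raised accordingly.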

 

9.3 Netty Marshalling client development

  1.SubReqClient

 

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class SubReqClient {
    public void connect(String host, int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // Marshalling decoder
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // Marshalling encoder
                            ch.pipeline().addLast(new SubReqClientHandler());
                        }
                    });
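            // start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();

            // wait for the client channel to close
            f.channel().closeFuture().sync();
        } finally {
            // graceful shutdown: release the NIO thread group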
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]); // server port to connect to
            } catch (NumberFormatException e) {
                // keep the default port
            }
        }
        new SubReqClient().connect("127.0.0.1", port);
    }
}

 

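Chapter 10  HTTP protocol development and application

10.1 Introduction to the HTTP protocol

  10.1.1 HTTP URLs

  10.1.2 HTTP request messages

  10.1.3 HTTP response messages

10.2 Getting started with Netty HTTP server development

  10.2.1 Scenario for the HTTP server example

  10.2.2 HTTP server development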

    1.HttpFileServer

 

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class HttpFileServer {
    public static final String DEFAULT_URL = "/src/main/java/com/example/demo/resources/";

    public void run(final int port, final String url) throws Exception {
        // configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts incoming client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles network reads and writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // channel implementation
                    .option(ChannelOption.SO_BACKLOG, 1024) // TCP accept backlog
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast("http-decoder", new HttpRequestDecoder()); // HTTP request decoder
                            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536)); // merges the parts of one request into a single FullHttpRequest
                            ch.pipeline().addLast("http-encoder", new HttpResponseEncoder()); // HTTP response encoder
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // sends large streams asynchronously without holding them fully in memory
                            ch.pipeline().addLast("fileServerHandler", new HttpFileServerHandler(url)); // file server business handler
                        }
                    });
            ChannelFuture f = b.bind("127.0.0.1", port).sync(); // bind 127.0.0.1:port and wait for the bind to succeed

            f.channel().closeFuture().sync(); // block until the server channel closes, then return
        } finally {
            // graceful shutdown: release the thread pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (NpeCheck.checkArray(args)) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // keep the default port
            }
        }
        new HttpFileServer().run(port, DEFAULT_URL);
    }
}

 

    2.HttpFileServerHandler

 

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;

import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.net.URLDecoder;
import java.util.regex.Pattern;

import static io.netty.handler.codec.http.HttpHeaderNames.*;
import static io.netty.handler.codec.http.HttpHeaderUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaderUtil.setContentLength;
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String url;
    private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");

    public HttpFileServerHandler(String url) {
        this.url = url;
    }

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        //消息解碼
        if (!request.decoderResult().isSuccess()) {
            sendError(ctx, BAD_REQUEST);
            return;
        }
        //是否get請求
        if (request.method() != GET) {
            sendError(ctx, METHOD_NOT_ALLOWED);
            return;
        }
        final String uri = request.uri();
        final String path = sanitizeUri(uri);
        //是否有路徑
        if (path == null) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        File file = new File(path);
        //是否隱藏或存在
        if (file.isHidden() || !file.exists()) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        //是否存在目錄
        if (file.isDirectory()) {
            if (uri.endsWith("/")) {
                senfListing(ctx, file);
            } else {
                sendRedirect(ctx, uri + "/");
            }
            return;
        }
        //是否文件
        if (!file.isFile()) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        RandomAccessFile randomAccessFile;
        try {
            randomAccessFile = new RandomAccessFile(file, "r");//open read-only
        } catch (FileNotFoundException fnfe) {
            sendError(ctx, NOT_FOUND);
            return;
        }
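        long fileLength = randomAccessFile.length();//file length in bytes
        //header-only response: the body follows as separate chunks, so a full response does not fit here
        //(assumes io.netty.handler.codec.http.DefaultHttpResponse is imported)
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        setContentLength(response, fileLength);//set Content-Length
        setContentTypeHeader(response, file);//set Content-Type
        //keep-alive connection?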
        if (isKeepAlive(request)) {
            response.headers().set(CONNECTION, KEEP_ALIVE);
        }
        ctx.write(response);//write the response line and headers
        ChannelFuture sendFileFuture;
        //write the file in 8192-byte chunks (requires a ChunkedWriteHandler in the pipeline)
        sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                if (total < 0) {
                    System.err.println("progress:" + progress);
                } else {
                    System.err.println("progress:" + progress + "/" + total); //print progress
                }
            }

            public void operationComplete(ChannelProgressiveFuture future) {
                System.err.println("complete");//transfer complete
            }
        });

        ChannelFuture lastChannelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);//marks the end of the message
        if (!isKeepAlive(request)) {
            lastChannelFuture.addListener(ChannelFutureListener.CLOSE);//close the connection once the write completes
        }
    }

    private void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

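    //decode and validate the request URI; returns a local path, or null if the URI is rejected
    public String sanitizeUri(String uri) {
        //URL-decode as UTF-8, falling back to ISO-8859-1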
        try {
            uri = URLDecoder.decode(uri, "UTF-8");
        } catch (Exception e) {
            try {
                uri = URLDecoder.decode(uri, "ISO-8859-1");
            } catch (Exception ew) {
                ew.printStackTrace();
            }
        }
        //the URI must start with the configured url prefix and with "/"
        if (!uri.startsWith(url)) {
            return null;
        }
        if (!uri.startsWith("/")) {
            return null;
        }
        //convert to the local file separator
        uri = uri.replace('/', File.separatorChar);
        //reject dot-prefixed segments, leading or trailing dots, and unsafe characters
        if (uri.contains(File.separator + '.') || uri.startsWith(".") || uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) {
            return null;
        }
        return System.getProperty("user.dir") + File.separator + uri;//absolute path under the working directory
    }

    //set the Content-Type header from the file name
    private void setContentTypeHeader(HttpResponse httpResponse, File file) {
        MimetypesFileTypeMap mimetypesFileTypeMap = new MimetypesFileTypeMap();
        httpResponse.headers().set(CONTENT_TYPE, mimetypesFileTypeMap.getContentType(file.getPath()));
    }

    //send a directory listing to the browser
    private void senfListing(ChannelHandlerContext channelHandlerContext, File dir) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
        response.headers().set(CONTENT_TYPE, "text/html;charset=UTF-8");//set the Content-Type header
        StringBuilder builder = new StringBuilder();//builds the response body
        String dirPath = dir.getPath();
        builder.append("<!DOCTYPE html> \r\n");
        builder.append("<html><head><title>");
        builder.append(dirPath);
        builder.append("目錄:");
        builder.append("</title></head><body>\r\n");
        builder.append("<h3>");
        builder.append(dirPath).append("目錄:");
        builder.append("</h3>\r\n");
        builder.append("<ul>");
        builder.append("<li>連接:<a href=\"../\">..</a></li>\r\n");
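        //list the visible, readable entries of the directory
        File[] files = dir.listFiles();//null if the directory cannot be read
        for (File f : files == null ? new File[0] : files) {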
            if (f.isHidden() || !f.canRead()) {
                continue;
            }
            String fname = f.getName();
            if (!ALLOWED_FILE_NAME.matcher(fname).matches()) {
                continue;
            }
            builder.append("<li>連接:<a href=\"");
            builder.append(fname);
            builder.append("\">");
            builder.append(fname);
            builder.append("</a></li>\r\n");
        }
        builder.append("</ul></body></html>\r\n");
        ByteBuf byteBuf = Unpooled.copiedBuffer(builder, CharsetUtil.UTF_8);//copy the listing into a buffer
        response.content().writeBytes(byteBuf);//append it to the response body
        byteBuf.release();//release the temporary buffer
        channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);//write the response, flush it to the socket, then close
    }

    //redirect the client to the given URI
    private void sendRedirect(ChannelHandlerContext channelHandlerContext, String newUri) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
        response.headers().set(LOCATION, newUri);
        channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
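
The checks in `sanitizeUri` above work on strings. As a complementary sketch (not the book's code), the same goal of never serving a file outside the root directory can be expressed with `java.nio.file.Path` normalization: resolve the decoded URI against the root, normalize away any `..` segments, then verify the result is still under the root. The class name `SafePathResolver` and the method `resolve` are hypothetical names chosen for illustration, and the sketch uses only the JDK.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: maps a request URI to a path under a root directory.
final class SafePathResolver {

    /** Returns the resolved path, or null when the URI is malformed or escapes the root. */
    static Path resolve(Path root, String uri) {
        try {
            // Note: URLDecoder also turns '+' into a space (form-encoding rules).
            String decoded = URLDecoder.decode(uri, "UTF-8");
            if (!decoded.startsWith("/")) {
                return null; // only absolute request paths are accepted
            }
            Path base = root.toAbsolutePath().normalize();
            // Drop the leading '/', resolve against the root, collapse "." and "..".
            Path candidate = base.resolve(decoded.substring(1)).normalize();
            // Path.startsWith compares whole path elements, not characters.
            return candidate.startsWith(base) ? candidate : null;
        } catch (UnsupportedEncodingException | IllegalArgumentException e) {
            // Malformed percent-escapes or characters the file system rejects.
            return null;
        }
    }

    public static void main(String[] args) {
        Path root = Paths.get(System.getProperty("user.dir"));
        System.out.println(resolve(root, "/src/Main.java"));
        System.out.println(resolve(root, "/../secret.txt")); // rejected
    }
}
```

This does not replace the hidden-file and character checks in the handler; it only shows one way to make the "stay inside the root" rule explicit.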

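10.3 Netty HTTP+XML protocol stack development

  10.3.1 Development scenario

  10.3.2 HTTP+XML protocol stack design

  10.3.3 JiBX, an efficient XML binding framework

  10.3.4 HTTP+XML codec framework development

    1. Request message encoder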

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import java.net.InetAddress;
import java.util.List;

//request message encoder
public class HttpXmlRequestEncoder extends AbstractHttpXmlEncoder<HttpXmlRequest> {
    @Override
    protected void encode(ChannelHandlerContext ctx, HttpXmlRequest msg, List<Object> out) throws Exception {
        ByteBuf body = encode0(ctx, msg.getBody());//use the base-class encoder to marshal the POJO to XML
        FullHttpRequest request = msg.getRequest();
        if (request == null) {
            //build a new request with default headers
            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/do", body);
            HttpHeaders headers = request.headers();
            headers.set(HttpHeaders.Names.HOST, InetAddress.getLocalHost().getHostAddress());
            headers.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
            headers.set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP.toString() + ',' + HttpHeaders.Values.DEFLATE.toString());
            headers.set(HttpHeaders.Names.ACCEPT_CHARSET, "ISO-8859-1,utf-8;q=0.7,*;q=0.7");
            headers.set(HttpHeaders.Names.ACCEPT_LANGUAGE, "zh");
            headers.set(HttpHeaders.Names.USER_AGENT, "Netty xml Http Client side");
            headers.set(HttpHeaders.Names.ACCEPT, "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
        }
        HttpHeaders.setContentLength(request, body.readableBytes());//set Content-Length to the body size in bytes
        out.add(request);
    }
}

    2. Request message encoder base class

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.io.StringWriter;
import java.nio.charset.Charset;
import org.jibx.runtime.BindingDirectory;
import org.jibx.runtime.IBindingFactory;
import org.jibx.runtime.IMarshallingContext;

//request message encoder base class
public abstract class AbstractHttpXmlEncoder<T> extends MessageToMessageEncoder<T> {
    IBindingFactory factory = null;
    StringWriter writer = null;
    final static String CHARSET_NAME = "UTF-8";
    final static Charset UTF_8 = Charset.forName(CHARSET_NAME);
    
    //marshal the POJO to XML and wrap the result in a ByteBuf
    protected ByteBuf encode0(ChannelHandlerContext ctx, Object body) throws Exception {
        factory = BindingDirectory.getFactory(body.getClass());
        writer = new StringWriter();
        IMarshallingContext mctx = factory.createMarshallingContext();
        mctx.setIndent(2);
        mctx.marshalDocument(body, CHARSET_NAME, null, writer);
        String xmlStr = writer.toString();
        writer.close();
        writer = null;
        ByteBuf encodeBuf = Unpooled.copiedBuffer(xmlStr, UTF_8);
        return encodeBuf;
    }

    @Skip
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // release resources
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }
}
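
The encoders above set `Content-Length` from `body.readableBytes()`, that is, from the number of encoded bytes rather than the number of characters in the XML string. The two differ as soon as the XML contains non-ASCII text such as the city names used later in `OrderFactory`. A minimal JDK-only sketch of the difference follows; the class name `ContentLengthSketch` is hypothetical, and the XML is a hand-written fragment, not JiBX output.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo: Content-Length must count bytes, not characters.
final class ContentLengthSketch {

    /** Number of bytes the body occupies on the wire when encoded as UTF-8. */
    static int contentLength(String xml) {
        return xml.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // \u5357\u4eac\u5e02 is the three-character city name used in OrderFactory.
        String xml = "<city>\u5357\u4eac\u5e02</city>";
        System.out.println("chars = " + xml.length());       // 16 characters
        System.out.println("bytes = " + contentLength(xml)); // 22 bytes in UTF-8
    }
}
```

Each of the three CJK characters takes three bytes in UTF-8, so using the character count would under-report the length and the receiver would truncate the body.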

    3. Request message class

import io.netty.handler.codec.http.FullHttpRequest;

//request message: the HTTP request plus the POJO carried in its body
public class HttpXmlRequest {

    private FullHttpRequest request;
    private Object body;//object to be encoded

    public HttpXmlRequest(FullHttpRequest request, Object body) {
        this.request = request;
        this.body = body;
    }

    public final FullHttpRequest getRequest() {
        return request;
    }

    public final void setRequest(FullHttpRequest request) {
        this.request = request;
    }

    public final Object getBody() {
        return body;
    }

    public final void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "HttpXmlRequest [request=" + request + ", body=" + body + "]";
    }
}

    4. Request message decoder

import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import java.util.List;
//request message decoder
public class HttpXmlRequestDecoder extends AbstractHttpXmlDecoder<FullHttpRequest> {

    public HttpXmlRequestDecoder(Class<?> clazz) {
        this(clazz, false);
    }

    public HttpXmlRequestDecoder(Class<?> clazz, boolean isPrint) {
        super(clazz, isPrint);
    }

    @Override
    protected void decode(ChannelHandlerContext arg0, FullHttpRequest arg1, List<Object> arg2) throws Exception {
        //reject the request if HTTP decoding failed
        if (!arg1.getDecoderResult().isSuccess()) {
            sendError(arg0, BAD_REQUEST);
            return;
        }
        HttpXmlRequest request = new HttpXmlRequest(arg1, decode0(arg0, arg1.content()));
        arg2.add(request);//add to the decoder output list
    }

    //build and send an HTTP error response, then close the connection
    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}

    5. Request message decoder base class

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.io.StringReader;
import java.nio.charset.Charset;

import org.jibx.runtime.BindingDirectory;
import org.jibx.runtime.IBindingFactory;
import org.jibx.runtime.IUnmarshallingContext;

//request message decoder base class
public abstract class AbstractHttpXmlDecoder<T> extends MessageToMessageDecoder<T> {

    private IBindingFactory factory;
    private StringReader reader;
    private Class<?> clazz;
    private boolean isPrint;
    private final static String CHARSET_NAME = "UTF-8";
    private final static Charset UTF_8 = Charset.forName(CHARSET_NAME);

    protected AbstractHttpXmlDecoder(Class<?> clazz) {
        this(clazz, false);
    }

    protected AbstractHttpXmlDecoder(Class<?> clazz, boolean isPrint) {
        this.clazz = clazz;
        this.isPrint = isPrint;
    }

    //unmarshal the XML body into a POJO
    protected Object decode0(ChannelHandlerContext arg0, ByteBuf body) throws Exception {
        factory = BindingDirectory.getFactory(clazz);
        String content = body.toString(UTF_8);
        if (isPrint) {
            System.out.println("The body is : " + content);
        }
        reader = new StringReader(content);
        IUnmarshallingContext uctx = factory.createUnmarshallingContext();
        Object result = uctx.unmarshalDocument(reader);
        reader.close();
        reader = null;
        return result;
    }

    @Skip
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // release resources
        if (reader != null) {
            reader.close();
            reader = null;
        }
    }
}
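
`decode0` above follows a fixed sequence: turn the body bytes into a UTF-8 string, wrap it in a `StringReader`, hand the reader to the XML layer, and close the reader. The sketch below walks through the same sequence with the JDK's built-in DOM parser standing in for JiBX, so it runs without any binding files. The class name `XmlBodySketch` and the method `firstElementText` are hypothetical, and extracting a single element is a simplification of full POJO unmarshalling. Using try-with-resources closes the reader on every path, which is one alternative to releasing it in `exceptionCaught`.

```java
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical stand-in for the decode step, using the JDK DOM parser instead of JiBX.
final class XmlBodySketch {

    /** Returns the text of the first element named tag, or null if there is none. */
    static String firstElementText(byte[] body, String tag) {
        String content = new String(body, StandardCharsets.UTF_8); // bytes -> String
        try (StringReader reader = new StringReader(content)) {    // closed on every path
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Request bodies are untrusted input, so refuse DOCTYPE declarations.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            Document doc = factory.newDocumentBuilder().parse(new InputSource(reader));
            NodeList nodes = doc.getElementsByTagName(tag);
            return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
        } catch (Exception e) {
            throw new IllegalArgumentException("body is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = "<order><city>Nanjing</city></order>".getBytes(StandardCharsets.UTF_8);
        System.out.println(firstElementText(body, "city"));
    }
}
```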

    6. Response message

import io.netty.handler.codec.http.FullHttpResponse;

//response message: the HTTP response plus the decoded POJO
public class HttpXmlResponse {
    private FullHttpResponse httpResponse;
    private Object result;//POJO

    public HttpXmlResponse(FullHttpResponse httpResponse, Object result) {
        this.httpResponse = httpResponse;
        this.result = result;
    }

    public final FullHttpResponse getHttpResponse() {
        return httpResponse;
    }

    public final void setHttpResponse(FullHttpResponse httpResponse) {
        this.httpResponse = httpResponse;
    }

    public final Object getResult() {
        return result;
    }

    public final void setResult(Object result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "HttpXmlResponse [httpResponse=" + httpResponse + ", result=" + result + "]";
    }
}

    7. Response message encoder

import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;

import java.util.List;

//response message encoder
public class HttpXmlResponseEncoder extends
    AbstractHttpXmlEncoder<HttpXmlResponse> {

    @Override
    protected void encode(ChannelHandlerContext ctx, HttpXmlResponse msg, List<Object> out) throws Exception {
        ByteBuf body = encode0(ctx, msg.getResult());
        FullHttpResponse response = msg.getHttpResponse();
        //build a new HTTP response carrying the encoded body
        if (response == null) {
            response = new DefaultFullHttpResponse(HTTP_1_1, OK, body);
        } else {
            response = new DefaultFullHttpResponse(msg.getHttpResponse().getProtocolVersion(), msg.getHttpResponse().getStatus(), body);
        }
        response.headers().set(CONTENT_TYPE, "text/xml");//set the body content type
        setContentLength(response, body.readableBytes());
        out.add(response);//add to the encoder output list
    }
}

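    8. Response message decoder

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;

import java.util.List;

//response message decoder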
public class HttpXmlResponseDecoder extends
        AbstractHttpXmlDecoder<DefaultFullHttpResponse> {

    public HttpXmlResponseDecoder(Class<?> clazz) {
        this(clazz, false);
    }

    public HttpXmlResponseDecoder(Class<?> clazz, boolean isPrintlog) {
        super(clazz, isPrintlog);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, DefaultFullHttpResponse msg, List<Object> out) throws Exception {
        HttpXmlResponse resHttpXmlResponse = new HttpXmlResponse(msg, decode0(ctx, msg.content()));
        out.add(resHttpXmlResponse);
    }
}

    9. Client bootstrap class

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;

import java.net.InetSocketAddress;

import com.phei.netty.protocol.http.xml.codec.HttpXmlRequestEncoder;
import com.phei.netty.protocol.http.xml.codec.HttpXmlResponseDecoder;
import com.phei.netty.protocol.http.xml.pojo.Order;

//client bootstrap class
public class HttpXmlClient {

    public void connect(int port) throws Exception {
        // configure the client NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-decoder", new HttpResponseDecoder());//decode bytes into HTTP response messages
                            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));//aggregate the parts into one full HTTP message
                            ch.pipeline().addLast("xml-decoder", new HttpXmlResponseDecoder(Order.class, true));
                            ch.pipeline().addLast("http-encoder", new HttpRequestEncoder());
                            ch.pipeline().addLast("xml-encoder", new HttpXmlRequestEncoder());
                            ch.pipeline().addLast("xmlClientHandler", new HttpXmlClientHandle());
                        }
                    });

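            // start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(new InetSocketAddress(port)).sync();

            // wait for the client channel to close
            f.channel().closeFuture().sync();
        } finally {
            // shut down gracefully and release the NIO threads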
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // keep the default port
            }
        }
        new HttpXmlClient().connect(port);
    }
}

    10. Client business logic handler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import com.phei.netty.protocol.http.xml.codec.HttpXmlRequest;
import com.phei.netty.protocol.http.xml.codec.HttpXmlResponse;
import com.phei.netty.protocol.http.xml.pojo.OrderFactory;

//client business logic handler
public class HttpXmlClientHandle extends
        SimpleChannelInboundHandler<HttpXmlResponse> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        HttpXmlRequest request = new HttpXmlRequest(null, OrderFactory.create(123));
        ctx.writeAndFlush(request);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, HttpXmlResponse msg) throws Exception {
        System.out.println("The client receive response of http header is : " + msg.getHttpResponse().headers().names());
        System.out.println("The client receive response of http body is : " + msg.getResult());
    }
}

    11. Order factory

//order factory
public class OrderFactory {
    public static Order create(long orderID) {
        Order order = new Order();
        order.setOrderNumber(orderID);
        order.setTotal(9999.999f);
        Address address = new Address();
        address.setCity("南京市");
        address.setCountry("中國");
        address.setPostCode("123321");
        address.setState("江蘇省");
        address.setStreet1("龍眠大道");
        order.setBillTo(address);
        Customer customer = new Customer();
        customer.setCustomerNumber(orderID);
        customer.setFirstName("李");
        customer.setLastName("林峯");
        order.setCustomer(customer);
        order.setShipping(Shipping.INTERNATIONAL_MAIL);
        order.setShipTo(address);
        return order;
    }
}

    12. Server main program

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

import java.net.InetSocketAddress;

import com.phei.netty.protocol.http.xml.codec.HttpXmlRequestDecoder;
import com.phei.netty.protocol.http.xml.codec.HttpXmlResponseEncoder;
import com.phei.netty.protocol.http.xml.pojo.Order;

//server main program
public class HttpXmlServer {
    public void run(final int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("xml-decoder", new HttpXmlRequestDecoder(Order.class, true));
                            ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                            ch.pipeline().addLast("xml-encoder", new HttpXmlResponseEncoder());
                            ch.pipeline().addLast("xmlServerHandler", new HttpXmlServerHandler());
                        }
                    });
            ChannelFuture future = b.bind(new InetSocketAddress(port)).sync();
            System.out.println("HTTP訂購服務器啓動,網址是 : " + "http://localhost:" + port);
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        new HttpXmlServer().run(port);
    }
}
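
The client and server `main` methods above both read an optional port from `args[0]` and fall back to 8080 when it is not a number. A small sketch of that logic as a reusable helper is shown below, with an added range check that the originals do not perform. The names `PortArgs` and `parsePort` are hypothetical.

```java
// Hypothetical helper that factors out the port parsing repeated in each main method.
final class PortArgs {

    /** Returns args[0] as a TCP port, or defaultPort if it is absent, malformed, or out of range. */
    static int parsePort(String[] args, int defaultPort) {
        if (args == null || args.length == 0) {
            return defaultPort;
        }
        try {
            int port = Integer.parseInt(args[0].trim());
            // Valid TCP ports are 1..65535; anything else falls back to the default.
            return (port >= 1 && port <= 65535) ? port : defaultPort;
        } catch (NumberFormatException e) {
            return defaultPort;
        }
    }

    public static void main(String[] args) {
        System.out.println("port = " + parsePort(args, 8080));
    }
}
```

With such a helper, each `main` could reduce to a single call like `new HttpXmlServer().run(PortArgs.parsePort(args, 8080))`.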

    13. Server handler

import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.ArrayList;
import java.util.List;

import com.phei.netty.protocol.http.xml.codec.HttpXmlRequest;
import com.phei.netty.protocol.http.xml.codec.HttpXmlResponse;
import com.phei.netty.protocol.http.xml.pojo.Address;
import com.phei.netty.protocol.http.xml.pojo.Order;

//server handler
public class HttpXmlServerHandler extends SimpleChannelInboundHandler<HttpXmlRequest> {

    @Override
    public void messageReceived(final ChannelHandlerContext ctx, HttpXmlRequest xmlRequest) throws Exception {
        HttpRequest request = xmlRequest.getRequest();
        Order order = (Order) xmlRequest.getBody();
        System.out.println("Http server receive request : " + order);
        dobusiness(order);//process the order
        //send the response
        ChannelFuture future = ctx.writeAndFlush(new HttpXmlResponse(null, order));
        if (!isKeepAlive(request)) {
            //close the channel once the response has been written
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void dobusiness(Order order) {
        order.getCustomer().setFirstName("狄");
        order.getCustomer().setLastName("仁杰");
        List<String> midNames = new ArrayList<String>();
        midNames.add("李元芳");
        order.getCustomer().setMiddleNames(midNames);
        Address address = order.getBillTo();
        address.setCity("洛陽");
        address.setCountry("大唐");
        address.setState("河南道");
        address.setPostCode("123456");
        order.setBillTo(address);
        order.setShipTo(address);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }

    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("失敗: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
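
The handler above closes the connection only when `isKeepAlive(request)` is false. The rule behind that decision comes from HTTP itself: HTTP/1.1 connections are persistent unless the peer sends `Connection: close`, while HTTP/1.0 connections are closed unless the peer sends `Connection: keep-alive`. The sketch below is a simplified, JDK-only illustration of that rule and is not Netty's implementation. The class name `KeepAliveSketch` is hypothetical, and it assumes a single-valued `Connection` header.

```java
// Hypothetical, simplified illustration of the HTTP keep-alive rule.
final class KeepAliveSketch {

    /**
     * @param httpVersion      protocol version text, e.g. "HTTP/1.1" or "HTTP/1.0"
     * @param connectionHeader value of the Connection header, or null if absent
     */
    static boolean isKeepAlive(String httpVersion, String connectionHeader) {
        if (connectionHeader != null && connectionHeader.equalsIgnoreCase("close")) {
            return false; // an explicit close always wins
        }
        if ("HTTP/1.1".equals(httpVersion)) {
            return true;  // persistent by default
        }
        // HTTP/1.0 and older: persistent only when explicitly requested
        return connectionHeader != null && connectionHeader.equalsIgnoreCase("keep-alive");
    }

    public static void main(String[] args) {
        System.out.println(isKeepAlive("HTTP/1.1", null));
        System.out.println(isKeepAlive("HTTP/1.0", null));
    }
}
```

Note that `HttpXmlRequestEncoder` sets `Connection: close` on the requests it builds, so in this example the server closes the channel after each response.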

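Chapter 11  WebSocket protocol development

11.1 Drawbacks of the HTTP protocol

11.2 Introduction to WebSocket

  11.2.1 WebSocket background

  11.2.2 Establishing a WebSocket connection

  11.2.3 WebSocket life cycle

  11.2.4 Closing a WebSocket connection

11.3 WebSocket protocol development

  11.3.1 WebSocket server feature overview

  11.3.2 WebSocket server development

    1. Server bootstrap class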

 

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServer {
    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("http-codec", new HttpServerCodec());//encode and decode HTTP messages
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//aggregate HTTP message parts into one full message
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());//supports sending large content, such as the HTML5 page, to the client in chunks
                            pipeline.addLast("handler", new WebSocketServerHandler());
                        }
                    });

            Channel ch = b.bind(port).sync().channel();
            System.out.println("Web socket server started at port " + port + '.');
            System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        new WebSocketServer().run(port);
    }
}

 

    2. Server handler

import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

import java.util.logging.Level;
import java.util.logging.Logger;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());

    private WebSocketServerHandshaker handshaker;

    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) {
        // plain HTTP request (the WebSocket handshake arrives this way)
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // if HTTP decoding failed or this is not a WebSocket upgrade request, return an HTTP error
        if (!req.getDecoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // build and send the handshake response (URL hard-coded for local testing)
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // close frame: complete the closing handshake
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // ping frame: answer with a pong carrying the same payload
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // this example supports text frames only, not binary frames
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }

        // send the reply: the request text plus a greeting and the current time
        String request = ((TextWebSocketFrame) frame).text();
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(String.format("%s received %s", ctx.channel(), request));
        }
        ctx.channel().write(new TextWebSocketFrame(request + " , 歡迎使用Netty WebSocket服務,如今時刻:" + new java.util.Date().toString()));
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // Send the response to the client
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            setContentLength(res, res.content().readableBytes());
        }

        // Close the connection unless it is Keep-Alive
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

    3.Page

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Netty WebSocket Time Server</title>
</head>
<body>
<br>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8080/websocket");
        socket.onmessage = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = event.data;
        };
        socket.onopen = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "WebSocket connection opened; this browser supports WebSocket!";
        };
        socket.onclose = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "WebSocket closed!";
        };
    } else {
        alert("Sorry, your browser does not support the WebSocket protocol!");
    }

    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("The WebSocket connection has not been established!");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="Netty best practices"/>
    <br><br>
    <input type="button" value="Send WebSocket request message" onclick="send(this.form.message.value)"/>
    <hr color="blue"/>
    <h3>Response message returned by the server</h3>
    <textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>

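Chapter 12  Private Protocol Stack Development

12.1 Introduction to private protocols

12.2 Netty protocol stack functional design

  12.2.1 Network topology diagram

  12.2.2 Protocol stack feature description

  12.2.3 Communication model

  12.2.4 Message definition

  12.2.5 Field types supported by the protocol

  12.2.6 Encoding and decoding specification

    1.Encoding

    2.Decoding

  12.2.7 Link establishment

  12.2.8 Link closure

  12.2.9 Reliability design

    1.Heartbeat mechanism

    2.Reconnection mechanism

    3.Duplicate login protection

    4.Message caching and retransmission

  12.2.10 Security design

  12.2.11 Extensibility design

12.3 Netty protocol stack development

  12.3.1 Data structure definitions

    1.NettyMessage class definition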

 

public final class NettyMessage {
    private Header header; // message header
    private Object body; // message body

    public final Header getHeader() {
        return header;
    }

    public final void setHeader(Header header) {
        this.header = header;
    }

    public final Object getBody() {
        return body;
    }

    public final void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "NettyMessage [header=" + header + "]";
    }
}

 

    2.Message header

 

import java.util.HashMap;
import java.util.Map;

public final class Header {
    private int crcCode = 0xabef0101;
    private int length; // message length
    private long sessionID; // session ID
    private byte type; // message type
    private byte priority; // message priority
    private Map<String, Object> attachment = new HashMap<String, Object>(); // attachment

    public final int getCrcCode() {
        return crcCode;
    }

    public final void setCrcCode(int crcCode) {
        this.crcCode = crcCode;
    }

    public final int getLength() {
        return length;
    }

    public final void setLength(int length) {
        this.length = length;
    }

    public final long getSessionID() {
        return sessionID;
    }

    public final void setSessionID(long sessionID) {
        this.sessionID = sessionID;
    }

    public final byte getType() {
        return type;
    }

    public final void setType(byte type) {
        this.type = type;
    }

    public final byte getPriority() {
        return priority;
    }

    public final void setPriority(byte priority) {
        this.priority = priority;
    }

    public final Map<String, Object> getAttachment() {
        return attachment;
    }

    public final void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    @Override
    public String toString() {
        return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionID=" + sessionID + ", type=" + type + ", priority=" + priority + ", attachment=" + attachment + "]";
    }
}

 

  12.3.2 Message encoding and decoding

    1.Encoder

 

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.io.IOException;
import java.util.Map;

import com.phei.netty.protocol.netty.struct.NettyMessage;

public final class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
        this.marshallingEncoder = new MarshallingEncoder();
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
        if (msg == null || msg.getHeader() == null) {
            throw new Exception("The encode message is null");
        }
        sendBuf.writeInt((msg.getHeader().getCrcCode()));
        sendBuf.writeInt((msg.getHeader().getLength()));
        sendBuf.writeLong((msg.getHeader().getSessionID()));
        sendBuf.writeByte((msg.getHeader().getType()));
        sendBuf.writeByte((msg.getHeader().getPriority()));
        sendBuf.writeInt((msg.getHeader().getAttachment().size()));
        String key;
        byte[] keyArray;
        Object value;
        for (Map.Entry<String, Object> param : msg.getHeader().getAttachment().entrySet()) {
            key = param.getKey();
            keyArray = key.getBytes("UTF-8");
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            value = param.getValue();
            marshallingEncoder.encode(value, sendBuf);
        }
        if (msg.getBody() != null) {
            marshallingEncoder.encode(msg.getBody(), sendBuf);
        } else
            sendBuf.writeInt(0);
        sendBuf.setInt(4, sendBuf.readableBytes() - 8);
    }
}
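
The encoder above writes a placeholder length and back-fills it at offset 4 once the whole message has been written. The following is a minimal sketch of that layout, under loud assumptions: `HeaderLayoutSketch` is a hypothetical class, it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, it carries no attachments, and it writes the body as raw length-prefixed bytes rather than a marshalled object.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone sketch of the header layout; not the book's encoder.
final class HeaderLayoutSketch {

    // crcCode(4) + length(4) + sessionID(8) + type(1) + priority(1) + attachment count(4)
    static final int FIXED_HEADER_BYTES = 22;

    static ByteBuffer encode(long sessionId, byte type, byte priority, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(FIXED_HEADER_BYTES + 4 + body.length);
        buf.putInt(0xabef0101);   // crcCode, same constant as in Header
        buf.putInt(0);            // length placeholder at offset 4
        buf.putLong(sessionId);
        buf.put(type);
        buf.put(priority);
        buf.putInt(0);            // attachment count: none in this sketch
        buf.putInt(body.length);  // 4-byte length prefix of the body
        buf.put(body);
        // Back-fill the length field: bytes written so far minus the 8 bytes
        // taken by crcCode and the length field itself.
        buf.putInt(4, buf.position() - 8);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode(1L, (byte) 3, (byte) 0, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("total bytes = " + frame.remaining());
        System.out.println("length field = " + frame.getInt(4));
    }
}
```

Subtracting 8 keeps the length field consistent with a decoder configured with a length-field offset of 4 and a length-field size of 4, which expects the field to count only the bytes that follow it.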

 

    2.Message encoding utility class

 

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;

import java.io.IOException;

import org.jboss.marshalling.Marshaller;

@Sharable
public class MarshallingEncoder {

    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    Marshaller marshaller;

    public MarshallingEncoder() throws IOException {
        marshaller = MarshallingCodecFactory.buildMarshalling();
    }

    protected void encode(Object msg, ByteBuf out) throws Exception {
        try {
            int lengthPos = out.writerIndex();
            out.writeBytes(LENGTH_PLACEHOLDER);
            ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
            marshaller.start(output);
            marshaller.writeObject(msg);
            marshaller.finish();
            out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
        } finally {
            marshaller.close();
        }
    }
}

    3.Decoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.phei.netty.protocol.netty.struct.Header;
import com.phei.netty.protocol.netty.struct.NettyMessage;

public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    MarshallingDecoder marshallingDecoder;

    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null; // half packet: wait for more data
        }

        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setCrcCode(frame.readInt());
        header.setLength(frame.readInt());
        header.setSessionID(frame.readLong());
        header.setType(frame.readByte());
        header.setPriority(frame.readByte());

        int size = frame.readInt();
        if (size > 0) {
            Map<String, Object> attch = new HashMap<>(size);
            int keySize = 0;
            byte[] keyArray;
            String key;
            for (int i = 0; i < size; i++) {
                keySize = frame.readInt();
                keyArray = new byte[keySize];
                frame.readBytes(keyArray);
                key = new String(keyArray, "UTF-8");
                attch.put(key, marshallingDecoder.decode(frame));
            }
            header.setAttachment(attch);
        }
        if (frame.readableBytes() > 4) {
            message.setBody(marshallingDecoder.decode(frame));
        }
        message.setHeader(header);
        return message;
    }
}
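
The decoder above relies on its `LengthFieldBasedFrameDecoder` superclass to return `null` for a half packet. The arithmetic behind that decision, for a length-field offset of 4 and a length-field size of 4, can be sketched roughly as follows. `FrameBoundarySketch` and its helper are hypothetical names, the code works on `java.nio.ByteBuffer`, and it is a simplification rather than Netty's implementation (for example, Netty throws `TooLongFrameException` and discards oversized frames).

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the frame-boundary check; not Netty's implementation.
final class FrameBoundarySketch {

    static final int LENGTH_FIELD_OFFSET = 4; // skip the 4-byte crcCode
    static final int LENGTH_FIELD_LENGTH = 4; // the length field is an int

    // Returns the size of the first complete frame in 'in', or -1 for a half packet.
    static int completeFrameSize(ByteBuffer in, int maxFrameLength) {
        int lengthFieldEnd = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;
        if (in.remaining() < lengthFieldEnd) {
            return -1; // the length field itself has not fully arrived
        }
        int length = in.getInt(in.position() + LENGTH_FIELD_OFFSET);
        if (length < 0) {
            throw new IllegalArgumentException("negative length field: " + length);
        }
        long frameSize = (long) lengthFieldEnd + length;
        if (frameSize > maxFrameLength) {
            throw new IllegalArgumentException("frame of " + frameSize + " bytes exceeds " + maxFrameLength);
        }
        return in.remaining() < frameSize ? -1 : (int) frameSize;
    }

    // Demo helper: crcCode, a declared length, then 'available' payload bytes.
    static ByteBuffer frame(int declaredLength, int available) {
        ByteBuffer buf = ByteBuffer.allocate(8 + available);
        buf.putInt(0xabef0101);
        buf.putInt(declaredLength);
        buf.put(new byte[available]);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(completeFrameSize(frame(3, 3), 1024 * 1024)); // complete frame
        System.out.println(completeFrameSize(frame(3, 2), 1024 * 1024)); // half packet
    }
}
```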

    4.Message decoding utility class

 

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;

public class MarshallingDecoder {

    private final Unmarshaller unmarshaller;

    public MarshallingDecoder() throws IOException {
        unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
    }

    protected Object decode(ByteBuf in) throws Exception {
        int objectSize = in.readInt();
        ByteBuf buf = in.slice(in.readerIndex(), objectSize);
        ByteInput input = new ChannelBufferByteInput(buf);
        try {
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            in.readerIndex(in.readerIndex() + objectSize);
            return obj;
        } finally {
            unmarshaller.close();
        }
    }
}

  12.3.3 Handshake and security authentication

    1.Message type enum

 

public enum MessageType {

    SERVICE_REQ((byte) 0), 
    SERVICE_RESP((byte) 1), 
    ONE_WAY((byte) 2), 
    LOGIN_REQ((byte) 3), 
    LOGIN_RESP((byte) 4), 
    HEARTBEAT_REQ((byte) 5), 
    HEARTBEAT_RESP((byte) 6);

    private byte value;

    private MessageType(byte value) {
        this.value = value;
    }

    public byte value() {
        return this.value;
    }
}

 

    2.Handshake authentication client

 

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import com.phei.netty.protocol.netty.MessageType;
import com.phei.netty.protocol.netty.struct.Header;
import com.phei.netty.protocol.netty.struct.NettyMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LoginAuthReqHandler extends ChannelHandlerAdapter {

    private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(buildLoginReq());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        NettyMessage message = (NettyMessage) msg;

        // For a handshake response message, check whether authentication succeeded
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte) 0) {
                // Handshake failed: close the connection
                ctx.close();
            } else {
                LOG.info("Login is ok : " + message);
                ctx.fireChannelRead(msg);
            }
        } else
            ctx.fireChannelRead(msg);
    }

    // Build the handshake request message
    private NettyMessage buildLoginReq() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_REQ.value());
        message.setHeader(header);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.fireExceptionCaught(cause);
    }
}

 

    3.Handshake authentication server

 

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.phei.netty.protocol.netty.MessageType;
import com.phei.netty.protocol.netty.struct.Header;
import com.phei.netty.protocol.netty.struct.NettyMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LoginAuthRespHandler extends ChannelHandlerAdapter {

    private final static Log LOG = LogFactory.getLog(LoginAuthRespHandler.class);

    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>(); // duplicate-login protection
    private String[] whitekList = {"127.0.0.1", "192.168.1.104"}; // IP authentication whitelist

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        NettyMessage message = (NettyMessage) msg;

        // Handle handshake request messages here; pass all other messages through
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_REQ.value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage loginResp;
            // Duplicate login: reject
            if (nodeCheck.containsKey(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
            } else {
                // Whitelist check
                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String WIP : whitekList) {
                    if (WIP.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1);
                if (isOK) {
                    nodeCheck.put(nodeIndex, true);
                }
            }
            LOG.info("The login response is : " + loginResp + " body [" + loginResp.getBody() + "]");
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildResponse(byte result) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString()); // remove the cached entry
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}
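
The server-side handshake above combines two checks: an IP whitelist and duplicate-login protection keyed by the remote address. A minimal sketch of that decision logic without any Netty types might look like this. `LoginGuardSketch`, its method names, and the port used in `main` are hypothetical, and the sketch swaps the handler's `containsKey` followed by `put` for a single atomic `putIfAbsent`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the whitelist and duplicate-login checks; not the book's handler.
final class LoginGuardSketch {

    static final byte OK = 0;        // same result codes as buildResponse((byte) 0)
    static final byte REJECTED = -1; // and buildResponse((byte) -1)

    private final Set<String> whiteList;
    private final Map<String, Boolean> loggedIn = new ConcurrentHashMap<>();

    LoginGuardSketch(String... allowedIps) {
        this.whiteList = new HashSet<>(Arrays.asList(allowedIps));
    }

    byte login(String ip, int port) {
        if (!whiteList.contains(ip)) {
            return REJECTED; // not on the whitelist
        }
        String nodeIndex = ip + ":" + port;
        // putIfAbsent returns null only for the first login from this address.
        return loggedIn.putIfAbsent(nodeIndex, Boolean.TRUE) == null ? OK : REJECTED;
    }

    // Call when the link closes or fails so the node can log in again.
    void logout(String ip, int port) {
        loggedIn.remove(ip + ":" + port);
    }

    public static void main(String[] args) {
        LoginGuardSketch guard = new LoginGuardSketch("127.0.0.1", "192.168.1.104");
        System.out.println(guard.login("127.0.0.1", 12088)); // first login
        System.out.println(guard.login("127.0.0.1", 12088)); // duplicate login
        System.out.println(guard.login("10.0.0.1", 12088));  // not whitelisted
    }
}
```

The explicit `logout` reflects a design point worth considering: a login record that is cleared only on exceptions would keep rejecting a node that disconnected cleanly and reconnects from the same address.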

  12.3.4 Heartbeat detection mechanism

    1.Client

 

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.phei.netty.protocol.netty.MessageType;
import com.phei.netty.protocol.netty.struct.Header;
import com.phei.netty.protocol.netty.struct.NettyMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class HeartBeatReqHandler extends ChannelHandlerAdapter {

    private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class);

    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        NettyMessage message = (NettyMessage) msg;
        // Handshake succeeded: start sending heartbeat messages
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
            heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
            LOG.info("Client receive server heart beat message : ---> " + message);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage heatBeat = buildHeatBeat();
            LOG.info("Client send heart beat message to server : ---> " + heatBeat);
            ctx.writeAndFlush(heatBeat);
        }

        private NettyMessage buildHeatBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}

 

    2.Server

 

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import com.phei.netty.protocol.netty.MessageType;
import com.phei.netty.protocol.netty.struct.Header;
import com.phei.netty.protocol.netty.struct.NettyMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class HeartBeatRespHandler extends ChannelHandlerAdapter {

    private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        NettyMessage message = (NettyMessage) msg;
        // Return the heartbeat response message
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
            LOG.info("Receive client heart beat message : ---> " + message);
            NettyMessage heartBeat = buildHeatBeat();
            LOG.info("Send heart beat response message to client : ---> " + heartBeat);
            ctx.writeAndFlush(heartBeat);
        } else
            ctx.fireChannelRead(msg);
    }

    private NettyMessage buildHeatBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }
}
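
The heartbeat handlers send a request every 5000 ms, while the client and server pipelines in this chapter install `ReadTimeoutHandler(50)`, whose argument is in seconds. The relationship between the two values can be sketched as below. `HeartbeatTimeoutSketch` is a hypothetical class that takes the current time as a parameter, so it illustrates the timeout arithmetic only and is not how Netty's handler is implemented.

```java
// Hypothetical sketch relating the heartbeat period to the read timeout.
final class HeartbeatTimeoutSketch {

    static final long HEARTBEAT_INTERVAL_MS = 5_000; // period passed to scheduleAtFixedRate
    static final long READ_TIMEOUT_MS = 50_000;      // ReadTimeoutHandler(50), in seconds

    private long lastReadMs;

    HeartbeatTimeoutSketch(long nowMs) {
        this.lastReadMs = nowMs;
    }

    // Call whenever any message, heartbeat or business, is read from the peer.
    void onMessageRead(long nowMs) {
        lastReadMs = nowMs;
    }

    // True once nothing has been read for the whole timeout window.
    boolean isTimedOut(long nowMs) {
        return nowMs - lastReadMs >= READ_TIMEOUT_MS;
    }

    // Number of heartbeat periods that fit into one timeout window.
    static long heartbeatsPerTimeoutWindow() {
        return READ_TIMEOUT_MS / HEARTBEAT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        HeartbeatTimeoutSketch link = new HeartbeatTimeoutSketch(0);
        link.onMessageRead(5_000);
        System.out.println(link.isTimedOut(54_999));
        System.out.println(link.isTimedOut(55_000));
        System.out.println(heartbeatsPerTimeoutWindow());
    }
}
```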

 

  12.3.5 Disconnection and reconnection

 

            // Once all resources have been released, clean up and initiate the reconnect
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(NettyConstant.PORT, NettyConstant.REMOTEIP); // initiate the reconnect
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

 

  12.3.6 Client

 

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.phei.netty.protocol.netty.NettyConstant;
import com.phei.netty.protocol.netty.codec.NettyMessageDecoder;
import com.phei.netty.protocol.netty.codec.NettyMessageEncoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class NettyClient {

    private static final Log LOG = LogFactory.getLog(NettyClient.class);

    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {
        // Configure the client NIO thread group
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Limit the frame length to guard against out-of-memory errors from oversized messages,
                            // or failed allocations caused by a malformed stream that misaligns decoding
                            ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                            ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder()); // automatic message encoding
                            ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); // read timeout
                            ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler()); // handshake request
                            ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler()); // heartbeat messages
                        }
                    });
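            // Initiate the asynchronous connect; the fixed local address supports the server's duplicate-login protection
            ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();
            // Wait here until the corresponding channel is closed.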
            // Returns the ChannelFuture which will be notified when this channel is closed. This method always returns the same future instance.
            future.channel().closeFuture().sync();
        } finally {
            // Once all resources have been released, clean up and initiate the reconnect
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(NettyConstant.PORT, NettyConstant.REMOTEIP); // initiate the reconnect
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
    }
}
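
The client above waits one second and then calls `connect` again from its `finally` block, so it retries indefinitely. A bounded variant of the same idea can be sketched as follows. `ReconnectSketch` and `connectWithRetry` are hypothetical names, the `Callable` stands in for the real connect call, and the attempt limit is an assumption added for illustration that the original client does not have.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded reconnect loop with a fixed delay.
final class ReconnectSketch {

    // Calls connectOnce until it returns true or maxAttempts is used up.
    // Returns the number of attempts made on success, or -1 on failure.
    static int connectWithRetry(Callable<Boolean> connectOnce, int maxAttempts, long delayMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (connectOnce.call()) {
                    return attempt;
                }
            } catch (Exception e) {
                // A thrown exception counts as a failed attempt.
            }
            if (attempt < maxAttempts) {
                try {
                    TimeUnit.MILLISECONDS.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connect that succeeds on the third call.
        int attempts = connectWithRetry(() -> ++calls[0] >= 3, 5, 1_000);
        System.out.println("connected after " + attempts + " attempts");
    }
}
```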

 

  12.3.7 Server

 

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.io.IOException;

import com.phei.netty.protocol.netty.NettyConstant;
import com.phei.netty.protocol.netty.codec.NettyMessageDecoder;
import com.phei.netty.protocol.netty.codec.NettyMessageEncoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class NettyServer {

    private static final Log LOG = LogFactory.getLog(NettyServer.class);

    public void bind() throws Exception {
        // Configure the server NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws IOException {
                        ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
                    }
                });

        // Bind the port and wait synchronously for success
        b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
        LOG.info("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
    }

    public static void main(String[] args) throws Exception {
        new NettyServer().bind();
    }
}

 

12.4 Running the protocol stack

  12.4.1 Normal scenario

  12.4.2 Server crash and restart

  12.4.3 Client crash and restart

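Chapter 13  Server Creation

13.1 Complexity of the native NIO class library

13.2 Netty server creation source code analysis

  13.2.1 Netty server creation sequence diagram

  13.2.2 Netty server creation source code analysis

  13.2.3 Client access source code analysis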

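Chapter 14  Client Creation

14.1 Netty client creation process analysis

  14.1.1 Sequence diagram

  14.1.2 Process analysis

14.2 Source code analysis

  14.2.1 Connection helper class Bootstrap

  14.2.2 Connect operation

  14.2.3 Result notification

  14.2.4 Connection timeout mechanism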

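Chapter 15  ByteBuf and Related Helper Classes

15.1 ByteBuf functional description

  15.1.1 How ByteBuf works

  15.1.2 ByteBuf features

    1.Sequential read operations

    2.Sequential write operations

    3.readerIndex and writerIndex

    4.Discardable bytes

    5.Readable bytes and Writable bytes

    6.Clear operation

    7.Mark and Reset

    8.Search operations

      indexOf, bytesBefore, forEachByte, forEachByteDesc

    9.Derived buffers

    10.Conversion to a standard ByteBuffer

    11.Random read and write

15.2 ByteBuf source code analysis

  15.2.1 Main ByteBuf inheritance hierarchy

  15.2.2 AbstractByteBuf source code analysis

    1.Main member variables

    2.Read operations

    3.Write operations

    4.Index operations

    5.Buffer reuse

    6.skipBytes

  15.2.3 AbstractReferenceCountedByteBuf source code analysis

    1.Main member variables

    2.Reference counter

  15.2.4 UnpooledHeapByteBuf source code analysis

    1.Member variables

    2.Dynamic buffer expansion

    3.Byte array copying

    4.Conversion to a JDK ByteBuffer

    5.Subclass-specific methods

  15.2.5 PooledByteBuf memory pool principles

    1.PoolArena

    2.PoolChunk

    3.PoolSubpage

    4.Memory reclamation strategy

  15.2.6 PooledDirectByteBuf source code analysis

    1.Creating a byte buffer instance

    2.Copying a new byte buffer instance

    3.Subclass-specific methods

15.3 ByteBuf helper classes

  15.3.1 ByteBufHolder

  15.3.2 ByteBufAllocator, the byte buffer allocator

  15.3.3 CompositeByteBuf

  15.3.4 ByteBufUtil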

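Chapter 16  Channel and Unsafe

16.1 Channel functional description

  16.1.1 How Channel works

  16.1.2 Channel features

    1.Network I/O operations

    2.Other commonly used APIs

16.2 Channel source code analysis

  16.2.1 Main Channel inheritance hierarchy

  16.2.2 AbstractChannel source code analysis

    1.Member variable definitions

    2.Core API source code analysis

  16.2.3 AbstractNioChannel source code analysis

    1.Member variable definitions

    2.Core API source code analysis

  16.2.4 AbstractNioByteChannel source code analysis

  16.2.5 AbstractNioMessageChannel source code analysis

  16.2.6 AbstractNioMessageServerChannel source code analysis

  16.2.7 NioServerSocketChannel source code analysis

  16.2.8 NioSocketChannel source code analysis

    1.Connect operation

    2.Writing half packets

    3.Read and write operations

16.3 Unsafe functional description

16.4 Unsafe source code analysis

  16.4.1 Unsafe inheritance hierarchy

  16.4.2 AbstractUnsafe source code analysis

    1.register: registers the Channel with the EventLoop's multiplexer

    2.bind: binds the specified port

    3.disconnect

    4.close

    5.write

    6.flush

  16.4.3 AbstractNioUnsafe source code analysis

    1.connect

    2.finishConnect

  16.4.4 NioByteUnsafe source code analysis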

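Chapter 17  ChannelPipeline and ChannelHandler

17.1 ChannelPipeline functional description

  17.1.1 ChannelPipeline event handling

  17.1.2 Custom interceptors

  17.1.3 Building a pipeline

  17.1.4 Main features of ChannelPipeline

17.2 ChannelPipeline source code analysis

  17.2.1 ChannelPipeline class inheritance hierarchy

  17.2.2 How ChannelPipeline manages ChannelHandlers

  17.2.3 ChannelPipeline inbound events

  17.2.4 ChannelPipeline outbound events

17.3 ChannelHandler functional description

  17.3.1 ChannelHandlerAdapter functional description

  17.3.2 ByteToMessageDecoder functional description

  17.3.3 MessageToMessageDecoder functional description

  17.3.4 LengthFieldBasedFrameDecoder functional description

  17.3.5 MessageToByteEncoder functional description

  17.3.6 MessageToMessageEncoder functional description

  17.3.7 LengthFieldPrepender functional description

17.4 ChannelHandler source code analysis

  17.4.1 ChannelHandler class inheritance hierarchy

  17.4.2 ByteToMessageDecoder source code analysis

  17.4.3 MessageToMessageDecoder source code analysis

  17.4.4 LengthFieldBasedFrameDecoder source code analysis

  17.4.5 MessageToByteEncoder source code analysis

  17.4.6 MessageToMessageEncoder source code analysis

  17.4.7 LengthFieldPrepender source code analysis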

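Chapter 18  EventLoop and EventLoopGroup

18.1 Netty's threading model

  18.1.1 Single-threaded Reactor model

  18.1.2 Multi-threaded Reactor model

  18.1.3 Master-slave multi-threaded Reactor model

  18.1.4 Netty's threading model

  18.1.5 Best practices

18.2 EventLoop source code analysis

  18.2.1 EventLoop design principles

  18.2.2 Inheritance hierarchy

  18.2.3 EventLoop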

Chapter 19  Future and Promise

19.1 What Future does

  1.ChannelFuture overview

19.2 ChannelFuture source code analysis

  1.AbstractFuture

19.3 Promise overview

19.4 Promise source code analysis

  19.4.1 Promise inheritance hierarchy

  19.4.2 DefaultPromise

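Chapter 20  Netty Architecture Analysis

20.1 Netty logical architecture

  20.1.1 Reactor communication scheduling layer

  20.1.2 Chain of responsibility: ChannelPipeline

  20.1.3 Business logic orchestration

20.2 Key architectural quality attributes

  20.2.1 High performance

  20.2.2 Reliability

    1.Link validity detection

    2.Memory protection mechanism

    3.Graceful shutdown

  20.2.3 Customizability

  20.2.4 Extensibility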

 

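Chapter 21  Java Multithreading in Netty

21.1 The Java memory model and multithreaded programming

  21.1.1 Hardware evolution and multitasking

  21.1.2 The Java memory model

    1.Working memory and main memory

    2.Java memory interaction protocol

    3.Java multithreading

21.2 Netty's concurrent programming practices

  21.2.1 Correctly synchronizing shared mutable data

  21.2.2 Using locks correctly

  21.2.3 Using volatile correctly

  21.2.4 CAS instructions and atomic classes

  21.2.5 Using thread-safe classes

  21.2.6 Using read-write locks

  21.2.7 Documenting thread safety

  21.2.8 Do not rely on thread priority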

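Chapter 22  The Way to High Performance

22.1 RPC call performance model analysis

  22.1.1 Why traditional RPC calls perform poorly

    1.Network transport issues

    2.Poor serialization performance

    3.Threading model issues

  22.1.2 I/O communication performance principles

22.2 How Netty achieves high performance

  22.2.1 Asynchronous non-blocking communication

  22.2.2 Efficient Reactor threading model

  22.2.3 Lock-free serial design

  22.2.4 Efficient concurrent programming

  22.2.5 High-performance serialization frameworks

  22.2.6 Zero copy

  22.2.7 Memory pool

  22.2.8 Flexible TCP parameter configuration

22.3 Performance comparison of mainstream NIO frameworks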

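Chapter 23  Reliability

23.1 Reliability requirements

  23.1.1 The cost of downtime

  23.1.2 Netty reliability requirements

23.2 Design for Netty's reliability requirements

  23.2.1 Network communication failures

    1.Client connection timeout

    2.Peer forcibly closes the connection

    3.Link closure

    4.Custom I/O failures

  23.2.2 Link validity detection

  23.2.3 Protecting the Reactor thread

    1.Handle exceptions with care

    2.Working around NIO bugs

  23.2.4 Memory protection

    1.Buffer memory leak protection

    2.Buffer overflow protection

  23.2.5 Traffic shaping

    1.Global traffic shaping

    2.Per-link traffic shaping

  23.2.6 Graceful shutdown interface

23.3 Optimization suggestions

  23.3.1 Capping the send queue capacity

  23.3.2 Pushing back messages that failed to send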

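Chapter 24  Security

24.1 A severe security landscape

  24.1.1 The OpenSSL Heartbleed vulnerability

  24.1.2 The cost of security vulnerabilities

  24.1.3 Security risks facing Netty

24.2 Netty SSL security features

  24.2.1 One-way SSL authentication

  24.2.2 Two-way SSL authentication

  24.2.3 Third-party CA authentication

24.3 Netty SSL source code analysis

  24.3.1 Client

  24.3.2 Server

  24.3.3 Reading messages

  24.3.4 Sending messages

24.4 Netty's extended security

  24.4.1 IP address blacklist mechanism

  24.4.2 Access authentication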

Chapter 25  The Future of Netty

25.1 Scope of application

25.2 Technical evolution

25.3 Community activity

25.4 Road Map
