Netty 框架學習 —— 傳輸


概述

流經網絡的數據老是具備相同的類型:字節,這些字節如何傳輸主要取決於咱們所說的網絡傳輸。用戶並不關心傳輸的細節,只在意字節是否被可靠地發送和接收java

若是使用 Java 網絡編程,你會發現,某些時候當你須要支持高併發鏈接,隨後你嘗試將阻塞傳輸切換爲非阻塞傳輸,那麼你會由於這兩種 API 的大相徑庭而遇到問題。Netty 提供了一個通用的 API,這使得轉換更加簡單。編程


傳統的傳輸方式

這裏介紹僅使用 JDK API 來實現應用程序的阻塞(OIO)和非阻塞版本(NIO)服務器

阻塞網絡編程以下:網絡

public class PlainOioServer {

    public void server(int port) throws IOException {
        // 將服務器綁定到指定端口
        final ServerSocket socket = new ServerSocket(port);
        try {
            while (true) {
                // 接收鏈接
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from " + clientSocket);
                // 建立一個新的線程來處理鏈接
                new Thread(() -> {
                    OutputStream out;
                    try {
                        out = clientSocket.getOutputStream();
                        // 將消息寫給已鏈接的客戶端
                        out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8));
                        out.flush();
                        // 關閉鏈接x
                        clientSocket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

這段代碼能夠處理中等數量的併發客戶端,但隨着併發鏈接的增多,你決定改用異步網絡編程,但異步的 API 是徹底不一樣的併發

非阻塞版本以下:異步

public class PlainNioServer {

    public void server(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ssocket = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        // 將服務器綁定到選定的端口
        ssocket.bind(address);
        // 打開 Selector 來處理 Channel
        Selector selector = Selector.open();
        // 將 ServerSocket 註冊到 Selector 以接受鏈接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi\r\n".getBytes());
        while (true) {
            try {
                // 等待須要處理的新事件,阻塞將一直持續到下一個傳入事件
                selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                break;
            }
            Set<SelectionKey> readKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    // 檢查事件是不是一個新的已經就緒能夠被接受的鏈接
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        // 接受客戶端,並將它註冊到選擇器
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                        System.out.println("Accepted connection from " + client);
                    }
                    // 檢查套接字是否已經準備好寫數據
                    if (key.isWritable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            // 將數據寫到已鏈接的客戶端
                            if (client.write(buffer) == 0) {
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException exception) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        cex.printStackTrace();
                    }
                }
            }
        }
    }
}

能夠看到,阻塞和非阻塞的代碼是大相徑庭的。若是爲了實現非阻塞而徹底重寫程序,無疑十分困難socket


基於 Netty 的傳輸

使用 Netty 的阻塞網絡處理以下:ide

public class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi\n\r", StandardCharsets.UTF_8));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    // 使用阻塞模式
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new SimpleChannelInboundHandler<>() {
                                        @Override
                                        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            ctx.writeAndFlush(buf.duplicate())
                                                    .addListener(ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

而非阻塞版本和阻塞版本幾乎如出一轍,只須要改動兩處地方高併發

EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioServerSocketChannel.class);

傳輸 API

傳輸 API 的核心是 interface Channel,它被用於全部的 IO 操做。每一個 Channel 都將被分配一個 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了該 Channel 的全部配置設置,ChannelPipeline 持有全部將應用於入站和出站數據以及事件的 ChannelHandler 實例oop

除了訪問所分配的 ChannelPipeline 和 ChannelConfig 以外,也能夠利用 Channel 的其餘方法

方法名 描述
eventLoop 返回分配給 Channel 的 EventLoop
pipeline 返回分配給 Channel 的 ChannelPipeline
isActive 若是 Channel 活動的,返回 true
localAddress 返回本地的 SocketAddress
remoteAddress 返回遠程的 SocketAddress
write 將數據寫到遠程節點
flush 將以前已寫的數據沖刷到底層傳輸
writeAndFlush 等同於調用 write() 並接着調用 flush()

內置的傳輸

Netty 內置了一些可開箱即用的傳輸,但它們所支持的協議不盡相同,所以你必須選擇一個和你的應用程序所使用協議相容的傳輸

名稱 描述
NIO io.netty.channel.socket.nio 使用 java.nio.channels 包做爲基礎
Epoll io.netty.channel.epoll 由 JNI 驅動的 epoll() 和非阻塞 IO,可支持只有在 Linux 上可用的多種特性,比 NIO 傳輸更快,且徹底非阻塞
OIO io.netty.channel.socket.oio 使用 java.net 包做爲基礎
Local io.netty.channel.local 能夠在 VM 內部經過管道進行通訊的本地傳輸
Embedded io.netty.channel.embedded Embedded 傳輸,容許使用 ChannelHandler 而不須要一個真正的基於網絡的傳輸,主要用於測試
相關文章
相關標籤/搜索