Spring Boot 搭建TCP Server

本示例首選介紹Java原生API實現BIO通訊,而後進階實現NIO通訊,最後利用Netty實現NIO通訊及Netty主要模塊組件介紹。java

Netty 是一個異步事件驅動的網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端。git

BIO(Blocking I/O) 方案

BIO通訊(一請求一應答)模型圖以下github

採用 BIO 通訊模型 的服務端,一般由一個獨立的 Acceptor 線程負責監聽客戶端的鏈接。咱們通常經過在while(true) 循環中服務端會調用 accept() 方法等待接收客戶端的鏈接的方式監聽請求,一旦接收到一個鏈接請求,就能夠在這個通訊套接字上進行讀寫操做,此時不能再接收其餘客戶端鏈接請求,只能等待當前鏈接的客戶端的操做執行完成, 若是要讓 BIO 通訊模型 可以同時處理多個客戶端請求,就必須使用多線程(主要緣由是socket.accept()、socket.read()、socket.write() 涉及的三個主要函數都是同步阻塞的)spring

代碼實現

BIO服務端

BIOServer.javaapache

package com.easy.javaBio;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

@Slf4j
public class BIOServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(10002);
        while (true) {
            Socket client = server.accept(); //等待客戶端的鏈接,若是沒有獲取鏈接  ,在此步一直等待
            new Thread(new ServerThread(client)).start(); //爲每一個客戶端鏈接開啓一個線程
        }
        //server.close();
    }
}

@Slf4j
class ServerThread extends Thread {

    private Socket client;

    public ServerThread(Socket client) {
        this.client = client;
    }

    @SneakyThrows
    @Override
    public void run() {
        log.info("客戶端:" + client.getInetAddress().getLocalHost() + "已鏈接到服務器");
        BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
        //讀取客戶端發送來的消息
        String mess = br.readLine();
        log.info("客戶端:" + mess);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
        bw.write(mess + "\n");
        bw.flush();
    }
}

BIO客戶端

BIOClient.java編程

package com.easy.javaBio;

import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.Socket;

@Slf4j
public class BIOClient {

    public static void main(String[] args) throws IOException {
        Socket s = new Socket("0.0.0.0", 10002);

        InputStream input = s.getInputStream();
        OutputStream output = s.getOutputStream();

        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(output));
        bw.write("客戶端給服務端發消息測試\n");  //向服務器端發送一條消息
        bw.flush();

        BufferedReader br = new BufferedReader(new InputStreamReader(input));  //讀取服務器返回的消息
        String mess = br.readLine();
        log.info("服務器:" + mess);
    }
}

運行示例

運行BIO服務端,而後再運行BIO客戶端,觀察控制檯bootstrap

BIOServer控制檯輸出:segmentfault

Connected to the target VM, address: '127.0.0.1:64346', transport: 'socket'
17:29:52.519 [Thread-1] INFO com.easy.javaBio.ServerThread - 客戶端:YHE6OR5UXQJ6D35/192.168.9.110已鏈接到服務器
17:29:52.523 [Thread-1] INFO com.easy.javaBio.ServerThread - 客戶端:客戶端給服務端發消息測試

BIOClient控制檯輸出:瀏覽器

Connected to the target VM, address: '127.0.0.1:64355', transport: 'socket'
17:29:52.527 [main] INFO com.easy.javaBio.BIOClient - 服務器:客戶端給服務端發消息測試
Disconnected from the target VM, address: '127.0.0.1:64355', transport: 'socket'

這表示咱們實現了一個最簡單的BIO通訊了緩存

這種方式爲每一個客戶端開啓一個線程,高併發時消耗資源較多,容易浪費,甚至致使服務端崩潰,對性能形成負面影響,高併發下不推薦使用。

NIO(New I/O)方案

NIO通訊模型圖以下

NIO是一種同步非阻塞的I/O模型,在Java 1.4 中引入了 NIO 框架,對應 java.nio 包,提供了 Channel , Selector,Buffer等抽象。

NIO中的N能夠理解爲Non-blocking,不單純是New。它支持面向緩衝的,基於通道的I/O操做方法。 NIO提供了與傳統BIO模型中的 Socket 和 ServerSocket 相對應的 SocketChannel 和 ServerSocketChannel 兩種不一樣的套接字通道實現,兩種通道都支持阻塞和非阻塞兩種模式。阻塞模式使用就像傳統中的支持同樣,比較簡單,可是性能和可靠性都很差;非阻塞模式正好與之相反。對於低負載、低併發的應用程序,可使用同步阻塞I/O來提高開發速率和更好的維護性;對於高負載、高併發的(網絡)應用,應使用 NIO 的非阻塞模式來開發。

NIO服務端

NIOServer.java

package com.easy.javaBio;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;

@Slf4j
public class NIOServer {
    private InetAddress addr;
    private int port;
    private Selector selector;

    private static int BUFF_SIZE = 1024;

    public NIOServer(InetAddress addr, int port) throws IOException {
        this.addr = addr;
        this.port = port;
        startServer();
    }

    private void startServer() throws IOException {
        // 得到selector及通道(socketChannel)
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // 綁定地址及端口
        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        log.info("NIOServer運行中...按下Ctrl-C中止服務");

        while (true) {
            log.info("服務器等待新的鏈接和selector選擇…");
            this.selector.select();

            // 選擇key工做
            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();

                // 防止出現重複的key,處理完需及時移除
                keys.remove();

                //無效直接跳過
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    this.accept(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                } else if (key.isConnectable()) {
                    this.connect(key);
                }
            }
        }
    }

    private void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.finishConnect()) {
            // 成功
            log.info("成功鏈接了");
        } else {
            // 失敗
            log.info("失敗鏈接");
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);

        Socket socket = channel.socket();
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();
        log.info("鏈接到: " + remoteAddr);
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(BUFF_SIZE);
        int numRead = channel.read(buffer);
        if (numRead == -1) {
            log.info("關閉客戶端鏈接: " + channel.socket().getRemoteSocketAddress());
            channel.close();
            return;
        }
        String msg = new String(buffer.array()).trim();
        log.info("獲得了: " + msg);

        // 回覆客戶端
        String reMsg = msg + " 你好,這是BIOServer給你的回覆消息:" + System.currentTimeMillis();
        channel.write(ByteBuffer.wrap(reMsg.getBytes()));
    }

    private void write(SelectionKey key) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFF_SIZE);
        byteBuffer.flip();
        SocketChannel clientChannel = (SocketChannel) key.channel();
        while (byteBuffer.hasRemaining()) {
            clientChannel.write(byteBuffer);
        }
        byteBuffer.compact();
    }

    public static void main(String[] args) throws IOException {
        new NIOServer(null, 10002);
    }
}

使用NIO, 能夠用Selector最終決定哪一組註冊的socket準備執行I/O

NIO客戶端

NIOClient.java

package com.easy.javaBio;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;

@Slf4j
public class NIOClient {
    private static int BUFF_SIZE = 1024;

    public static void main(String[] args) throws IOException, InterruptedException {

        InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", 10002);
        SocketChannel socketChannel = SocketChannel.open(socketAddress);

        log.info("鏈接 BIOServer 服務,端口:10002...");

        ArrayList<String> companyDetails = new ArrayList<>();

        // 建立消息列表
        companyDetails.add("騰訊");
        companyDetails.add("阿里巴巴");
        companyDetails.add("京東");
        companyDetails.add("百度");
        companyDetails.add("google");

        for (String companyName : companyDetails) {
            socketChannel.write(ByteBuffer.wrap(companyName.getBytes()));
            log.info("發送: " + companyName);

            ByteBuffer buffer = ByteBuffer.allocate(BUFF_SIZE);
            buffer.clear();
            socketChannel.read(buffer);
            String result = new String(buffer.array()).trim();
            log.info("收到NIOServer回覆的消息:" + result);

            // 等待2秒鐘再發送下一條消息
            Thread.sleep(2000);
        }

        socketChannel.close();
    }
}

運行示例

首先運行咱們的NIOServer,而後再運行NIOClient,觀察控制檯輸出

NIOServer控制檯輸出

17:35:40.921 [main] INFO com.easy.javaBio.NIOServer - NIOServer運行中...按下Ctrl-C中止服務
17:35:40.924 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…
17:36:29.188 [main] INFO com.easy.javaBio.NIOServer - 鏈接到: /192.168.9.110:64443
17:36:29.188 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…
17:36:29.194 [main] INFO com.easy.javaBio.NIOServer - 獲得了: 騰訊
17:36:29.194 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…
17:36:31.194 [main] INFO com.easy.javaBio.NIOServer - 獲得了: 阿里巴巴
17:36:31.195 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…
17:36:33.195 [main] INFO com.easy.javaBio.NIOServer - 獲得了: 京東
17:36:33.195 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…
17:36:35.196 [main] INFO com.easy.javaBio.NIOServer - 獲得了: 百度
17:36:35.197 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…
17:36:37.197 [main] INFO com.easy.javaBio.NIOServer - 獲得了: google
17:36:37.198 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…
17:36:39.198 [main] INFO com.easy.javaBio.NIOServer - 關閉客戶端鏈接: /192.168.9.110:64443
17:36:39.198 [main] INFO com.easy.javaBio.NIOServer - 服務器等待新的鏈接和selector選擇…

NIOClient控制檯輸出

17:36:29.189 [main] INFO com.easy.javaBio.NIOClient - 鏈接 BIOServer 服務,端口:10002...
17:36:29.194 [main] INFO com.easy.javaBio.NIOClient - 發送: 騰訊
17:36:29.194 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回覆的消息:騰訊 你好,這是BIOServer給你的回覆消息:1576229789194
17:36:31.194 [main] INFO com.easy.javaBio.NIOClient - 發送: 阿里巴巴
17:36:31.195 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回覆的消息:阿里巴巴 你好,這是BIOServer給你的回覆消息:1576229791194
17:36:33.195 [main] INFO com.easy.javaBio.NIOClient - 發送: 京東
17:36:33.196 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回覆的消息:京東 你好,這是BIOServer給你的回覆消息:1576229793195
17:36:35.196 [main] INFO com.easy.javaBio.NIOClient - 發送: 百度
17:36:35.197 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回覆的消息:百度 你好,這是BIOServer給你的回覆消息:1576229795197
17:36:37.197 [main] INFO com.easy.javaBio.NIOClient - 發送: google
17:36:37.198 [main] INFO com.easy.javaBio.NIOClient - 收到NIOServer回覆的消息:google 你好,這是BIOServer給你的回覆消息:1576229797198

NIO服務端每隔兩秒會收到客戶端的請求,並對客戶端的消息作出回覆。

直接使用Java NIO API構建應用程序是能夠的,但要作到正確和安全並不容易。特別是在高負載下,可靠和高效地處理和調度I/O操做是一項繁瑣並且容易出錯的任務。能夠選中Netty, Apache Mina等高性能網絡編程框架。

Netty 構建 NIO 通訊服務 方案

使用JDK原生網絡應用程序API,會存在的問題

  • NIO的類庫和API繁雜,使用麻煩,你須要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
  • 須要具有其它的額外技能作鋪墊,例如熟悉Java多線程編程,由於NIO編程涉及到Reactor模式,你必須對多線程和網路編程很是熟悉,才能編寫出高質量的NIO程序
  • 可靠性能力補齊,開發工做量和難度都很是大。例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處理等等,NIO編程的特色是功能開發相對容易,可是可靠性能力補齊工做量和難度都很是大

Netty對JDK自帶的NIO的API進行封裝,解決上述問題,主要特色有

  • 高併發

Netty是一款基於NIO(Nonblocking I/O,非阻塞IO)開發的網絡通訊框架,對比於BIO(Blocking I/O,阻塞IO),他的併發性能獲得了很大提升 。

  • 傳輸快

Netty的傳輸快其實也是依賴了NIO的一個特性——零拷貝。

  • 封裝好

Netty封裝了NIO操做的不少細節,提供易於使用的API。

Netty框架的優點

  • API使用簡單,開發門檻低;
  • 功能強大,預置了多種編解碼功能,支持多種主流協議;
  • 定製能力強,能夠經過ChannelHandler對通訊框架進行靈活地擴展;
  • 性能高,經過與其餘業界主流的NIO框架對比,Netty的綜合性能最優;
  • 成熟、穩定,Netty修復了已經發現的全部JDK NIO BUG,業務開發人員不須要再爲NIO的BUG而煩惱;
  • 社區活躍,版本迭代週期短,發現的BUG能夠被及時修復,同時,更多的新功能會加入;
  • 經歷了大規模的商業應用考驗,質量獲得驗證。在互聯網、大數據、網絡遊戲、企業應用、電信軟件等衆多行業獲得成功商用,證實了它已經徹底可以知足不一樣行業的商業應用了。

代碼實現

pom.xml依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.easy</groupId>
    <artifactId>netty</artifactId>
    <version>0.0.1</version>
    <name>netty</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.43.Final</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <modules>
        <module>java-tcp</module>
        <module>netty-server</module>
        <module>netty-client</module>
    </modules>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

搭建 Netty 服務端

NettyServer.java

package com.easy.nettyServer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

@Component
@Slf4j
public class NettyServer {
    /**
     * boss 線程組用於處理鏈接工做
     */
    private EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * work 線程組用於數據處理
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    @Value("${netty.port}")
    private Integer port;

    /**
     * 啓動Netty Server
     *
     * @throws InterruptedException
     */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // 指定Channel
                .channel(NioServerSocketChannel.class)
                //使用指定的端口設置套接字地址
                .localAddress(new InetSocketAddress(port))

                //服務端可鏈接隊列數,對應TCP/IP協議listen函數中backlog參數
                .option(ChannelOption.SO_BACKLOG, 1024)

                //設置TCP長鏈接,通常若是兩個小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //將小的數據包包裝成更大的幀進行傳送,提升網絡的負載
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(new ServerChannelInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("啓動 Netty Server");
        }
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        log.info("關閉Netty");
    }
}

NettyServerHandler.java

package com.easy.nettyServer;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 客戶端鏈接會觸發
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel active......");
    }

    /**
     * 客戶端發消息會觸發
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服務器收到消息: {}", msg.toString());
        ctx.write("我是服務端,我收到你的消息了!");
        ctx.flush();
    }

    /**
     * 發生異常觸發
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ServerChannelInitializer.java

package com.easy.nettyServer;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //添加編解碼
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}

建立 Netty 客戶端

NettyClient.java

package com.easy.nettyClient;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class NettyClient {

    private EventLoopGroup group = new NioEventLoopGroup();

    @Value("${netty.port}")
    private Integer port;

    @Value("${netty.host}")
    private String host;

    private SocketChannel socketChannel;

    /**
     * 發送消息
     */
    public void sendMsg(String msg) {
        socketChannel.writeAndFlush(msg);
    }

    @PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new NettyClientInitializer());
        ChannelFuture future = bootstrap.connect();
        //客戶端斷線重連邏輯
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("鏈接Netty服務端成功");
            } else {
                log.info("鏈接失敗,進行斷線重連");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}

NettyClientHandler.java

package com.easy.nettyClient;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客戶端Active .....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客戶端收到消息: {}", msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

NettyClientInitializer.java

package com.easy.nettyClient;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast("decoder", new StringDecoder());
        socketChannel.pipeline().addLast("encoder", new StringEncoder());
        socketChannel.pipeline().addLast(new NettyClientHandler());
    }
}

運行示例

打開瀏覽器,地址欄輸入:http://localhost:8091/send?msg=%E4%BD%A0%E5%A5%BD,觀察服務端和客戶端控制檯

服務端控制檯輸出

2019-12-13 18:01:37.901  INFO 11288 --- [           main] com.easy.nettyServer.NettyServer         : 啓動 Netty Server
2019-12-13 18:01:45.834  INFO 11288 --- [ntLoopGroup-3-1] com.easy.nettyServer.NettyServerHandler  : Channel active......
2019-12-13 18:02:07.858  INFO 11288 --- [ntLoopGroup-3-1] com.easy.nettyServer.NettyServerHandler  : 服務器收到消息: 你好

客戶端控制檯輸出

2019-12-13 18:01:45.822  INFO 11908 --- [ntLoopGroup-2-1] com.easy.nettyClient.NettyClient         : 鏈接Netty服務端成功
2019-12-13 18:01:45.822  INFO 11908 --- [ntLoopGroup-2-1] com.easy.nettyClient.NettyClientHandler  : 客戶端Active .....
2019-12-13 18:02:08.005  INFO 11908 --- [ntLoopGroup-2-1] com.easy.nettyClient.NettyClientHandler  : 客戶端收到消息: 我是服務端,我收到你的消息了!

表示使用Netty實現了咱們的NIO通訊了

Netty 模塊組件

Bootstrap、ServerBootstrap

一個Netty應用一般由一個Bootstrap開始,主要做用是配置整個Netty程序,串聯各個組件,Netty中Bootstrap類是客戶端程序的啓動引導類,ServerBootstrap是服務端啓動引導類。

Future、ChannelFuture

在Netty中全部的IO操做都是異步的,不能馬上得知消息是否被正確處理,可是能夠過一會等它執行完成或者直接註冊一個監聽,具體的實現就是經過Future和ChannelFuture,他們能夠註冊一個監聽,當操做執行成功或失敗時監聽會自動觸發註冊的監聽事件。

Channel

Netty網絡通訊組件,可以用於執行網絡I/O操做。Channel爲用戶提供:

  • 當前網絡鏈接的通道的狀態(例如是否打開?是否已鏈接?)
  • 網絡鏈接的配置參數 (例如接收緩衝區大小)
  • 提供異步的網絡I/O操做(如創建鏈接,讀寫,綁定端口),異步調用意味着任何I/O調用都將當即返回,而且不保證在調用結束時所請求的I/O操做已完成。調用當即返回一個ChannelFuture實例,經過註冊監聽器到ChannelFuture上,能夠I/O操做成功、失敗或取消時回調通知調用方。
  • 支持關聯I/O操做與對應的處理程序

不一樣協議、不一樣阻塞類型的鏈接都有不一樣的 Channel 類型與之對應,下面是一些經常使用的 Channel 類型

  • NioSocketChannel,異步的客戶端 TCP Socket 鏈接
  • NioServerSocketChannel,異步的服務器端 TCP Socket 鏈接
  • NioDatagramChannel,異步的 UDP 鏈接
  • NioSctpChannel,異步的客戶端 Sctp 鏈接
  • NioSctpServerChannel,異步的 Sctp 服務器端鏈接

Selector

Netty基於Selector對象實現I/O多路複用,經過 Selector, 一個線程能夠監聽多個鏈接的Channel事件, 當向一個Selector中註冊Channel 後,Selector 內部的機制就能夠自動不斷地查詢(select) 這些註冊的Channel是否有已就緒的I/O事件(例如可讀, 可寫, 網絡鏈接完成等),這樣程序就能夠很簡單地使用一個線程高效地管理多個 Channel

NioEventLoop

NioEventLoop中維護了一個線程和任務隊列,支持異步提交執行任務,線程啓動時會調用NioEventLoop的run方法,執行I/O任務和非I/O任務:

  • I/O任務 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法觸發。
  • 非IO任務 添加到taskQueue中的任務,如register0、bind0等任務,由runAllTasks方法觸發。

兩種任務的執行時間比由變量ioRatio控制,默認爲50,則表示容許非IO任務執行的時間與IO任務的執行時間相等。

NioEventLoopGroup

NioEventLoopGroup,主要管理eventLoop的生命週期,能夠理解爲一個線程池,內部維護了一組線程,每一個線程(NioEventLoop)負責處理多個Channel上的事件,而一個Channel只對應於一個線程。

ChannelHandler

ChannelHandler是一個接口,處理I/O事件或攔截I/O操做,並將其轉發到其ChannelPipeline(業務處理鏈)中的下一個處理程序。

ChannelHandlerContext

保存Channel相關的全部上下文信息,同時關聯一個ChannelHandler對象

ChannelPipline

保存ChannelHandler的List,用於處理或攔截Channel的入站事件和出站操做。 ChannelPipeline實現了一種高級形式的攔截過濾器模式,使用戶能夠徹底控制事件的處理方式,以及Channel中各個的ChannelHandler如何相互交互。

資料

Spring Boot、Cloud 學習項目
相關文章
相關標籤/搜索