《精通併發與Netty》學習筆記(08 - netty4+springboot項目案例)

本節經過案例介紹springboot與netty的集成java

第一步:新建Spring Initializr 項目spring

我這裏選擇Gradle項目,也可選擇Maven項目bootstrap

(注意:最好選擇本身下載gradle,以下圖)springboot

而後修改build.gradle文件,加入依賴(須要安裝Lombok插件)服務器

plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.spring.netty'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
targetCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile(
            "junit:junit:4.12",
            "io.netty:netty-all:4.1.36.Final",
            "org.springframework.boot:spring-boot-starter",
            "org.springframework.boot:spring-boot-starter-test",
            "org.projectlombok:lombok:1.18.8"
 ) }

 接下來編寫服務端程序app

package com.spring.netty.springbootnettydemo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * description:
 * @since 2019/05/21
 **/
@Component
public class NettyTcpServer {

    private static final Logger log = LoggerFactory.getLogger(NettyTcpServer.class);
    //boss事件輪詢線程組
    //處理Accept鏈接事件的線程,這裏線程數設置爲1便可,netty處理連接事件默認爲單線程,過分設置反而浪費cpu資源
    private EventLoopGroup boss = new NioEventLoopGroup(1);
    //worker事件輪詢線程組
    //處理hadnler的工做線程,其實也就是處理IO讀寫 。線程數據默認爲 CPU 核心數乘以2
    private EventLoopGroup worker = new NioEventLoopGroup();

    @Autowired
    ServerChannelInitializer serverChannelInitializer;

    @Value("${netty.tcp.client.port}")
    private Integer port;

    //與客戶端創建鏈接後獲得的通道對象
    private Channel channel;

    /**
     * 存儲client的channel
     * key:ip,value:Channel
     */
    public static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();

    /**
     * 開啓Netty tcp server服務
     *
     * @return
     */
    public ChannelFuture start() {
        //啓動類
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)//組配置,初始化ServerBootstrap的線程組
                .channel(NioServerSocketChannel.class)///構造channel通道工廠//bossGroup的通道,只是負責鏈接
                .childHandler(serverChannelInitializer)//設置通道處理者ChannelHandler////workerGroup的處理器
                .option(ChannelOption.SO_BACKLOG, 1024)//socket參數,當服務器請求處理程全滿時,用於臨時存放已完成三次握手請求的隊列的最大長度。若是未設置或所設置的值小於1,Java將使用默認值50。
                .childOption(ChannelOption.SO_KEEPALIVE, true);//啓用心跳保活機制,tcp,默認2小時發一次心跳
        //Future:異步任務的生命週期,可用來獲取任務結果
        ChannelFuture channelFuture1 = serverBootstrap.bind(port).syncUninterruptibly();//綁定端口,開啓監聽,同步等待
        if (channelFuture1 != null && channelFuture1.isSuccess()) {
            channel = channelFuture1.channel();//獲取通道
            log.info("Netty tcp server start success, port = {}", port);
        } else {
            log.error("Netty tcp server start fail");
        }
        return channelFuture1;
    }

    /**
     * 中止Netty tcp server服務
     */
    @PreDestroy
    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        try {
            Future<?> future = worker.shutdownGracefully().await();
            if (!future.isSuccess()) {
                log.error("netty tcp workerGroup shutdown fail, {}", future.cause());
            }
            Future<?> future1 = boss.shutdownGracefully().await();
            if (!future1.isSuccess()) {
                log.error("netty tcp bossGroup shutdown fail, {}", future1.cause());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Netty tcp server shutdown success");
    }

}
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * description: 通道初始化,主要用於設置各類Handler
 * @since 2019/05/21
 **/
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    ServerChannelHandler serverChannelHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //IdleStateHandler心跳機制,若是超時觸發Handle中userEventTrigger()方法
        pipeline.addLast("idleStateHandler",
                new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
        //字符串編解碼器
        pipeline.addLast(
                new StringDecoder(),
                new StringEncoder()
        );
        //自定義Handler
        pipeline.addLast("serverChannelHandler", serverChannelHandler);
    }
}
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * description:
 * @since 2019/05/21
 **/
@Component
@ChannelHandler.Sharable
@Slf4j
public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * 拿到傳過來的msg數據,開始處理
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Netty tcp server receive msg : " + msg);
        ctx.channel().writeAndFlush(" response msg ").syncUninterruptibly();
    }

    /**
     * 活躍的、有效的通道
     * 第一次鏈接成功後進入的方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("tcp client " + getRemoteAddress(ctx) + " connect success");
        //往channel map中添加channel信息
        NettyTcpServer.map.put(getIPString(ctx), ctx.channel());
    }

    /**
     * 不活動的通道
     * 鏈接丟失後執行的方法(client端可據此實現斷線重連)
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //刪除Channel Map中的失效Client
        NettyTcpServer.map.remove(getIPString(ctx));
        ctx.close();
    }

    /**
     * 異常處理
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        //發生異常,關閉鏈接
        log.error("引擎 {} 的通道發生異常,即將斷開鏈接", getRemoteAddress(ctx));
        ctx.close();//再次建議close
    }

    /**
     * 心跳機制,超時處理
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        String socketString = ctx.channel().remoteAddress().toString();
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("Client: " + socketString + " READER_IDLE 讀超時");
                ctx.disconnect();//斷開
            } else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("Client: " + socketString + " WRITER_IDLE 寫超時");
                ctx.disconnect();
            } else if (event.state() == IdleState.ALL_IDLE) {
                log.info("Client: " + socketString + " ALL_IDLE 總超時");
                ctx.disconnect();
            }
        }
    }

    /**
     * 獲取client對象:ip+port
     *
     * @param ctx
     * @return
     */
    public String getRemoteAddress(ChannelHandlerContext ctx) {
        String socketString = "";
        socketString = ctx.channel().remoteAddress().toString();
        return socketString;
    }

    /**
     * 獲取client的ip
     *
     * @param ctx
     * @return
     */
    public String getIPString(ChannelHandlerContext ctx) {
        String ipString = "";
        String socketString = ctx.channel().remoteAddress().toString();
        int colonAt = socketString.indexOf(":");
        ipString = socketString.substring(1, colonAt);
        return ipString;
    }

}

編寫客戶端代碼異步

package com.spring.netty.springbootnettydemo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * description:
 * @since 2019/05/21
 **/
@Component
public class NettyTcpClient {

    private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);

    @Value(("${netty.tcp.server.host}"))
    String HOST;
    @Value("${netty.tcp.server.port}")
    int PORT;

    @Autowired
    ClientChannelInitializer clientChannelInitializer;

    //與服務端創建鏈接後獲得的通道對象
    private Channel channel;

    /**
     * 初始化 `Bootstrap` 客戶端引導程序
     *
     * @return
     */
    private final Bootstrap getBootstrap() {
        Bootstrap b = new Bootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        b.group(group)
                .channel(NioSocketChannel.class)//通道鏈接者
                .handler(clientChannelInitializer)//通道處理者
                .option(ChannelOption.SO_KEEPALIVE, true);//心跳報活
        return b;
    }

    /**
     * 創建鏈接,獲取鏈接通道對象
     *
     * @return
     */
    public void connect() {
        ChannelFuture channelFuture = getBootstrap().connect(HOST, PORT).syncUninterruptibly();
        if (channelFuture != null && channelFuture.isSuccess()) {
            channel = channelFuture.channel();
            log.info("connect tcp server host = {}, port = {} success", HOST, PORT);
        } else {
            log.error("connect tcp server host = {}, port = {} fail", HOST, PORT);
        }
    }

    /**
     * 向服務器發送消息
     *
     * @param msg
     * @throws Exception
     */
    public void sendMsg(Object msg) throws Exception {
        if (channel != null) {
            channel.writeAndFlush(msg).sync();
        } else {
            log.warn("消息發送失敗,鏈接還沒有創建!");
        }
    }

}
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * description: 通道初始化,主要用於設置各類Handler
 * @since 2019/05/21
 **/
@Component
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    ClientChannelHandler clientChannelHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //IdleStateHandler心跳機制,若是超時觸發Handle中userEventTrigger()方法
        pipeline.addLast("idleStateHandler",
                new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
        //字符串編解碼器
        pipeline.addLast(
                new StringDecoder(),
                new StringEncoder()
        );
        //自定義Handler
        pipeline.addLast("clientChannelHandler", clientChannelHandler);
    }
}
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;

/**
 * description:
 * @since 2019/05/21
 **/
@Component
@ChannelHandler.Sharable
public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * 從服務器接收到的msg
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Netty tcp client receive msg : " + msg);
    }

}

最後修改springboot啓動程序socket

package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootNettyDemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootNettyDemoApplication.class, args);
    }

    @Autowired
    NettyTcpServer nettyTcpServer;
    @Autowired
    NettyTcpClient nettyTcpClient;

    @Override
    public void run(String... args) throws Exception {
        //啓動服務端
        ChannelFuture start = nettyTcpServer.start();

        //啓動客戶端,發送數據
        nettyTcpClient.connect();
        for (int i = 0; i < 10; i++) {
            nettyTcpClient.sendMsg("hello world" + i);
        }

        //服務端管道關閉的監聽器並同步阻塞,直到channel關閉,線程纔會往下執行,結束進程
        start.channel().closeFuture().syncUninterruptibly();
    }

}

運行測試效果以下:maven

本節咱們經過一個案例將springboot與netty結合實現了異步通訊,下節咱們將詳細介紹Netty的相關知識tcp

相關文章
相關標籤/搜索