搭建生產級的Netty項目

Netty是Trustin Lee在2004年開發的一款高性能的網絡應用程序框架。相比於JDK自帶的NIO,Netty作了至關多的加強,且隔離了jdk nio的實現細節,API也比較友好,還支持流量整形等高級特性。在咱們常見的一些開源項目中已經廣泛的應用到了Netty,好比Dubbo、Elasticsearch、Zookeeper等。java

Netty的具體開發
提示:因代碼相對較多,這裏只展現其主要部分,至於項目中用到的編解碼器、工具類,請直接拉到最後下載源碼!也歡迎順手給個Star~react

須要的依賴算法

<dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
</dependency>

<dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
</dependency>
<dependency>
        <groupId>io.dropwizard.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>4.1.1</version>
</dependency>
<dependency>
        <groupId>io.dropwizard.metrics</groupId>
        <artifactId>metrics-jmx</artifactId>
        <version>4.1.1</version>
</dependency>
<dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.29.Final</version>
</dependency>

Client端代碼apache

package com.example.nettydemo.client;

import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
import com.example.nettydemo.common.RequestMessage;
import com.example.nettydemo.common.string.StringOperation;
import com.example.nettydemo.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import javax.net.ssl.SSLException;
import java.util.concurrent.ExecutionException;

public class Client {

public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException {

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);

    //客戶端鏈接服務器最大容許時間,默認爲30s
    bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s

    NioEventLoopGroup group = new NioEventLoopGroup();
    try {

        bootstrap.group(group);

        RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);

        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                pipeline.addLast(new FrameDecoder());
                pipeline.addLast(new FrameEncoder());

                pipeline.addLast(new ProtocolEncoder());
                pipeline.addLast(new ProtocolDecoder());

                pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
                pipeline.addLast(new OperationToRequestMessageEncoder());

//                    pipeline.addLast(loggingHandler);

            }
        });

        //鏈接服務
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);
        //由於future是異步執行,因此須要先鏈接上後,再進行下一步操做
        channelFuture.sync();

        long streamId = IdUtil.nextId();
        /**
         * 發送數據測試,按照定義的規則組裝數據
         */
//            OrderOperation orderOperation =  new OrderOperation(1001, "你好啊,hi");
                        RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi"));

        //將future放入center
        OperationResultFuture operationResultFuture = new OperationResultFuture();
        requestPendingCenter.add(streamId, operationResultFuture);

        //發送消息
        for (int i = 0; i < 10; i++) {
            channelFuture.channel().writeAndFlush(requestMessage);
        }

        //阻塞等待結果,結果來了以後會調用ResponseDispatcherHandler去set結果

// OperationResult operationResult = operationResultFuture.get();
// //將結果打印
// System.out.println("返回:"+operationResult);bootstrap

channelFuture.channel().closeFuture().get();

    } finally {
        group.shutdownGracefully();
    }

}

}
Server端代碼api

package com.example.nettydemo.server;

import com.example.nettydemo.server.codec.FrameDecoder;
import com.example.nettydemo.server.codec.FrameEncoder;
import com.example.nettydemo.server.codec.ProtocolDecoder;
import com.example.nettydemo.server.codec.ProtocolEncoder;
import com.example.nettydemo.server.handler.MetricsHandler;
import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
import com.example.nettydemo.server.handler.ServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import lombok.extern.slf4j.Slf4j;

import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException;

/**
 * netty server 入口
 */
@Slf4j
public class Server {

public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException {

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //設置channel模式,由於是server因此使用NioServerSocketChannel
    serverBootstrap.channel(NioServerSocketChannel.class);

    //最大的等待鏈接數量
    serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
    //設置是否啓用 Nagle 算法:用將小的碎片數據鏈接成更大的報文 來提升發送效率。
    //若是須要發送一些較小的報文,則須要禁用該算法
    serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);

    //設置netty自帶的log,並設置級別
    serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));

    //thread
    //用戶指定線程名
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
    NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
    UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));

    //只能使用一個線程,因GlobalTrafficShapingHandler比較輕量級
    NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS"));

    try {
        //設置react方式
        serverBootstrap.group(bossGroup, workGroup);

        //metrics
        MetricsHandler metricsHandler = new MetricsHandler();

        //trafficShaping流量整形
        //long writeLimit 寫入時控制, long readLimit 讀取時控制 具體設置看業務修改
        GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024);

        //log
        LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
        LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO);

        //設置childHandler,按執行順序放
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {

                ChannelPipeline pipeline = ch.pipeline();

                pipeline.addLast("debugLog", debugLogHandler);
                pipeline.addLast("tsHandler", globalTrafficShapingHandler);
                pipeline.addLast("metricHandler", metricsHandler);
                pipeline.addLast("idleHandler", new ServerIdleCheckHandler());

                pipeline.addLast("frameDecoder", new FrameDecoder());
                pipeline.addLast("frameEncoder", new FrameEncoder());
                pipeline.addLast("protocolDecoder", new ProtocolDecoder());
                pipeline.addLast("protocolEncoder", new ProtocolEncoder());

                pipeline.addLast("infoLog", infoLogHandler);
                //對flush加強,減小flush次數犧牲延遲加強吞吐量
                pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
                //爲業務處理指定單獨的線程池
                pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
            }
        });

        //綁定端口並阻塞啓動
        ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();

        channelFuture.channel().closeFuture().sync();

    } finally {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
        businessGroup.shutdownGracefully();
        eventLoopGroupForTrafficShaping.shutdownGracefully();
    }

}

}
最後
以上介紹了Netty的基本用法,在代碼中也作了一部分的關鍵註釋,但可能還會有許多不足,也不可能知足全部人的要求,你們可根據本身的實際需求去改造此項目服務器

相關文章
相關標籤/搜索