宜人貸蜂巢API網關技術解密之Netty使用實踐

1、背景

宜人貸蜂巢團隊,由Michael創立於2013年,經過使用互聯網科技手段助力金融生態和諧健康發展。自成立起一直致力於多維度數據閉環平臺建設。目前團隊規模超過百人,涵蓋徵信、電商、金融、社交、五險一金和保險等用戶授信數據的抓取解析業務,輔以先進的數據分析、挖掘和機器學習等技術對用戶信用級別、欺詐風險進行預測評定,全面對外輸出金融反欺詐、社交圖譜、自動化模型定製等服務或產品。html

目前宜人貸蜂巢基於用戶受權數據實時抓取解析技術,並結合頂尖大數據技術,快速迭代和自主的創新,已造成了強大而領先的聚合和輸出能力。java

爲了適應完成宜人貸蜂巢強大的服務輸出能力,蜂巢設計開發了本身的API網關係統,集中實現了鑑權、加解密、路由、限流等功能,使各業務抓取團隊關注其核心抓取和分析工做,而API網關係統更專一於安全、流量、路由等問題,從而更好的保障蜂巢服務系統的質量。今天帶着你們解密API網關的Netty線程池技術實踐細節。segmentfault

API網關做爲宜人貸蜂巢數據開放平臺的統一入口,全部的客戶端及消費端經過統一的API來使用各種抓取服務。從面向對象設計的角度看,它與外觀模式相似,包裝各種不一樣的實現細節,對外表現出統一的調用形式。後端

本文首先簡要地介紹API網關的項目框架,其次對比BIO和NIO的特色,再引入Netty做爲項目的基礎框架,而後介紹Netty線程池的原理,最後深刻Netty線程池的初始化、ServerBootstrap的初始化與啓動及channel與線程池的綁定過程,讓讀者瞭解Netty在承載高併發訪問的設計路思。安全

2、項目框架

圖1 - API網關項目框架

圖1 - API網關項目框架服務器

圖中描繪了API網關係統的處理流程,以及與服務註冊發現、日誌分析、報警系統、各種爬蟲的關係。其中API網關係統接收請求,對請求進行編解碼、鑑權、限流、加解密,再基於Eureka服務註冊發現模塊,將請求發送到有效的服務節點上;網關及抓取系統的日誌,會被收集到elk平臺中,作業務分析及報警處理。併發

3、BIO vs NIO

API網關承載數倍於爬蟲的流量,提高服務器的併發處理能力、縮短系統的響應時間,通訊模型的選擇是相當重要的,是選擇BIO,仍是NIO?框架

Streamvs Buffer & 阻塞 vs 非阻塞機器學習

BIO是面向流的,io的讀寫,每次只能處理一個或者多個bytes,若是數據沒有讀寫完成,線程將一直等待於此,而不能暫時跳過io或者等待io讀寫完成異步通知,線程滯留在io讀寫上,不能充分利用機器有限的線程資源,形成server的吞吐量較低,見圖2。而NIO與此不一樣,面向Buffer,線程不須要滯留在io讀寫上,採用操做系統的epoll模式,在io數據準備好了,才由線程來處理,見圖3。異步

圖

圖2 – BIO 從流中讀取數據

圖

圖3 – NIO 從Buffer中讀取數據

Selectors

NIO的selector使一個線程能夠監控多個channel的讀寫,多個channel註冊到一個selector上,這個selector能夠監測到各個channel的數據準備狀況,從而使用有限的線程資源處理更多的鏈接,見圖4。因此能夠這樣說,NIO極大的提高了服務器接受併發請求的能力,而服務器性能仍是要取決於業務處理時間和業務線程池模型。

圖

圖4 – NIO 單一線程管理多個鏈接

而BIO採用的是request-per-thread模式,用一個線程負責接收TCP鏈接請求,並創建鏈路,而後將請求dispatch給負責業務邏輯處理的線程,見圖5。一旦訪問量過多,就會形成機器的線程資源緊張,形成請求延遲,甚至服務宕機。

圖

圖5 – BIO 一鏈接一線程

對比JDK NIO與諸多NIO框架後,鑑於Netty優雅的設計、易用的API、優越的性能、安全性支持、API網關使用Netty做爲通訊模型,實現了基礎框架的搭建。

4、Netty線程池

考慮到API網關的高併發訪問需求,線程池設計,見圖6。

圖

圖6 – API網關線程池設計

Netty的線程池理念有點像ForkJoinPool,不是一個線程大池子併發等待一條任務隊列,而是每條線程都有一個任務隊列。並且Netty的線程,並不僅是簡單的阻塞地拉取任務,而是在每一個循環中作三件事情:

  • 先SelectKeys()處理NIO的事件

  • 而後獲取本線程的定時任務,放到本線程的任務隊列裏

  • 最後執行其餘線程提交給本線程的任務

每一個循環裏處理NIO事件與其餘任務的時間消耗比例,還能經過ioRatio變量來控制,默認是各佔50%。可見,Netty的線程根本沒有阻塞等待任務的悠閒日子,因此也不使用有鎖的BlockingQueue來作任務隊列了,而是使用無鎖的MpscLinkedQueue(Mpsc 是Multiple Producer, Single Consumer的縮寫)

5、NioEventLoopGroup初始化

下面分析下Netty線程池NioEventLoopGroup的設計與實現細節,NioEventLoopGroup的類層次關係見圖7

圖

圖7 –NioEvenrLoopGroup類層次關係

其建立過程——方法調用,見下圖

圖

圖8 –NioEvenrLoopGroup建立調用關係

圖

NioEvenrLoopGroup的建立,具體執行過程是執行類MultithreadEventExecutorGroup的構造方法

/**

 * Create a new instance.

 *

 * @param nThreads          the number of threads that will be used by this instance.

 * @param executor          the Executor to use, or {@code null} if the default should be used.

 * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.

 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call

 */

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

                                        EventExecutorChooserFactory chooserFactory, Object... args) {

    if (nThreads <= 0) {

        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));

    }

    if (executor == null) {

        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {

        boolean success = false;

        try {

            children[i] = newChild(executor, args);

            success = true;

        } catch (Exception e) { 

            throw new IllegalStateException("failed to create a child event loop", e);

        } finally {

            if (!success) {

                for (int j = 0; j < i; j ++) {

                    children[j].shutdownGracefully();

                }

                for (int j = 0; j < i; j ++) {

                    EventExecutor e = children[j];

                    try {

                        while (!e.isTerminated()) {

                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

                        }

                    } catch (InterruptedException interrupted) {

                        // Let the caller handle the interruption.

                        Thread.currentThread().interrupt();

                        break;

                    }

                }

            }

        }

    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {

        @Override

        public void operationComplete(Future<Object> future) throws Exception {

            if (terminatedChildren.incrementAndGet() == children.length) {

                terminationFuture.setSuccess(null);

            }

        }

    };

    for (EventExecutor e: children) {

        e.terminationFuture().addListener(terminationListener);

    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);

    Collections.addAll(childrenSet, children);

    readonlyChildren = Collections.unmodifiableSet(childrenSet);

}

其中,建立細節以下:

  • 線程池中的線程數nThreads必須大於0;

  • 若是executor爲null,建立默認executor,executor用於建立線程(newChild方法使用executor對象);

  • 依次建立線程池中的每個線程即NioEventLoop,若是其中有一個建立失敗,將關閉以前建立的全部線程;

  • chooser爲線程池選擇器,用來選擇下一個EventExecutor,能夠理解爲,用來選擇一個線程來執行task;

chooser的建立細節,以下:

DefaultEventExecutorChooserFactory根據線程數建立具體的EventExecutorChooser,線程數若是等於2^n,可以使用按位與替代取模運算,節省cpu的計算資源,見源碼:

@SuppressWarnings("unchecked")

@Override

public EventExecutorChooser newChooser(EventExecutor[] executors) {

    if (isPowerOfTwo(executors.length)) {

        return new PowerOfTowEventExecutorChooser(executors);

    } else {

        return new GenericEventExecutorChooser(executors);

    }

} 

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {

        private final AtomicInteger idx = new AtomicInteger();

        private final EventExecutor[] executors;



        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {

            this.executors = executors;

        }



        @Override

        public EventExecutor next() {

            return executors[idx.getAndIncrement() & executors.length - 1];

        }

    }



    private static final class GenericEventExecutorChooser implements EventExecutorChooser {

        private final AtomicInteger idx = new AtomicInteger();

        private final EventExecutor[] executors;



        GenericEventExecutorChooser(EventExecutor[] executors) {

            this.executors = executors;

        }



        @Override

        public EventExecutor next() {

            return executors[Math.abs(idx.getAndIncrement() % executors.length)];

        }

    }

newChild(executor, args)的建立細節,以下

MultithreadEventExecutorGroup的newChild方法是一個抽象方法,故使用NioEventLoopGroup的newChild方法,即調用NioEventLoop的構造函數。

圖

@Override

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {

        return new NioEventLoop(this, executor, (SelectorProvider) args[0],

            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);

    }

在這裏先看下NioEventLoop的類層次關係

圖

NioEventLoop的繼承關係比較複雜,在AbstractScheduledEventExecutor 中, Netty 實現了 NioEventLoop 的 schedule 功能, 即咱們能夠經過調用一個 NioEventLoop 實例的 schedule 方法來運行一些定時任務. 而在 SingleThreadEventLoop 中, 又實現了任務隊列的功能, 經過它, 咱們能夠調用一個NioEventLoop 實例的 execute 方法來向任務隊列中添加一個 task, 並由 NioEventLoop 進行調度執行.

一般來講, NioEventLoop 肩負着兩種任務, 第一個是做爲 IO 線程, 執行與 Channel 相關的 IO 操做, 包括調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等; 而第二個任務是做爲任務隊列, 執行 taskQueue 中的任務, 例如用戶調用 eventLoop.schedule 提交的定時任務也是這個線程執行的.

具體的構造過程,以下

圖

圖

建立任務隊列tailTasks(內部爲有界的LinkedBlockingQueue)

圖

建立線程的任務隊列taskQueue(內部爲有界的LinkedBlockingQueue),以及任務過多防止系統宕機的拒絕策略rejectedHandler

其中tailTasks和taskQueue均是任務隊列,而優先級不一樣,taskQueue的優先級高於tailTasks,定時任務的優先級高於taskQueue。

6、ServerBootstrap初始化及啓動

瞭解了Netty線程池NioEvenrLoopGroup的建立過程後,下面看下API網關服務ServerBootstrap的是如何使用線程池引入服務中,爲高併發訪問服務的。

API網關ServerBootstrap初始化及啓動代碼,以下:

serverBootstrap = new ServerBootstrap();

bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());

workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());



serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

        .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())

        .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())

        .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())

        // Memory pooled

        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

        .childHandler(channelInitializer);

 

ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();

log.info("API-gateway started on port: {}", config.getPort());

future.channel().closeFuture().sync();

API網關係統使用netty自帶的線程池,共有三組線程池,分別爲bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暫不做介紹)。其中,bossGroup用於接收客戶端的TCP鏈接,workerGroup用於處理I/O、執行系統task和定時任務,executorGroup用於處理網關業務加解密、限流、路由,及將請求轉發給後端的抓取服務等業務操做。

7、Channel與線程池的綁定

ServerBootstrap初始化後,經過調用bind(port)方法啓動Server,bind的調用鏈以下:

AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister

其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化過程指定channel爲NioServerSocketChannel.class,至此將NioServerSocketChannel與bossGroup綁定到一塊兒,bossGroup負責客戶端鏈接的創建。那麼NioSocketChannel是如何與workerGroup綁定到一塊兒的?

調用鏈AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {

    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {

        try {

            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {

                logger.warn("Unknown channel option: " + e);

            }

        } catch (Throwable t) {

            logger.warn("Failed to set a channel option: " + child, t);

        }

    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {

        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

    }

    try {

        childGroup.register(child).addListener(new ChannelFutureListener() {

            @Override

            public void operationComplete(ChannelFuture future) throws Exception {

                if (!future.isSuccess()) {

                    forceClose(child, future.cause());

                }

            }

        });

    } catch (Throwable t) {

        forceClose(child, t);

    }

}

其中,childGroup.register(child)就是將NioSocketChannel與workderGroup綁定到一塊兒,那又是什麼觸發了ServerBootstrapAcceptor的channelRead方法?

其實當一個 client 鏈接到 server 時, Java 底層的 NIO ServerSocketChannel 會有一個SelectionKey.OP_ACCEPT 就緒事件, 接着就會調用到 NioServerSocketChannel.doReadMessages方法

@Override

protected int doReadMessages(List<Object> buf) throws Exception {

    SocketChannel ch = javaChannel().accept();

    try {

        if (ch != null) {

            buf.add(new NioSocketChannel(this, ch));

            return 1;

        }

    } catch (Throwable t) {

        …

    }

    return 0;

}

javaChannel().accept() 會獲取到客戶端新鏈接的SocketChannel,實例化爲一個 NioSocketChannel, 而且傳入 NioServerSocketChannel 對象(即 this), 由此可知, 咱們建立的這個NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 .

接下來就經由 Netty 的 ChannelPipeline 機制, 將讀取事件逐級發送到各個 handler 中, 因而就會觸發前面咱們提到的 ServerBootstrapAcceptor.channelRead 方法啦。

至此,分析了Netty線程池的初始化、ServerBootstrap的啓動及channel與線程池的綁定過程,可以看出Netty中線程池的優雅設計,使用不一樣的線程池負責鏈接的創建、IO讀寫等,爲API網關項目的高併發訪問提供了技術基礎。

8、總結

至此,對API網關技術的Netty實踐分享就到這裏,各位若是對中間的各個環節有什麼疑問和建議,歡迎你們指正,咱們一塊兒討論,共同窗習提升。

參考

http://tutorials.jenkov.com/java-nio/nio-vs-io.html

http://netty.io/wiki/user-guide-for-4.x.html

http://netty.io/

http://www.tuicool.com/articles/mUFnqeM

http://www.javashuo.com/article/p-zbirqpbl-ga.html

http://www.javashuo.com/article/p-ngtqdeei-ho.html

做者:蜂巢團隊

來源:宜信技術學院

相關文章
相關標籤/搜索