netty實戰-netty client鏈接池設計

概述

最近有不少網友在諮詢netty client中,netty的channel鏈接池應該如何設計。這是個稍微有些複雜的主題,牽扯到蠻多技術點,要想在網上找到相關的又相對完整的參考文章,確實不太容易。java

在本篇文章中,會給出其中一種解決方案,而且附帶完整的可運行的代碼。若是網友有更好的方案,能夠回覆本文,咱們一塊兒討論討論,一塊兒開闊思路和眼界。bootstrap

閱讀本文以前須要具有一些基礎知識api

一、知道netty的一些基礎知識,好比ByteBuf類的相關api;
二、知道netty的執行流程;
三、 必須閱讀過我以前寫的netty實戰-自定義解碼器處理半包消息,由於本文部分代碼來自這篇文章。數組

如今微服務很是的熱門,也有不少公司在用。微服務框架中,若是是使用thrift、grpc來做爲數據序列化框架的話,一般都會生成一個SDK給客戶端用戶使用。客戶端只要使用這個SDK,就能夠方便的調用服務端的微服務接口。本文討論的就是使用SDK的netty客戶端,它的netty channel鏈接池的設計方案。至於netty http client的channel鏈接池設計,基於http的,是另一個主題了,須要另外寫文章來討論的。緩存


netty channel鏈接池設計

DB鏈接池中,當某個線程獲取到一個db connection後,在讀取數據或者寫數據時,若是線程沒有操做完,這個db connection一直被該線程獨佔着,直到線程執行完任務。若是netty client的channel鏈接池設計也是使用這種獨佔的方式的話,有幾個問題。服務器

一、netty中channel的writeAndFlush方法,調用完後是不用等待返回結果的,writeAndFlush一被調用,立刻返回。對於這種狀況,是徹底不必讓線程獨佔一個channel的。
二、使用相似DB pool的方式,從池子裏拿鏈接,用完後返回,這裏的一進一出,須要考慮併發鎖的問題。另外,若是請求量很大的時候,鏈接會不夠用,其餘線程也只能等待其餘線程釋放鏈接。併發

所以不太建議使用上面的方式來設計netty channel鏈接池,channel獨佔的代價太大了。可使用Channel數組的形式, 複用netty的channel。當線程要須要Channel的時候,隨機從數組選中一個Channel,若是Channel還未創建,則建立一個。若是線程選中的Channel已經創建了,則複用這個Channel。框架

這裏寫圖片描述
這裏寫圖片描述

假設channel數組的長度爲4dom

private Channel[] channels = new Channel[4];複製代碼

當外部系統請求client的時候,client從channels數組中隨機挑選一個channel,若是該channel還沒有創建,則觸發創建channel的邏輯。不管有多少請求,都是複用這4個channel。假設有10個線程,那麼部分線程可能會使用相同的channel來發送數據和接收數據。由於是隨機選擇一個channel的,多個線程命中同一個channel的機率仍是很大的。以下圖異步

這裏寫圖片描述
這裏寫圖片描述

10個線程中,可能有3個線程都是使用channel2來發送數據的。這個會引入另一個問題。thread1經過channel2發送一條消息msg1到服務端,thread2也經過channel2發送一條消息msg2到服務端,當服務端處理完數據,經過channel2返回數據給客戶端的時候,如何區分哪條消息是哪一個線程的呢?若是不作區分,萬一thread1拿到的結果實際上是thread2要的結果,怎麼辦?

那麼如何作到讓thread1和thread2拿到它們本身想要的結果呢?

以前我在netty實戰-自定義解碼器處理半包消息一文中提到,自定義消息的時候,一般會在消息中加入一個序列號,用來惟一標識消息的。當thread1發送消息時,往消息中插入一個惟一的消息序列號,同時爲thread1創建一個callback回調程序,當服務端返回消息的時候,根據消息中的序列號從對應的callback程序獲取結果。這樣就能夠解決上面說到的問題。

消息格式

這裏寫圖片描述
這裏寫圖片描述


消息、消息seq以及callback對應關係

這裏寫圖片描述
這裏寫圖片描述

這裏寫圖片描述
這裏寫圖片描述

OK,下面就基於上面的設計來進行編碼。


代碼

先來實現netty客戶端,設置10個線程併發獲取channel,爲了達到真正的併發,利用CountDownLatch來作開關,同時channel鏈接池設置4個channel。

package nettyinaction.nettyclient.channelpool.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import nettyinaction.nettyclient.channelpool.ChannelUtils;
import nettyinaction.nettyclient.channelpool.IntegerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        //當全部線程都準備後,開閘,讓全部線程併發的去獲取netty的channel
        final CountDownLatch countDownLatchBegin = new CountDownLatch(1);

        //當全部線程都執行完任務後,釋放主線程,讓主線程繼續執行下去
        final CountDownLatch countDownLatchEnd = new CountDownLatch(10);

        //netty channel池
        final NettyChannelPool nettyChannelPool = new NettyChannelPool();

        final Map<String, String> resultsMap = new HashMap<>();
        //使用10個線程,併發的去獲取netty channel
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //先讓線程block住
                        countDownLatchBegin.await();

                        Channel channel = null;
                        try {
                            channel = nettyChannelPool.syncGetChannel();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        //爲每一個線程創建一個callback,當消息返回的時候,在callback中獲取結果
                        CallbackService callbackService = new CallbackService();
                        //給消息分配一個惟一的消息序列號
                        int seq = IntegerFactory.getInstance().incrementAndGet();
                        //利用Channel的attr方法,創建消息與callback的對應關係
                        ChannelUtils.putCallback2DataMap(channel,seq,callbackService);

                        synchronized (callbackService) {
                            UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
                            ByteBuf buffer = allocator.buffer(20);
                            buffer.writeInt(ChannelUtils.MESSAGE_LENGTH);

                            buffer.writeInt(seq);
                            String threadName = Thread.currentThread().getName();
                            buffer.writeBytes(threadName.getBytes());
                            buffer.writeBytes("body".getBytes());

                            //給netty 服務端發送消息,異步的,該方法會馬上返回
                            channel.writeAndFlush(buffer);

                            //等待返回結果
                            callbackService.wait();

                            //解析結果,這個result在callback中已經解析到了。
                            ByteBuf result = callbackService.result;
                            int length = result.readInt();
                            int seqFromServer = result.readInt();

                            byte[] head = new byte[8];
                            result.readBytes(head);
                            String headString = new String(head);

                            byte[] body = new byte[4];
                            result.readBytes(body);
                            String bodyString = new String(body);
                            resultsMap.put(threadName, seqFromServer + headString + bodyString);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    finally {
                        countDownLatchEnd.countDown();
                    }
                }
            }).start();
        }

        //開閘,讓10個線程併發獲取netty channel
        countDownLatchBegin.countDown();

        //等10個線程執行完後,打印最終結果
        countDownLatchEnd.await();
        System.out.println("resultMap="+resultsMap);
    }

    public static class CallbackService{
        public volatile ByteBuf result;
        public void receiveMessage(ByteBuf receiveBuf) throws Exception {
            synchronized (this) {
                result = receiveBuf;
                this.notify();
            }
        }
    }
}複製代碼

其中IntegerFactory類用於生成消息的惟一序列號

package nettyinaction.nettyclient.channelpool;


import java.util.concurrent.atomic.AtomicInteger;

public class IntegerFactory {
    private static class SingletonHolder {
        private static final AtomicInteger INSTANCE = new AtomicInteger();
    }

    private IntegerFactory(){}

    public static final AtomicInteger getInstance() {
        return SingletonHolder.INSTANCE;
    }
}複製代碼

而ChannelUtils類則用於創建channel、消息序列號和callback程序的對應關係。

package nettyinaction.nettyclient.channelpool;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

import java.util.Map;

public class ChannelUtils {
    public static final int MESSAGE_LENGTH = 16;
    public static final AttributeKey<Map<Integer, Object>> DATA_MAP_ATTRIBUTEKEY = AttributeKey.valueOf("dataMap");
    public static <T> void putCallback2DataMap(Channel channel, int seq, T callback) {
        channel.attr(DATA_MAP_ATTRIBUTEKEY).get().put(seq, callback);
    }

    public static <T> T removeCallback(Channel channel, int seq) {
        return (T) channel.attr(DATA_MAP_ATTRIBUTEKEY).get().remove(seq);
    }
}複製代碼

NettyChannelPool則負責建立netty的channel。

package nettyinaction.nettyclient.channelpool.client;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.Attribute;
import nettyinaction.nettyclient.channelpool.ChannelUtils;
import nettyinaction.nettyclient.channelpool.SelfDefineEncodeHandler;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class NettyChannelPool {
    private Channel[] channels;
    private Object [] locks;
    private static final int MAX_CHANNEL_COUNT = 4;

    public NettyChannelPool() {
        this.channels = new Channel[MAX_CHANNEL_COUNT];
        this.locks = new Object[MAX_CHANNEL_COUNT];
        for (int i = 0; i < MAX_CHANNEL_COUNT; i++) {
            this.locks[i] = new Object();
        }
    }

    /** * 同步獲取netty channel */
    public Channel syncGetChannel() throws InterruptedException {
        //產生一個隨機數,隨機的從數組中獲取channel
        int index = new Random().nextInt(MAX_CHANNEL_COUNT);
        Channel channel = channels[index];
        //若是能獲取到,直接返回
        if (channel != null && channel.isActive()) {
            return channel;
        }

        synchronized (locks[index]) {
            channel = channels[index];
            //這裏必須再次作判斷,當鎖被釋放後,以前等待的線程已經能夠直接拿到結果了。
            if (channel != null && channel.isActive()) {
                return channel;
            }

            //開始跟服務端交互,獲取channel
            channel = connectToServer();

            channels[index] = channel;
        }

        return channel;
    }

    private Channel connectToServer() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                 .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline pipeline = ch.pipeline();
                         pipeline.addLast(new SelfDefineEncodeHandler());
                         pipeline.addLast(new SocketClientHandler());
                     }
                 });

        ChannelFuture channelFuture = bootstrap.connect("localhost", 8899);
        Channel channel = channelFuture.sync().channel();

        //爲剛剛建立的channel,初始化channel屬性
        Attribute<Map<Integer,Object>> attribute = channel.attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
        ConcurrentHashMap<Integer, Object> dataMap = new ConcurrentHashMap<>();
        attribute.set(dataMap);
        return channel;
    }
}複製代碼

先使用構造方法,初始化channels數組,長度爲4。NettyChannelPool類有兩個關鍵的地方。
第一個是獲取channel的時候必須加上鎖。另一個是當獲取到channel後,利用channel的屬性,建立一個Map,後面須要利用這個Map創建消息序列號和callback程序的對應關係。

//初始化channel屬性
        Attribute<Map<Integer,Object>> attribute = channel.attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
        ConcurrentHashMap<Integer, Object> dataMap = new ConcurrentHashMap<>();
        attribute.set(dataMap);複製代碼

這個map就是咱們上面看到的

這裏寫圖片描述
這裏寫圖片描述

Map的put的動做,就是在SocketClient類中的

ChannelUtils.putCallback2DataMap(channel,seq,callbackService);複製代碼

執行的。客戶端處理消息還須要兩個hanlder輔助,一個是處理半包問題,一個是接收服務端的返回的消息。

SelfDefineEncodeHandler類用於處理半包消息

package nettyinaction.nettyclient.channelpool;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
        if (bufferIn.readableBytes() < 4) {
            return;
        }

        int beginIndex = bufferIn.readerIndex();
        int length = bufferIn.readInt();

        if (bufferIn.readableBytes() < length) {
            bufferIn.readerIndex(beginIndex);
            return;
        }

        bufferIn.readerIndex(beginIndex + 4 + length);

        ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);

        otherByteBufRef.retain();

        out.add(otherByteBufRef);
    }
}複製代碼

SocketClientHandler類用於接收服務端返回的消息,而且根據消息序列號獲取對應的callback程序

package nettyinaction.nettyclient.channelpool.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import nettyinaction.nettyclient.channelpool.ChannelUtils;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();

        ByteBuf responseBuf = (ByteBuf)msg;
        responseBuf.markReaderIndex();

        int length = responseBuf.readInt();
        int seq = responseBuf.readInt();

        responseBuf.resetReaderIndex();

        //獲取消息對應的callback
        SocketClient.CallbackService callbackService = ChannelUtils.<SocketClient.CallbackService>removeCallback(channel, seq);
        callbackService.receiveMessage(responseBuf);
    }
}複製代碼

到此客戶端程序編寫完畢。至於服務端的代碼,則比較簡單,這裏直接貼上代碼。

package nettyinaction.nettyclient.channelpool.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import nettyinaction.nettyclient.channelpool.SelfDefineEncodeHandler;

public class SocketServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                           .channel(NioServerSocketChannel.class)
                           .handler(new LoggingHandler(LogLevel.INFO))
                           .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    pipeline.addLast(new SelfDefineEncodeHandler());
                                    pipeline.addLast(new BusinessServerHandler());
                                }
                           });

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

package nettyinaction.nettyclient.channelpool.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import nettyinaction.nettyclient.channelpool.ChannelUtils;

public class BusinessServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        ByteBuf buf = (ByteBuf)msg;
        //一、讀取消息長度
        int length = buf.readInt();

        //二、讀取消息序列號
        int seq = buf.readInt();

        //三、讀取消息頭部
        byte[] head = new byte[8];
        buf.readBytes(head);
        String headString = new String(head);

        //四、讀取消息體
        byte[] body = new byte[4];
        buf.readBytes(body);
        String bodyString = new String(body);

        //五、新創建一個緩存區,寫入內容,返回給客戶端
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf responseBuf = allocator.buffer(20);
        responseBuf.writeInt(ChannelUtils.MESSAGE_LENGTH);
        responseBuf.writeInt(seq);
        responseBuf.writeBytes(headString.getBytes());
        responseBuf.writeBytes(bodyString.getBytes());

        //六、將數據寫回到客戶端
        channel.writeAndFlush(responseBuf);
    }
}複製代碼

運行服務端代碼和客戶端代碼,指望的結果是

10個線程發送消息後,能從服務端獲取到正確的對應的返回信息,這些信息不會發生錯亂,各個線程都能拿到本身想要的結果,不會發生錯讀的狀況。

運行後的結果以下


Thread-3=9 Thread-3body,
Thread-4=8 Thread-4body,
Thread-5=5Thread-5body,
Thread-6=1Thread-6body,
Thread-7=3Thread-7body,
Thread-8=10Thread-8body,
Thread-9=4Thread-9body,
Thread-0=7Thread-0body,
Thread-1=6Thread-1body,
Thread-2=2Thread-2body


經過觀察結果,能夠知道10個線程併發獲取channel後,部分線程共享一個channel,可是10個線程能仍然能正確獲取到結果。


代碼細節解析

一、等待服務端的返回

因爲 channel.writeAndFlush是異步的,必須有一種機制來讓線程等待服務端返回結果。這裏採用最原始的wait和notify方法。當writeAndFlush調用後,馬上讓當前線程wait住,放置在callbackservice對象的等待列表中,當服務器端返回消息時,客戶端的SocketClientHandler類中的channelRead方法會被執行,解析完數據後,從channel的attr屬性中獲取DATA_MAP_ATTRIBUTEKEY 這個key對應的map。並根據解析出來的seq從map中獲取事先放置好的callbackservice對象,執行它的receiveMessage方法。將receiveBuf這個存放結果的緩存區對象賦值到callbackservice的result屬性中。並調用callbackservice對象的notify方法,喚醒wait在callbackservice對象的線程,讓其繼續往下執行。


二、產生消息序列號

int seq = IntegerFactory.getInstance().incrementAndGet();複製代碼

爲了演示的方便,這裏是產生單服務器全局惟一的序列號。若是請求量大的話,就算是AtomicInteger是CAS操做,也會產生不少的競爭。建議產生channel級別的惟一序列號,下降競爭。只要保證在一個channel內的消息的序列號是不重複的便可。

至於其餘的一些代碼細節,讀者能夠本身再細看。


原文連接

netty實戰-netty client鏈接池設計

相關文章
相關標籤/搜索