Netty服務器鏈接池管理設計思路

應用場景:java

在RPC框架中,使用Netty做爲高性能的網絡通訊框架時,每一次服務調用,都須要與Netty服務端創建鏈接的話,很容易致使Netty服務器資源耗盡。因此,想到鏈接池技術,將與同一個Netty服務器地址創建的鏈接放入池中維護,同一個地址的鏈接確保只創建一次。這樣,凡是鏈接同一個Netty服務器的客戶端,拿到的都是同一個鏈接,不須要新建鏈接,就能夠大大減小鏈接的個數,從而大幅度提高服務器性能。安全

 

用一張圖說明一下設計思路:服務器

解釋一下,途中主要定義了兩個類,ConnectClient是負責管理鏈接的,是一個抽象類,init和send是抽象方法,而NettyClient是負責與Netty服務器通訊的,它繼承了ConnectClient,並實現了它的幾個抽象方法,init方法中會與Netty服務器創建鏈接,send方法會向Netty服務器發送消息。網絡

值得注意的是,ConnectClient類中有一個非抽象方法,就是asyncSend(),它裏面調用了本身的send()抽象方法。抽象方法不能直接調用,必須拿到NettyClient這個具體實現類的實例對象才能調。這個asyncSend()方法有一個關鍵的入參,即Class<? extends ConnectClient> netImpl,它是一個Class類型,是從外部傳進來的,因此這就比較有靈活性了,好處就是這個ConnectClient類不須要依賴任何具體的實現,只要傳進一個本身的子類的Class便可,它就能夠用這個Class經過反射的方式建立出具體的實現類的實例對象,而後調用其send方法。能夠理解成ConnectClient用asyncSend方法包裝了NettyClient的send方法,目的是讓外部不要直接調用NettyClient中的send方法,而是調用本身的asyncSend方法,而後在這個asyncSend方法中,會先獲取鏈接,再調用NettyClient中的send方法發送消息。多線程

 

模擬代碼:併發

下面就經過幾段代碼來模擬Tcp客戶端和Tcp服務器端創建鏈接併發送消息的場景。
這裏並無真的使用Netty框架,由於本文不是講怎麼使用Netty框架,而是分享如何管理鏈接。

 

首先,模擬一個Tcp服務端程序(就當作是Netty的服務器):框架

 1 /**
 2  * 模擬TCP服務端
 3  *
 4  * @author syj
 5  */
 6 public class NetChannel {
 7 
 8     /**
 9      * 創建鏈接
10      *
11      * @param host
12      * @param port
13      */
14     public void connect(String host, int port) {
15         System.out.println("模擬鏈接TCP服務器成功: host=" + host + ",port=" + port);
16     }
17 
18     /**
19      * 發送消息
20      *
21      * @param msg
22      */
23     public void send(String msg) {
24         System.out.println("模擬向TCP服務器發送消息成功:" + msg);
25     }
26 }

 

定義一個Tcp客戶端,負責與Netty服務器通訊:dom

 1 /**
 2  * 模擬TCP客戶端
 3  *
 4  * @author syj
 5  */
 6 public class NetClient extends ConnectClient {
 7 
 8     // 模擬TCP服務器
 9     private NetChannel channel;
10 
11     /**
12      * 創建鏈接
13      *
14      * @param address 格式 host:port, 例如 192.168.1.103:9999
15      * @throws Exception
16      */
17     @Override
18     public void init(String address) throws Exception {
19         if (address == null || address.trim().length() == 0) {
20             throw new RuntimeException(">>>> address error");
21         }
22         String[] split = address.split(":");
23         if (split.length != 2) {
24             throw new RuntimeException(">>>> address error");
25         }
26         String host = split[0];
27         int port = Integer.valueOf(split[1]);
28         channel = new NetChannel();
29         channel.connect(host, port);
30     }
31 
32     /**
33      * 發送消息
34      *
35      * @param msg
36      * @throws Exception
37      */
38     @Override
39     public void send(String msg) throws Exception {
40         channel.send(msg);
41     }
42 }

 

鏈接管理類:async

該類使用一個ConcurrentHashMap做爲鏈接池,來保存與TCP服務器創建的鏈接,key是TCP服務器的地址,value是鏈接對象。ide

因爲是多線程環境,爲保證線程安全問題,使用synchronized加鎖,避免一個鏈接被建立屢次。

因爲可能會有不少針對同一個TCP服務器的鏈接請求,使用lockClientMap來管理鎖,同一個TCP服務器的請求使用同一把鎖,保證同一個TCP服務器的鏈接只建立一次。

這樣既保證了線程安全,又能下降性能消耗。

 1 import java.util.concurrent.ConcurrentHashMap;
 2 
 3 /**
 4  * TCP鏈接管理
 5  *
 6  * @author syj
 7  */
 8 public abstract class ConnectClient {
 9 
10     /**
11      * 創建鏈接
12      *
13      * @param address
14      * @throws Exception
15      */
16     public abstract void init(String address) throws Exception;
17 
18     /**
19      * 發送消息
20      *
21      * @param msg
22      * @throws Exception
23      */
24     public abstract void send(String msg) throws Exception;
25 
26     /**
27      * 發送消息
28      *
29      * @param address
30      * @param msg
31      * @param netImpl
32      * @throws Exception
33      */
34     public static void asyncSend(String address, String msg, Class<? extends ConnectClient> netImpl) throws Exception {
35         ConnectClient connect = ConnectClient.getConnect(address, netImpl);
36         connect.send(msg);
37     }
38 
39     // 鏈接池
40     private static volatile ConcurrentHashMap<String, ConnectClient> connectClientMap;
41     //
42     private static volatile ConcurrentHashMap<String, Object> lockClientMap = new ConcurrentHashMap<>();
43 
44     /**
45      * 獲取鏈接
46      * 確保同一個TCP服務器地址對應的鏈接只創建一次
47      *
48      * @param netImpl
49      * @return
50      * @throws Exception
51      */
52     public static ConnectClient getConnect(String address, Class<? extends ConnectClient> netImpl) throws Exception {
53         // 建立鏈接池
54         if (connectClientMap == null) {
55             synchronized (ConnectClient.class) {
56                 if (connectClientMap == null) {
57                     connectClientMap = new ConcurrentHashMap<>();
58                 }
59             }
60         }
61 
62         // 獲取鏈接
63         ConnectClient connectClient = connectClientMap.get(address);
64         if (connectClient != null) {
65             return connectClient;
66         }
67 
68         // 獲取鎖,同一個地址使用同一把鎖
69         Object lock = lockClientMap.get(address);
70         if (lock == null) {
71             lockClientMap.putIfAbsent(address, new Object());
72             lock = lockClientMap.get(address);
73         }
74         synchronized (lock) {
75             connectClient = connectClientMap.get(address);
76             if (connectClient != null) {
77                 return connectClient;
78             }
79 
80             // 新建鏈接
81             ConnectClient client = netImpl.newInstance();
82             client.init(address);
83             // 放入鏈接池
84             connectClientMap.put(address, client);
85         }
86         connectClient = connectClientMap.get(address);
87         return connectClient;
88     }
89 }

 

任務類用於併發鏈接測試:

 1 import java.util.UUID;
 2 
 3 /**
 4  * 任務
 5  *
 6  * @author syj
 7  */
 8 public class Task implements Runnable {
 9 
10     private Class<? extends ConnectClient> netType;// 客戶端類型
11     private String address;
12     private long count;
13 
14     public Task(String address, long count, Class<? extends ConnectClient> netType) {
15         this.address = address;
16         this.count = count;
17         this.netType = netType;
18     }
19 
20     @Override
21     public void run() {
22         try {
23             String uuid = UUID.randomUUID().toString().replace("-", "");
24             String msg = String.format("%s \t %s \t %s \t %s", Thread.currentThread().getName(), count, address, uuid);
25             ConnectClient.asyncSend(address, msg, netType);
26             // 模擬業務耗時
27             Thread.sleep((long) (Math.random() * 1000));
28         } catch (Exception e) {
29             e.printStackTrace();
30         }
31     }
32 }

 

測試類(模擬了10個TCP服務器的地址和端口):

經過一個死循環來模擬測試高併發場景下,鏈接的線程安全和性能表現。

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 /**
 5  * 模擬TCP客戶端併發獲取鏈接發送消息
 6  *
 7  * @author syj
 8  */
 9 public class App {
10 
11     // TCP服務器通訊地址和端口
12     public static final String[] NET_ADDRESS_ARR = {
13             "192.168.1.101:9999",
14             "192.168.1.102:9999",
15             "192.168.1.103:9999",
16             "192.168.1.104:9999",
17             "192.168.1.105:9999",
18             "192.168.1.106:9999",
19             "192.168.1.107:9999",
20             "192.168.1.108:9999",
21             "192.168.1.109:9999",
22             "192.168.1.110:9999"
23     };
24 
25     public static ExecutorService executorService = Executors.newCachedThreadPool();
26     public static volatile long count;// 統計任務執行總數
27 
28     public static void main(String[] args) {
29         while (true) {
30             try {
31                 Thread.sleep(5);// 防止 CPU 100%
32             } catch (InterruptedException e) {
33                 e.printStackTrace();
34             }
35             executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class));
36             executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class));
37         }
38     }
39 }

 

測試結果:

 

 1 模擬鏈接TCP服務器成功: host=192.168.1.107,port=9999
 2 模擬向TCP服務器發送消息成功:pool-1-thread-14      14      192.168.1.107:9999      3f31022b959e4962b00b0719fa206416
 3 模擬鏈接TCP服務器成功: host=192.168.1.108,port=9999
 4 模擬向TCP服務器發送消息成功:pool-1-thread-37      37      192.168.1.108:9999      2e4e4c6db63145f190f76d1dbe59f1c4
 5 模擬鏈接TCP服務器成功: host=192.168.1.106,port=9999
 6 模擬向TCP服務器發送消息成功:pool-1-thread-49      49      192.168.1.106:9999      e50ea4937c1c4425b647e4606ced7a1f
 7 模擬鏈接TCP服務器成功: host=192.168.1.103,port=9999
 8 模擬向TCP服務器發送消息成功:pool-1-thread-17      17      192.168.1.103:9999      21cfcd3665aa4688aea0ac90b68e5a22
 9 模擬鏈接TCP服務器成功: host=192.168.1.102,port=9999
10 模擬向TCP服務器發送消息成功:pool-1-thread-25      25      192.168.1.102:9999      bbdbde3e28ab4ac0901c1447ac3ddd3f
11 模擬鏈接TCP服務器成功: host=192.168.1.101,port=9999
12 模擬向TCP服務器發送消息成功:pool-1-thread-10      10      192.168.1.101:9999      08cc445cc06a44f5823a8487d05e3e30
13 模擬鏈接TCP服務器成功: host=192.168.1.105,port=9999
14 模擬向TCP服務器發送消息成功:pool-1-thread-45      45      192.168.1.105:9999      3e925cf96b874ba09c59e63613e60662
15 模擬鏈接TCP服務器成功: host=192.168.1.104,port=9999
16 模擬向TCP服務器發送消息成功:pool-1-thread-53      53      192.168.1.104:9999      2408dab5c0ca480b8c2593311f3ec7d5
17 模擬向TCP服務器發送消息成功:pool-1-thread-13      13      192.168.1.105:9999      5a3c0f86046f4cb99986d0281e567e31
18 模擬向TCP服務器發送消息成功:pool-1-thread-36      36      192.168.1.107:9999      b85d9d79461d4345a2da8f8dd00a572a
19 模擬向TCP服務器發送消息成功:pool-1-thread-9      9      192.168.1.102:9999      c2895f68a33745d7a4370034b6474461
20 模擬向TCP服務器發送消息成功:pool-1-thread-41      41      192.168.1.102:9999      a303193a58204e7fadaf64cec8eaa86d
21 模擬向TCP服務器發送消息成功:pool-1-thread-59      59      192.168.1.101:9999      08785c0acfc14c618cf3762d35055e9b
22 模擬向TCP服務器發送消息成功:pool-1-thread-54      54      192.168.1.107:9999      6fa8e3939a904271b03b78204d4a146a
23 模擬向TCP服務器發送消息成功:pool-1-thread-15      15      192.168.1.102:9999      229989d1405b49cdb31052b081a33869
24 模擬向TCP服務器發送消息成功:pool-1-thread-7      7      192.168.1.107:9999      8e3c8d1007a34a01b166101fae30449c
25 模擬鏈接TCP服務器成功: host=192.168.1.109,port=9999
26 模擬向TCP服務器發送消息成功:pool-1-thread-8      8      192.168.1.109:9999      ca63dd93685641d19c875e4809e9a8dc
27 模擬向TCP服務器發送消息成功:pool-1-thread-1      1      192.168.1.106:9999      cd9f473797de46ef8361f3b8b0a6d575
28 模擬向TCP服務器發送消息成功:pool-1-thread-27      27      192.168.1.102:9999      872d825fd64e409d8b992e12e0372daa
29 模擬向TCP服務器發送消息成功:pool-1-thread-3      3      192.168.1.103:9999      baace7f8f06242f68cac0c43337e49cf
30 模擬向TCP服務器發送消息成功:pool-1-thread-39      39      192.168.1.108:9999      bc0d70348f574cbba449496b3142e518
31 模擬向TCP服務器發送消息成功:pool-1-thread-55      55      192.168.1.106:9999      95ba7c57a1d84c18a6ab328eb01e85f1
32 模擬向TCP服務器發送消息成功:pool-1-thread-38      38      192.168.1.108:9999      a571001c573c4851a4bb1e0dcb9a204a
33 模擬向TCP服務器發送消息成功:pool-1-thread-4      4      192.168.1.104:9999      dcdd6093afc345e39453883cf049fa21
34 模擬向TCP服務器發送消息成功:pool-1-thread-28      28      192.168.1.106:9999      0ba4362898f84335bb336d17780855fc
35 模擬向TCP服務器發送消息成功:pool-1-thread-47      47      192.168.1.108:9999      db993121a9934558942a09a9d9a8e03f
36 模擬向TCP服務器發送消息成功:pool-1-thread-30      30      192.168.1.102:9999      a0e50592deca471b9c5982c83d00f303
37 模擬鏈接TCP服務器成功: host=192.168.1.110,port=9999
38 模擬向TCP服務器發送消息成功:pool-1-thread-51      51      192.168.1.110:9999      41703aba37ca47148d23d6826264a05a
39 模擬向TCP服務器發送消息成功:pool-1-thread-5      5      192.168.1.102:9999      15f453cc0a7743f79dc105963f39f946
40 模擬向TCP服務器發送消息成功:pool-1-thread-52      52      192.168.1.105:9999      9ca521963bf84c418335e7702e471fa9
41 模擬向TCP服務器發送消息成功:pool-1-thread-40      40      192.168.1.101:9999      bec1d265b7dc46f5afebc42fea10a313
42 模擬向TCP服務器發送消息成功:pool-1-thread-26      26      192.168.1.104:9999      a44662dc498045e78eb531b6ee6fc27b
43 模擬向TCP服務器發送消息成功:pool-1-thread-11      11      192.168.1.109:9999      6104c4fd2dab4d44af86f0cd1e3e272d
44 模擬向TCP服務器發送消息成功:pool-1-thread-24      24      192.168.1.105:9999      344025da2a6c4190a87403c5d96b321e
45 模擬向TCP服務器發送消息成功:pool-1-thread-22      22      192.168.1.110:9999      aa0b4c48527446738d28e99eef4957f5
46 模擬向TCP服務器發送消息成功:pool-1-thread-23      23      192.168.1.107:9999      79f5fc4278164cd68ac1a260322e6f68
47 模擬向TCP服務器發送消息成功:pool-1-thread-56      56      192.168.1.109:9999      39c38939ced140058f25fe903a3b1f4f
48 模擬向TCP服務器發送消息成功:pool-1-thread-18      18      192.168.1.109:9999      c29ea09b5f264b488f3e15e91c5f2bd5

 

可見,與每一個TCP服務器的鏈接只會創建一次,鏈接獲得複用。

相關文章
相關標籤/搜索