java併發實戰:鏈接池實現

池化技術簡介

在咱們使用數據庫的過程當中,咱們每每使用數據庫鏈接池而不是直接使用數據庫鏈接進行操做,這是由於每個數據庫鏈接的建立和銷燬的代價是昂貴的,而池化技術則預先建立了資源,這些資源是可複用的,這樣就保證了在多用戶狀況下只能使用指定數目的資源,避免了一個用戶建立一個鏈接資源,形成程序運行開銷過大。java

鏈接池實現原理

這裏只實現一個簡易的鏈接池,更多複雜的需求可根據該鏈接池進行改進,該鏈接池主要參數以下:數據庫

  1. 一個繁忙隊列busy
  2. 一個空閒隊列idle
  3. 鏈接池最大活動鏈接數maxActive
  4. 鏈接池最大等待時間maxWait
  5. 鏈接池的活動鏈接數activeSize

程序流程圖以下:
圖片描述併發

代碼實現

泛型接口ConnectionPool.javaide

public interface ConnectionPool<T> {

    /**
     * 初始化池資源
     * @param maxActive 池中最大活動鏈接數
     * @param maxWait 最大等待時間
     */
    void init(Integer maxActive, Long maxWait);

    /**
     * 從池中獲取資源
     * @return 鏈接資源
     */
    T getResource() throws Exception;

    /**
     * 釋放鏈接
     * @param connection 正在使用的鏈接
     */
    void release(T connection) throws Exception;

    /**
     * 釋放鏈接池資源
     */
    void close();


}

以zookeeper爲例,實現zookeeper鏈接池,ZookeeperConnectionPool.java測試

public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> {
    //最大活動鏈接數
    private Integer maxActive; 
    //最大等待時間
    private Long maxWait; 
    //空閒隊列
    private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>();
    //繁忙隊列
    private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>();
    //鏈接池活動鏈接數
    private AtomicInteger activeSize = new AtomicInteger(0);
    //鏈接池關閉標記
    private AtomicBoolean isClosed = new AtomicBoolean(false);
    //總共獲取的鏈接記數
    private AtomicInteger createCount = new AtomicInteger(0);
    //等待zookeeper客戶端建立完成的計數器
    private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1));

    public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {
        this.init(maxActive, maxWait);
    }

    @Override
    public void init(Integer maxActive, Long maxWait) {
        this.maxActive = maxActive;
        this.maxWait = maxWait;
    }

    @Override
    public ZooKeeper getResource() throws Exception {
        ZooKeeper zooKeeper;
        Long nowTime = System.currentTimeMillis();
        final CountDownLatch countDownLatch = latchThreadLocal.get();
        
        //空閒隊列idle是否有鏈接
        if ((zooKeeper = idle.poll()) == null) {
            //判斷池中鏈接數是否小於maxActive
            if (activeSize.get() < maxActive) {
                //先增長池中鏈接數後判斷是否小於等於maxActive
                if (activeSize.incrementAndGet() <= maxActive) {
                    //建立zookeeper鏈接
                    zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {
                        if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {
                            countDownLatch.countDown();
                        }
                    });
                    countDownLatch.await();
                    System.out.println("Thread:" + Thread.currentThread().getId() + "獲取鏈接:" + createCount.incrementAndGet() + "條");
                    busy.offer(zooKeeper);
                    return zooKeeper;
                } else {
                    //如增長後發現大於maxActive則減去增長的
                    activeSize.decrementAndGet();
                }
            }
            //若活動線程已滿則等待busy隊列釋放鏈接
            try {
                System.out.println("Thread:" + Thread.currentThread().getId() + "等待獲取空閒資源");
                Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);
                zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new Exception("等待異常");
            }
            //判斷是否超時
            if (zooKeeper != null) {
                System.out.println("Thread:" + Thread.currentThread().getId() + "獲取鏈接:" + createCount.incrementAndGet() + "條");
                busy.offer(zooKeeper);
                return zooKeeper;
            } else {
                System.out.println("Thread:" + Thread.currentThread().getId() + "獲取鏈接超時,請重試!");
                throw new Exception("Thread:" + Thread.currentThread().getId() + "獲取鏈接超時,請重試!");
            }
        }
        //空閒隊列有鏈接,直接返回
        busy.offer(zooKeeper);
        return zooKeeper;
    }

    @Override
    public void release(ZooKeeper connection) throws Exception {
        if (connection == null) {
            System.out.println("connection 爲空");
            return;
        }
        if (busy.remove(connection)){
            idle.offer(connection);
        } else {
            activeSize.decrementAndGet();
            throw new Exception("釋放失敗");
        }
    }

    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            idle.forEach((zooKeeper) -> {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            busy.forEach((zooKeeper) -> {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

測試用例

這裏建立20個線程併發測試鏈接池,Test.javathis

public class Test {

    public static void main(String[] args) throws Exception {
        int threadCount = 20;
        Integer maxActive = 10;
        Long maxWait = 10000L;
        ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);
        CountDownLatch countDownLatch = new CountDownLatch(20);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                countDownLatch.countDown();
                try {
                    countDownLatch.await();
                    ZooKeeper zooKeeper = pool.getResource();
                    Thread.sleep(2000);
                    pool.release(zooKeeper);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }).start();
        }
        while (true){

        }
    }
}
相關文章
相關標籤/搜索