在咱們使用數據庫的過程當中,咱們每每使用數據庫鏈接池而不是直接使用數據庫鏈接進行操做,這是由於每個數據庫鏈接的建立和銷燬的代價是昂貴的,而池化技術則預先建立了資源,這些資源是可複用的,這樣就保證了在多用戶狀況下只能使用指定數目的資源,避免了一個用戶建立一個鏈接資源,形成程序運行開銷過大。java
這裏只實現一個簡易的鏈接池,更多複雜的需求可根據該鏈接池進行改進,該鏈接池主要參數以下:數據庫
程序流程圖以下:
併發
泛型接口ConnectionPool.javaide
public interface ConnectionPool<T> { /** * 初始化池資源 * @param maxActive 池中最大活動鏈接數 * @param maxWait 最大等待時間 */ void init(Integer maxActive, Long maxWait); /** * 從池中獲取資源 * @return 鏈接資源 */ T getResource() throws Exception; /** * 釋放鏈接 * @param connection 正在使用的鏈接 */ void release(T connection) throws Exception; /** * 釋放鏈接池資源 */ void close(); }
以zookeeper爲例,實現zookeeper鏈接池,ZookeeperConnectionPool.java測試
public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> { //最大活動鏈接數 private Integer maxActive; //最大等待時間 private Long maxWait; //空閒隊列 private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>(); //繁忙隊列 private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>(); //鏈接池活動鏈接數 private AtomicInteger activeSize = new AtomicInteger(0); //鏈接池關閉標記 private AtomicBoolean isClosed = new AtomicBoolean(false); //總共獲取的鏈接記數 private AtomicInteger createCount = new AtomicInteger(0); //等待zookeeper客戶端建立完成的計數器 private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1)); public ZookeeperConnectionPool(Integer maxActive, Long maxWait) { this.init(maxActive, maxWait); } @Override public void init(Integer maxActive, Long maxWait) { this.maxActive = maxActive; this.maxWait = maxWait; } @Override public ZooKeeper getResource() throws Exception { ZooKeeper zooKeeper; Long nowTime = System.currentTimeMillis(); final CountDownLatch countDownLatch = latchThreadLocal.get(); //空閒隊列idle是否有鏈接 if ((zooKeeper = idle.poll()) == null) { //判斷池中鏈接數是否小於maxActive if (activeSize.get() < maxActive) { //先增長池中鏈接數後判斷是否小於等於maxActive if (activeSize.incrementAndGet() <= maxActive) { //建立zookeeper鏈接 zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> { if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } }); countDownLatch.await(); System.out.println("Thread:" + Thread.currentThread().getId() + "獲取鏈接:" + createCount.incrementAndGet() + "條"); busy.offer(zooKeeper); return zooKeeper; } else { //如增長後發現大於maxActive則減去增長的 activeSize.decrementAndGet(); } } //若活動線程已滿則等待busy隊列釋放鏈接 try { System.out.println("Thread:" + Thread.currentThread().getId() + "等待獲取空閒資源"); Long waitTime = maxWait - (System.currentTimeMillis() - nowTime); zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new Exception("等待異常"); } //判斷是否超時 if (zooKeeper != null) { System.out.println("Thread:" + Thread.currentThread().getId() + "獲取鏈接:" + createCount.incrementAndGet() + "條"); busy.offer(zooKeeper); return zooKeeper; } else { System.out.println("Thread:" + Thread.currentThread().getId() + "獲取鏈接超時,請重試!"); throw new Exception("Thread:" + Thread.currentThread().getId() + "獲取鏈接超時,請重試!"); } } //空閒隊列有鏈接,直接返回 busy.offer(zooKeeper); return zooKeeper; } @Override public void release(ZooKeeper connection) throws Exception { if (connection == null) { System.out.println("connection 爲空"); return; } if (busy.remove(connection)){ idle.offer(connection); } else { activeSize.decrementAndGet(); throw new Exception("釋放失敗"); } } @Override public void close() { if (isClosed.compareAndSet(false, true)) { idle.forEach((zooKeeper) -> { try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }); busy.forEach((zooKeeper) -> { try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
這裏建立20個線程併發測試鏈接池,Test.javathis
public class Test { public static void main(String[] args) throws Exception { int threadCount = 20; Integer maxActive = 10; Long maxWait = 10000L; ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait); CountDownLatch countDownLatch = new CountDownLatch(20); for (int i = 0; i < threadCount; i++) { new Thread(() -> { countDownLatch.countDown(); try { countDownLatch.await(); ZooKeeper zooKeeper = pool.getResource(); Thread.sleep(2000); pool.release(zooKeeper); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }).start(); } while (true){ } } }