HikariPool源碼(二)設計思想借鑑

Java極客  |  做者  /  鏗然一葉
這是Java極客的第 52 篇原創文章

1.利用Java併發工具而非synchronized來保證線程安全

synchronized是重量級的鎖,在HikariPool中沒有一處使用,都是經過Java併發工具類來解決線程安全問題。咱們來看一些例子:java

1.一、經過volatile關鍵字保證可見性

volatile關鍵字定義的變量並不能保證線程安全,但他能保證一個線程的修改對另一個線程當即可見。例如在PoolEntry和ConcurrentBag中都使用了volatile關鍵字。數據庫

1.二、使用JUC包下的Atomic類

例如:
1.ConcurrentBag中用AtomicInteger來記錄等待獲取鏈接的線程數量。緩存

2.HikariDataSource中用AtomicBoolean記錄數據源是否已經關閉。安全

3.在PoolEntry中用AtomicIntegerFieldUpdater來更新PoolEntry的狀態。併發

stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state");
複製代碼

這樣使得PoolEntry類的state屬性的更新能夠保證原子性。dom

4.在ConcurrentBag中使用CopyOnWriteArrayList來記錄數據庫鏈接異步

CopyOnWriteArrayList適用於讀多寫少的場景,讀取時不加鎖,寫時才加鎖,但這樣怎麼保證線程安全?ide

一般咱們設計一個資源池,會將未使用資源放入一個可用資源池中,若是池中還有資源就從池中取出,不然就等待或者超時報錯,直到有新的資源回收到資源池中。工具

獲取資源和釋放資源的代碼以下:

Resource resource = resourcePool.remove(); // 從池中獲取資源,池中資源數量減小
reourcePool.add(resource);  // 將資源釋放會池中,池中資源數量增長
複製代碼

爲了保證線程安全,這兩個方法均要用synchronized關鍵字修飾。post

而在HikariPool中對於可用資源不是直接經過資源池的資源數量來決定,而是經過資源的狀態來決定,資源定義了以下幾個狀態:

// 池化資源的狀態定義
      int STATE_NOT_IN_USE = 0;
      int STATE_IN_USE = 1;
      int STATE_REMOVED = -1;
      int STATE_RESERVED = -2;
複製代碼

在獲取資源時經過遍歷資源池並判斷資源狀態獲得可用資源:

//ConcurrentBag.java
      try {
         // 遍歷全部資源
         for (T bagEntry : sharedList) {  // 這裏非線程安全
            // 得到未使用資源並更新狀態爲可用
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // 這裏是線程安全的
               // If we may have stolen another waiter's connection, request another bag add.
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }
複製代碼

所以,雖然CopyOnWriteArrayList的讀操做非線程安全,但可經過AtomicIntegerFieldUpdater來保證對池中的資源PoolEntry在狀態更新時的線程安全,所以整個操做是線程安全的。

這樣就避免了對池資源的出池和入池加鎖,性能獲得提高。

二、對性能的追求

咱們經過如何獲取鏈接來看下HikariPool對性能的追求。

在上一節咱們已經說起了如何獲取資源,但實際的獲取過程還不只如此,HikariPool獲取資源的過程以下:

2.一、先從ThrodLocal變量中獲取

//ConcurrentBag.java
      // Try the thread-local list first
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }
複製代碼

咱們知道ThrodLocal變量的特色是該變量在同一個線程中可見,這樣可不須要經過方法參數傳遞變量,而且是線程安全的,而在一次業務操做中有可能屢次獲取數據庫鏈接(注意:多個鏈接意味着事務問題須要解決),這時HikariPool會將釋放的鏈接放入ThrodLocal變量中,當前線程若是要再次使用鏈接就能夠直接從ThrodLocal變量中獲取。

//ConcurrentBag.java
      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
複製代碼

2.二、從資源池中獲取

這一步前面已介紹,從資源池中遍歷資源,經過判斷資源狀態是否可用來獲取資源。

2.三、資源不足時獲取資源的方式

通常的,當資源不足時,若是沒有超過最大資源數限制,就會新建一個新資源並返回,而HikariPool不是,它的獲取過程以下:


1.獲取資源的線程獲取資源
2.發現資源不足,則會異步調用建立資源的線程去建立資源
3.而後等待資源返回

//ConcurrentBag.java

         // 異步調用建立資源線程建立資源,其中waiting是等待獲取資源的線程數
         listener.addBagItem(waiting);

         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            // 等待獲取資源
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);
複製代碼

4.建立資源線程異步建立資源,建立資源時會判斷是否有須要獲取資源的線程在等待資源,若是有才建立,不然就不建立

//HikariPool.java
      // connectionBag.getWaitingThreadCount() > 0 判斷有等待資源的線程纔會繼續建立資源
      private synchronized boolean shouldCreateAnotherConnection() {
         return getTotalConnections() < config.getMaximumPoolSize() &&
            (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
      }
複製代碼

5.其餘使用資源的線程使用完資源後,會釋放資源,這時資源池中有了可用資源,會分給等待線程使用

//ConcurrentBag.java
   // 使用資源的線程釋放資源
   public void requite(final T bagEntry) {
      bagEntry.setState(STATE_NOT_IN_USE);

      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         else if ((i & 0xff) == 0xff) { // 0xff 是255, 每隔256進去一次
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            yield();
         }
      }

      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
   }
複製代碼

HikariPool這麼作的好處是:

  1. 圖中a和b操做誰先執行完就用誰的資源,大併發狀況下,也可能b比a快,這樣性能有提高。
  2. 若是b先執行完,等待線程獲取到資源後,若是沒有新的等待線程,a就不會建立新資源,這樣就節省了一個資源,少了佔用鏈接,也節省了內存。

以上這個巧妙的處理方式藉助了SynchronousQueue來實現,咱們能夠模擬下這種處理方式:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueTest {
    // 入參爲true,公平鎖,保證FIFO
    private SynchronousQueue<PoolEntry> queue = new SynchronousQueue(true);

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueueTest queueTest = new SynchronousQueueTest();
        queueTest.execute();
    }

    public void execute() {
        // 模擬生產者建立資源
        new Producer("Producer-generate-poolentry", queue, 2000).start();
        // 模擬其餘消費者釋放資源
        new Producer("OtherConsumer-release-poolentry", queue, 5000).start();
        // 等待上面兩個線程啓動
        sleep();
        // 模擬消費者
        new Consumer(queue).start();
    }

    private void sleep() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException E) {
            Thread.currentThread().interrupt();
        }
    }
}

// 用來模擬生產者和釋放資源的消費者
class Producer extends Thread {
    private SynchronousQueue<PoolEntry> queue;

    // 模擬執行耗時
    private long executeCostTimeMillis;

    public Producer(String name, SynchronousQueue queue, long executeCostTimeMillis) {
        this.queue = queue;
        setName(name);
        this.executeCostTimeMillis = executeCostTimeMillis;
    }


    @Override
    public void run() {
        try {
            while(true) {
                int random = (int) (Math.random()*10);
                PoolEntry poolEntry = new PoolEntry(random);
                System.out.println(Thread.currentThread().getName() + ", " + poolEntry.toString());
                // 資源入隊
                while(!queue.offer(poolEntry)) {
                    yield();
                }
                Thread.sleep(executeCostTimeMillis);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread {
    private SynchronousQueue<PoolEntry> queue;

    public Consumer(SynchronousQueue queue) {
        this.queue = queue;
        setName("Consumer");
    }

    @Override
    public void run() {
        try {
            while(true) {
                long timeout = 200;
                // 資源出隊
                PoolEntry poolEntry = queue.poll(timeout, TimeUnit.MILLISECONDS);
                if (poolEntry != null) {
                    System.out.println(Thread.currentThread().getName() + ", " + poolEntry.toString());
                } else {
// System.out.println("queue is null.");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// 資源類
class PoolEntry {
    int num;
    public PoolEntry(int i) {
        this.num = i;
    }

    @Override
    public String toString() {
        return "PoolEntry instance " + num;
    }
}
複製代碼

輸出:

Producer-generate-poolentry, PoolEntry instance 4
OtherConsumer-release-poolentry, PoolEntry instance 5
Consumer, PoolEntry instance 5
Consumer, PoolEntry instance 4
Producer-generate-poolentry, PoolEntry instance 8
Consumer, PoolEntry instance 8
Producer-generate-poolentry, PoolEntry instance 7
Consumer, PoolEntry instance 7
OtherConsumer-release-poolentry, PoolEntry instance 6
Consumer, PoolEntry instance 6
Producer-generate-poolentry, PoolEntry instance 6
Consumer, PoolEntry instance 6
Producer-generate-poolentry, PoolEntry instance 2
Consumer, PoolEntry instance 2
OtherConsumer-release-poolentry, PoolEntry instance 1
Consumer, PoolEntry instance 1
Producer-generate-poolentry, PoolEntry instance 2
Consumer, PoolEntry instance 2
Producer-generate-poolentry, PoolEntry instance 3
Consumer, PoolEntry instance 3
Producer-generate-poolentry, PoolEntry instance 7
Consumer, PoolEntry instance 7
OtherConsumer-release-poolentry, PoolEntry instance 9
Consumer, PoolEntry instance 9
Producer-generate-poolentry, PoolEntry instance 7
Consumer, PoolEntry instance 7
複製代碼

能夠看出:
1.生產者和其餘消費者誰先把資源入隊,消費者就先使用哪一個資源

2.沒有可用資源,消費者會一直等待

在使用池化資源大併發場景下,又追求極致性能時,這種處理方式值得借鑑。

三、使用弱引用節省內存

弱引用在調用垃圾回收後會被釋放,對於經過ThreadLocal變量緩存的資源,爲了不線程生命週期結束後資源不被及時回收,使用了弱引用來存儲資源,這樣當內存不足,調用GC操做時就會被回收,減小內存佔用。

//ConcurrentBag.java
      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
複製代碼

四、使用空方法使得代碼處理邏輯統一

實現類使用空方法使得處理邏輯統一,不須要添加if判斷來處理。相似編碼規範中對於返回一個集合的方法,建議不要返回NULL,而返回一個大小爲0的集合,這樣外部處理邏輯統一,不須要額外增長爲NULL的判斷,或者引發空指針異常。

4.一、ProxyLeakTask

4.1.一、空方法實現類

//ProxyLeakTask.java
   static
   {
      // 不須要監控鏈接泄露的ProxyLeakTask的實現類
      NO_LEAK = new ProxyLeakTask() {
         @Override
         void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}

         @Override
         public void run() {}  // 默認啥都不作

         @Override
         public void cancel() {} // 默認啥都不作
      };
   }
複製代碼

4.1.二、實例化

//ProxyLeakTaskFactory.java
   ProxyLeakTask schedule(final PoolEntry poolEntry) {
      // 根據配置來建立不一樣的代理泄露監控類
      return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);
   }
複製代碼

4.1.三、調用點

//ProxyLeakTaskFactory.java
   private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
      ProxyLeakTask task = new ProxyLeakTask(poolEntry);
      // 這裏就不用加是否爲NULL的判斷
      task.schedule(executorService, leakDetectionThreshold);

      return task;
   }
複製代碼

五、總結

  1. 充分利用JUC工具解決併發問題和提高性能。
  2. 池化資源能夠經過資源狀態來獲取可用資源,而不須要經過idle池的出隊,入隊來獲取,減小鎖的使用,提升性能。
  3. 在對使用內存有嚴格要求時,例如低端機不能佔用過多內存時,使用好弱引用,軟引用。
  4. 極致性能要考慮不少細節,如文中獲取資源的例子,通常狀況下不會想這麼細。
  5. 使用空方法實現,來統一外部處理邏輯。

end.


<--感謝三連擊,左邊點贊和關注。


相關閱讀:
HikariPool源碼(一)初識
HikariPool源碼(三)資源池動態伸縮
HikariPool源碼(四)資源狀態
HikariPool源碼(五)工做線程以及相關工具類
HikariPool源碼(六)使用到的一些有用JAVA特性
Google guava源碼之EventBus
JAVA基礎(二)內存優化-使用Java引用作緩存


Java極客站點: javageektour.com/

相關文章
相關標籤/搜索