【追光者系列】HikariCP源碼分析之ConcurrentBag&J.U.C SynchronousQueue、CopyOnWriteArrayList

摘要: 原創出處 微信公衆號「工匠小豬豬的技術世界」歡迎轉載,保留摘要,謝謝!html

1.這是一個系列,有興趣的朋友能夠持續關注
2.若是你有HikariCP使用上的問題,能夠給我留言,咱們一塊兒溝通討論
3.但願你們能夠提供我一些案例,我也但願能夠支持大家作一些調優node


🙂🙂🙂關注**微信公衆號:【工匠小豬豬的技術世界】**有福利:數據庫

  1. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  2. 新的源碼解析文章實時收到通知。每兩週更新一篇左右

ConcurrentBag的定義

HikariCP contains a custom lock-free collection called a ConcurrentBag. The idea was borrowed from the C# .NET ConcurrentBag class, but the internal implementation quite different. The ConcurrentBag provides...數組

  • A lock-free design
  • ThreadLocal caching
  • Queue-stealing
  • Direct hand-off optimizations

...resulting in a high degree of concurrency, extremely low latency, and minimized occurrences of false-sharing.緩存

https://en.wikipedia.org/wiki/False_sharing安全

  • CopyOnWriteArrayList:負責存放ConcurrentBag中所有用於出借的資源
  • ThreadLocal:用於加速線程本地化資源訪問
  • SynchronousQueue:用於存在資源等待線程時的第一手資源交接

ConcurrentBag取名來源於C# .NET的同名類,可是實現卻不同。它是一個lock-free集合,在鏈接池(多線程數據交互)的實現上具備比LinkedBlockingQueue和LinkedTransferQueue更優越的併發讀寫性能。bash

ConcurrentBag源碼解析

ConcurrentBag內部同時使用了ThreadLocal和CopyOnWriteArrayList來存儲元素,其中CopyOnWriteArrayList是線程共享的。ConcurrentBag採用了queue-stealing的機制獲取元素:首先嚐試從ThreadLocal中獲取屬於當前線程的元素來避免鎖競爭,若是沒有可用元素則掃描公共集合、再次從共享的CopyOnWriteArrayList中獲取。(ThreadLocal列表中沒有被使用的items在借用線程沒有屬於本身的時候,是能夠被「竊取」的) ThreadLocal和CopyOnWriteArrayList在ConcurrentBag中都是成員變量,線程間不共享,避免了僞共享(false sharing)的發生。 其使用專門的AbstractQueuedLongSynchronizer來管理跨線程信號,這是一個"lock-less「的實現。 這裏要特別注意的是,ConcurrentBag中經過borrow方法進行數據資源借用,經過requite方法進行資源回收,注意其中borrow方法只提供對象引用,不移除對象。因此從bag中「借用」的items實際上並無從任何集合中刪除,所以即便引用廢棄了,垃圾收集也不會發生。所以使用時經過borrow取出的對象必須經過requite方法進行放回,不然會致使內存泄露,只有"remove"方法才能徹底從bag中刪除一個對象。微信

好了,咱們一塊兒看一下ConcurrentBag源碼概覽:多線程

上節提過,CopyOnWriteArrayList負責存放ConcurrentBag中所有用於出借的資源,就是private final CopyOnWriteArrayList sharedList; 以下圖所示,sharedList中的資源經過add方法添加,remove方法出借併發

add方法向bag中添加bagEntry對象,讓別人能夠借用

/**
    * Add a new object to the bag for others to borrow.
    *
    * @param bagEntry an object to add to the bag
    */
   public void add(final T bagEntry)
   {
      if (closed) {
         LOGGER.info("ConcurrentBag has been closed, ignoring add()");
         throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
      }


      sharedList.add(bagEntry);//新添加的資源優先放入CopyOnWriteArrayList


      // spin until a thread takes it or none are waiting
      // 當有等待資源的線程時,將資源交到某個等待線程後才返回(SynchronousQueue)
      while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
         yield();
      }
   }

複製代碼

remove方法用來從bag中刪除一個bageEntry,該方法只能在borrow(long, TimeUnit)和reserve(T)時被使用

/**
    * Remove a value from the bag.  This method should only be called
    * with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
    *
    * @param bagEntry the value to remove
    * @return true if the entry was removed, false otherwise
    * @throws IllegalStateException if an attempt is made to remove an object
    *         from the bag that was not borrowed or reserved first
    */
   public boolean remove(final T bagEntry)
   {
   // 若是資源正在使用且沒法進行狀態切換,則返回失敗
      if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
         return false;
      }

      final boolean removed = sharedList.remove(bagEntry);// 從CopyOnWriteArrayList中移出
      if (!removed && !closed) {
         LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
      }

      return removed;
   }
複製代碼

ConcurrentBag中經過borrow方法進行數據資源借用

/**
    * The method will borrow a BagEntry from the bag, blocking for the
    * specified timeout if none are available.
    *
    * @param timeout how long to wait before giving up, in units of unit
    * @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
    * @return a borrowed instance from the bag or null if a timeout occurs
    * @throws InterruptedException if interrupted while waiting
    */
   public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // Try the thread-local list first
      // 優先查看有沒有可用的本地化的資源
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

      // Otherwise, scan the shared list ... then poll the handoff queue
      final int waiting = waiters.incrementAndGet();
      try {
      // 當無可用本地化資源時,遍歷所有資源,查看是否存在可用資源
      // 所以被一個線程本地化的資源也可能被另外一個線程「搶走」
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               // If we may have stolen another waiter's connection, request another bag add. if (waiting > 1) { // 由於可能「搶走」了其餘線程的資源,所以提醒包裹進行資源添加 listener.addBagItem(waiting - 1); } return bagEntry; } } listener.addBagItem(waiting); timeout = timeUnit.toNanos(timeout); do { final long start = currentTime(); // 當現有所有資源所有在使用中,等待一個被釋放的資源或者一個新資源 final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000); return null; } finally { waiters.decrementAndGet(); } } 複製代碼
/**
    * This method will return a borrowed object to the bag.  Objects
    * that are borrowed from the bag but never "requited" will result
    * in a memory leak.
    *
    * @param bagEntry the value to return to the bag
    * @throws NullPointerException if value is null
    * @throws IllegalStateException if the bagEntry was not borrowed from the bag
    */
   public void requite(final T bagEntry)
   {
   // 將狀態轉爲未在使用
      bagEntry.setState(STATE_NOT_IN_USE);

// 判斷是否存在等待線程,若存在,則直接轉手資源
      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            yield();
         }
      }

 // 不然,進行資源本地化
      final List<Object> threadLocalList = threadList.get();
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
複製代碼

上述代碼中的 weakThreadLocals 是用來判斷是否使用弱引用,經過下述方法初始化:

/**
    * Determine whether to use WeakReferences based on whether there is a
    * custom ClassLoader implementation sitting between this class and the
    * System ClassLoader.
    *
    * @return true if we should use WeakReferences in our ThreadLocals, false otherwise
    */
   private boolean useWeakThreadLocals()
   {
      try {
      // 人工指定是否使用弱引用,可是官方不推薦進行自主設置。
         if (System.getProperty("com.zaxxer.hikari.useWeakReferences") != null) {   // undocumented manual override of WeakReference behavior
            return Boolean.getBoolean("com.zaxxer.hikari.useWeakReferences");
         }

// 默認經過判斷初始化的ClassLoader是不是系統的ClassLoader來肯定
         return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
      }
      catch (SecurityException se) {
         return true;
      }
   }
複製代碼

SynchronousQueue

SynchronousQueue主要用於存在資源等待線程時的第一手資源交接,以下圖所示:

在hikariCP中,選擇的是公平模式 this.handoffQueue = new SynchronousQueue<>(true);

公平模式總結下來就是:隊尾匹配隊頭出隊,先進先出,體現公平原則。

SynchronousQueue是一個是一個無存儲空間的阻塞隊列(是實現newFixedThreadPool的核心),很是適合作交換工做,生產者的線程和消費者的線程同步以傳遞某些信息、事件或者任務。 由於是無存儲空間的,因此與其餘阻塞隊列實現不一樣的是,這個阻塞peek方法直接返回null,無任何其餘操做,其餘的方法與阻塞隊列的其餘方法一致。這個隊列的特色是,必須先調用take或者poll方法,才能使用off,add方法。

做爲BlockingQueue中的一員,SynchronousQueue與其餘BlockingQueue有着不一樣特性(來自明姐http://cmsblogs.com/?p=2418):

  • SynchronousQueue沒有容量。與其餘BlockingQueue不一樣,SynchronousQueue是一個不存儲元素的BlockingQueue。每個put操做必需要等待一個take操做,不然不能繼續添加元素,反之亦然。
  • 由於沒有容量,因此對應 peek, contains, clear, isEmpty ... 等方法實際上是無效的。例如clear是不執行任何操做的,contains始終返回false,peek始終返回null。
  • SynchronousQueue分爲公平和非公平,默認狀況下采用非公平性訪問策略,固然也能夠經過構造函數來設置爲公平性訪問策略(爲true便可)。
  • 若使用 TransferQueue, 則隊列中永遠會存在一個 dummy node。

SynchronousQueue提供了兩個構造函數:

public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        // 經過 fair 值來決定公平性和非公平性
        // 公平性使用TransferQueue,非公平性採用TransferStack
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
複製代碼

TransferQueue、TransferStack繼承Transferer,Transferer爲SynchronousQueue的內部類,它提供了一個方法transfer(),該方法定義了轉移數據的規範

abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }
複製代碼

transfer()方法主要用來完成轉移數據的,若是e != null,至關於將一個數據交給消費者,若是e == null,則至關於從一個生產者接收一個消費者交出的數據。

SynchronousQueue採用隊列TransferQueue來實現公平性策略,採用堆棧TransferStack來實現非公平性策略,他們兩種都是經過鏈表實現的,其節點分別爲QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演着很是重要的做用,SynchronousQueue的put、take操做都是委託這兩個類來實現的。

公平模式

公平模式底層使用的TransferQueue內部隊列,一個head和tail指針,用於指向當前正在等待匹配的線程節點。 (來自https://blog.csdn.net/yanyan19880509/article/details/52562039) 初始化時,TransferQueue的狀態以下:

接着咱們進行一些操做:

一、線程put1執行 put(1)操做,因爲當前沒有配對的消費線程,因此put1線程入隊列,自旋一小會後睡眠等待,這時隊列狀態以下:

二、接着,線程put2執行了put(2)操做,跟前面同樣,put2線程入隊列,自旋一小會後睡眠等待,這時隊列狀態以下:

三、這時候,來了一個線程take1,執行了 take操做,因爲tail指向put2線程,put2線程跟take1線程配對了(一put一take),這時take1線程不須要入隊,可是請注意了,這時候,要喚醒的線程並非put2,而是put1。爲什麼? 你們應該知道咱們如今講的是公平策略,所謂公平就是誰先入隊了,誰就優先被喚醒,咱們的例子明顯是put1應該優先被喚醒。至於讀者可能會有一個疑問,明明是take1線程跟put2線程匹配上了,結果是put1線程被喚醒消費,怎麼確保take1線程必定能夠和次首節點(head.next)也是匹配的呢?其實你們能夠拿個紙畫一畫,就會發現真的就是這樣的。 公平策略總結下來就是:隊尾匹配隊頭出隊。 執行後put1線程被喚醒,take1線程的 take()方法返回了1(put1線程的數據),這樣就實現了線程間的一對一通訊,這時候內部狀態以下:

四、最後,再來一個線程take2,執行take操做,這時候只有put2線程在等候,並且兩個線程匹配上了,線程put2被喚醒, take2線程take操做返回了2(線程put2的數據),這時候隊列又回到了起點,以下所示:

以上即是公平模式下,SynchronousQueue的實現模型。總結下來就是:隊尾匹配隊頭出隊,先進先出,體現公平原則。

非公平模式

仍是使用跟公平模式下同樣的操做流程,對比兩種策略下有何不一樣。非公平模式底層的實現使用的是TransferStack, 一個棧,實現中用head指針指向棧頂,接着咱們看看它的實現模型:

一、線程put1執行 put(1)操做,因爲當前沒有配對的消費線程,因此put1線程入棧,自旋一小會後睡眠等待,這時棧狀態以下:

二、接着,線程put2再次執行了put(2)操做,跟前面同樣,put2線程入棧,自旋一小會後睡眠等待,這時棧狀態以下:

三、這時候,來了一個線程take1,執行了take操做,這時候發現棧頂爲put2線程,匹配成功,可是實現會先把take1線程入棧,而後take1線程循環執行匹配put2線程邏輯,一旦發現沒有併發衝突,就會把棧頂指針直接指向 put1線程

四、最後,再來一個線程take2,執行take操做,這跟步驟3的邏輯基本是一致的,take2線程入棧,而後在循環中匹配put1線程,最終所有匹配完畢,棧變爲空,恢復初始狀態,以下圖所示:

從上面流程看出,雖然put1線程先入棧了,可是倒是後匹配,這就是非公平的由來。

CopyOnWriteArrayList

CopyOnWriteArrayList負責存放ConcurrentBag中所有用於出借的資源。(引自http://www.importnew.com/25034.html)

CopyOnWriteArrayList,顧名思義,Write的時候老是要Copy,也就是說對於任何可變的操做(add、set、remove)都是伴隨複製這個動做的,是ArrayList 的一個線程安全的變體。

A thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads. The "snapshot" style iterator method uses a reference to the state of the array at the point that the iterator was created. This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException. The iterator will not reflect additions, removals, or changes to the list since the iterator was created. Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException. All elements are permitted, including null.

CopyOnWriteArrayList的add操做的源代碼以下:

public boolean add(E e) {
    //一、先加鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //二、拷貝數組
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        //三、將元素加入到新數組中
        newElements[len] = e;
        //四、將array引用指向到新數組
        setArray(newElements);
        return true;
    } finally {
       //五、解鎖
        lock.unlock();
    }
}
複製代碼

一次add大體經歷了幾個步驟:

一、加鎖
二、拿到原數組,獲得新數組的大小(原數組大小+1),實例化出一個新的數組來
三、把原數組的元素複製到新數組中去
四、新數組最後一個位置設置爲待添加的元素(由於新數組的大小是按照原數組大小+1來的)
五、把Object array引用指向新數組
六、解鎖

插入、刪除、修改操做也都是同樣,每一次的操做都是以對Object[] array進行一次複製爲基礎的

因爲全部的寫操做都是在新數組進行的,這個時候若是有線程併發的寫,則經過鎖來控制,若是有線程併發的讀,則分幾種狀況:
一、若是寫操做未完成,那麼直接讀取原數組的數據;
二、若是寫操做完成,可是引用還未指向新數組,那麼也是讀取原數組數據;
三、若是寫操做完成,而且引用已經指向了新的數組,那麼直接重新數組中讀取數據。

可見,CopyOnWriteArrayList的讀操做是能夠不用加鎖的。

經常使用的List有ArrayList、LinkedList、Vector,其中前兩個是線程非安全的,最後一個是線程安全的。Vector雖然是線程安全的,可是隻是一種相對的線程安全而不是絕對的線程安全,它只可以保證增、刪、改、查的單個操做必定是原子的,不會被打斷,可是若是組合起來用,並不能保證線程安全性。好比就像上面的線程1在遍歷一個Vector中的元素、線程2在刪除一個Vector中的元素同樣,勢必產生併發修改異常,也就是fail-fast。

因此這就是選擇CopyOnWriteArrayList這個併發組件的緣由,CopyOnWriteArrayList如何作到線程安全的呢?

CopyOnWriteArrayList使用了一種叫寫時複製的方法,當有新元素添加到CopyOnWriteArrayList時,先從原有的數組中拷貝一份出來,而後在新的數組作寫操做,寫完以後,再將原來的數組引用指向到新數組。

當有新元素加入的時候,以下圖,建立新數組,並往新數組中加入一個新元素,這個時候,array這個引用仍然是指向原數組的。

當元素在新數組添加成功後,將array這個引用指向新數組。

CopyOnWriteArrayList的整個add操做都是在鎖的保護下進行的。 這樣作是爲了不在多線程併發add的時候,複製出多個副本出來,把數據搞亂了,致使最終的數組數據不是咱們指望的。

CopyOnWriteArrayList反映的是三個十分重要的分佈式理念:

1)讀寫分離
咱們讀取CopyOnWriteArrayList的時候讀取的是CopyOnWriteArrayList中的Object[] array,可是修改的時候,操做的是一個新的Object[] array,讀和寫操做的不是同一個對象,這就是讀寫分離。這種技術數據庫用的很是多,在高併發下爲了緩解數據庫的壓力,即便作了緩存也要對數據庫作讀寫分離,讀的時候使用讀庫,寫的時候使用寫庫,而後讀庫、寫庫之間進行必定的同步,這樣就避免同一個庫上讀、寫的IO操做太多
2)最終一致
對CopyOnWriteArrayList來講,線程1讀取集合裏面的數據,未必是最新的數據。由於線程二、線程三、線程4四個線程都修改了CopyOnWriteArrayList裏面的數據,可是線程1拿到的仍是最老的那個Object[] array,新添加進去的數據並無,因此線程1讀取的內容未必準確。不過這些數據雖然對於線程1是不一致的,可是對於以後的線程必定是一致的,它們拿到的Object[] array必定是三個線程都操做完畢以後的Object array[],這就是最終一致。最終一致對於分佈式系統也很是重要,它經過容忍必定時間的數據不一致,提高整個分佈式系統的可用性與分區容錯性。固然,最終一致並非任何場景都適用的,像火車站售票這種系統用戶對於數據的實時性要求很是很是高,就必須作成強一致性的。
3)使用另外開闢空間的思路,來解決併發衝突

缺點:
一、由於CopyOnWrite的寫時複製機制,因此在進行寫操做的時候,內存裏會同時駐紮兩個對象的內存,舊的對象和新寫入的對象(注意:在複製的時候只是複製容器裏的引用,只是在寫的時候會建立新對象添加到新容器裏,而舊容器的對象還在使用,因此有兩份對象內存)。若是這些對象佔用的內存比較大,好比說200M左右,那麼再寫入100M數據進去,內存就會佔用300M,那麼這個時候頗有可能形成頻繁的Yong GC和Full GC。以前某系統中使用了一個服務因爲每晚使用CopyOnWrite機制更新大對象,形成了每晚15秒的Full GC,應用響應時間也隨之變長。針對內存佔用問題,能夠經過壓縮容器中的元素的方法來減小大對象的內存消耗,好比,若是元素全是10進制的數字,能夠考慮把它壓縮成36進制或64進制。或者不使用CopyOnWrite容器,而使用其餘的併發容器,如ConcurrentHashMap。
二、不能用於實時讀的場景,像拷貝數組、新增元素都須要時間,因此調用一個set操做後,讀取到數據可能仍是舊的,雖CopyOnWriteArrayList 能作到最終一致性,可是仍是無法知足實時性要求;
3.數據一致性問題。CopyOnWrite容器只能保證數據的最終一致性,不能保證數據的實時一致性。因此若是你但願寫入的的數據,立刻能讀到,請不要使用CopyOnWrite容器。關於C++的STL中,曾經也有過Copy-On-Write的玩法,參見陳皓的《C++ STL String類中的Copy-On-Write》,後來,由於有不少線程安全上的事,就被去掉了。https://blog.csdn.net/haoel/article/details/24058

隨着CopyOnWriteArrayList中元素的增長,CopyOnWriteArrayList的修改代價將愈來愈昂貴,所以,CopyOnWriteArrayList 合適讀多寫少的場景,不過這類慎用 由於誰也無法保證CopyOnWriteArrayList 到底要放置多少數據,萬一數據稍微有點多,每次add/set都要從新複製數組,這個代價實在過高昂了。在高性能的互聯網應用中,這種操做分分鐘引發故障。**CopyOnWriteArrayList適用於讀操做遠多於修改操做的併發場景中。**而HikariCP就是這種場景。

還有好比白名單,黑名單,商品類目的訪問和更新場景,假如咱們有一個搜索網站,用戶在這個網站的搜索框中,輸入關鍵字搜索內容,可是某些關鍵字不容許被搜索。這些不能被搜索的關鍵字會被放在一個黑名單當中,黑名單天天晚上更新一次。當用戶搜索時,會檢查當前關鍵字在不在黑名單當中,若是在,則提示不能搜索。

可是使用CopyOnWriteMap須要注意兩件事情:

  1. 減小擴容開銷。根據實際須要,初始化CopyOnWriteMap的大小,避免寫時CopyOnWriteMap擴容的開銷。

  2. 使用批量添加。由於每次添加,容器每次都會進行復制,因此減小添加次數,能夠減小容器的複製次數。

參考資料

http://www.cnblogs.com/taisenki/p/7699667.html http://cmsblogs.com/?p=2418 https://blog.csdn.net/yanyan19880509/article/details/52562039 http://www.importnew.com/25034.html https://blog.csdn.net/linsongbin1/article/details/54581787

相關文章
相關標籤/搜索