在上篇文章中大概介紹了 Java 中細粒度鎖的幾種實現方式,而且針對每種方式都作了優缺點說明,在使用的時候就須要根據業務需求選擇更合適的一種。上篇文章中的最後一種弱引用鎖的實現方式,我在裏面也說了其實還有更優雅的實現,其實也算不上更優雅,只是看起來更優雅,原理仍是同樣的,今天我打算用一篇文章的篇幅來好好說下。java
首先,咱們來再次回顧一下,這裏爲何能夠利用弱引用的特性拿掉分段鎖呢?分段鎖在這裏主要是爲了保證每次在建立和移除鎖時的線程安全,而採用了弱引用以後,咱們不須要每次建立以後都進行移除,由於當弱引用指向的對象引用被釋放以後 Java 會在下一次的 GC 將這弱引用指向的對象回收掉,在通過 GC 以後,當弱引用指向的對象被回收時,弱引用將會進入建立時指定的隊列,而後咱們經過隊列中的值來將這些存放在 Map 中的弱引用移除掉,因此咱們纔可以順利的拿掉分段鎖。spring
你注意看弱引用鎖的代碼實現,裏面在咱們獲取鎖的時候有個手動去清理 Map 中被回收的鎖的過程,若是你看過以前的 談談 Java 中的各類引用類型 這篇文章的話,你應該知道 Java 提供了一個 WeakHashMap 類,他是使用弱引用做爲 key,它在 GC 決定將弱引用所指向的 key 對象回收以後,會將當前保存的 entry 也自動移除,這個是怎麼實現的呢?安全
其實原理也是同樣的,利用弱引用指向的對象被回收時,弱引用將會進入建立時指定的隊列這一特性,而後經過輪詢隊列來移除元素。只不過將移除的操做徹底包裹在 WeakHashMap 類裏面了,你能夠看到裏面全部的 public 的增刪改查方法都直接或間接調用了expuntgeStaleEntries() 方法,而 expuntgeStaleEntries 方法中就是在輪詢隊列移除被回收的 key 所對應的元素。微信
private void expungeStaleEntries() { for (Object x; (x = queue.poll()) != null; ) { synchronized (queue) { @SuppressWarnings("unchecked") Entry<K,V> e = (Entry<K,V>) x; int i = indexFor(e.hash, table.length); Entry<K,V> prev = table[i]; Entry<K,V> p = prev; while (p != null) { Entry<K,V> next = p.next; if (p == e) { if (prev == e) table[i] = next; else prev.next = next; // Must not null out e.next; // stale entries may be in use by a HashIterator e.value = null; // Help GC size--; break; } prev = p; p = next; } } } }
既然 Java 已經給咱們提供了相應功能的類,那咱們是否是能夠在弱引用鎖的實現中直接使用 WeakHashMap 呢?這樣咱們就不用在獲取鎖的時候作手動移除的操做了,WeakHashMap 內部已經幫咱們作了。併發
但若是你稍微看一下 WeakHashMap 類的描述就能發現他不是線程安全的,在該類裏面有這樣一段描述:分佈式
Like most collection classes, this class is not synchronized. A synchronized {@code WeakHashMap} may be constructed using the {@link Collections#synchronizedMap Collections.synchronizedMap} method.
正由於如此,在弱引用的實現中才採用 ConcurrentHashMap 來保存鎖,只不過 ConcurrentHashMap 類沒有提供弱引用的實現,也就沒有提供自動爲咱們移除元素的功能,因此纔會在獲取鎖的時候作一個移除元素的操做,相信看到這裏你應該大概明白了使用弱引用做爲 key 的 WeakHashMap 是怎麼作到當弱引用被回收的時候自動把對應的元素給移除了。ide
那若是說按照上面描述裏面所說的經過 Collections 工具類的 synchronizedMap 方法來實現線程安全呢?先來看代碼實現:微服務
public class WeakHashLock<T> { public final Map<T, WeakReference<ReentrantLock>> weakHashMap = Collections.synchronizedMap(new WeakHashMap<>()); public ReentrantLock get(T key){ return this.weakHashMap.computeIfAbsent(key, lock -> new WeakReference<>(new ReentrantLock())).get(); } }
上面代碼中 WeakHashLock 類中只有一個 get 方法根據 key 獲取鎖對象,不存在的話建立一個新的鎖對象返回,看起來是否是很簡單,但不幸的是經過 Collections 工具類的 synchronizedMap 方法來實現的線程安全方式性能不是很好,爲何這麼說呢,咱們能夠看下 synchronizedMap 方法實現:工具
// synchronizedMap 方法實現 public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) { return new SynchronizedMap<>(m); } // SynchronizedMap 類構造方法 SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} }
從代碼實現能夠看出,synchronizedMap 方法會建立一個SynchronizedMap 實例返回,在該實例的構造方法中將本身賦值給用來同步的對象,而後 SynchronizedMap 類中的方法都使用該同步的對象進行同步,以至於咱們作的每個操做都須要進行同步,其實就至關於給 WeakHashMap 類中實例方法都加上了 synchronized 關鍵字,這種實現方式性能不免會大打折扣。性能
這種方式不可取的緣由主要是由於 WeakHashMap 不是線程安全的,那有沒有線程安全的而且實現了弱引用來保存元素的 Map 呢?固然上篇文章中的實現是一種方式,那若是也想像 WeakHashMap 同樣將這些移除的操做徹底封裝到 Map 類裏面呢。咱們能夠看下 org.springframework.util 包下的 ConcurrentReferenceHashMap 類,該類就很好的實現了咱們想要的效果,在該類的描述中就提到了這樣一段話:
This class can be used as an alternative to {@code Collections.synchronizedMap(new WeakHashMap<K, Reference<V>>())} in order to support better performance when accessed concurrently. This implementation follows the same design constraints as {@link ConcurrentHashMap} with the exception that {@code null} values and {@code null} keys are supported.
從描述中能夠看到 ConcurrentReferenceHashMap 類能夠用來替代使用 synchronizedMap 方法保證線程安全的 WeakHashMap 類,以便在併發訪問時提供更好的性能。那就來看下采用 ConcurrentReferenceHashMap 類的實現方式:
public class WeakHashLock<T> { private static final int DEFAULT_INITIAL_CAPACITY = 16; private static final float DEFAULT_LOAD_FACTOR = 0.75f; private static final int DEFAULT_CONCURRENCY_LEVEL = 16; private static final ConcurrentReferenceHashMap.ReferenceType DEFAULT_REFERENCE_TYPE = ConcurrentReferenceHashMap.ReferenceType.WEAK; private final ConcurrentReferenceHashMap<T, ReentrantLock> referenceHashMap; /** * Create mutex factory with default settings. */ public WeakHashLock() { this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_REFERENCE_TYPE); } public WeakHashLock(int concurrencyLevel, ConcurrentReferenceHashMap.ReferenceType referenceType) { this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, concurrencyLevel, referenceType); } public ReentrantLock get(T key) { return this.referenceHashMap.computeIfAbsent(key, lock -> new ReentrantLock()); } }
上面代碼實現一樣很是簡單,相比上面 WeakHashMap 的方式多了兩個構造方法而已,但不一樣於使用 synchronizedMap 方法來保證線程安全的方式,性能會提升不少。若是你感興趣的話能夠去看下這個類的內部實現,原理都是利用了弱引用的特性,只不過實現方式有點不一樣而已。
這裏我想要提醒兩點,一個是 ConcurrentReferenceHashMap 中默認的引用類型是軟引用。
private static final ReferenceType DEFAULT_REFERENCE_TYPE = ReferenceType.SOFT;
另一個要注意的是 ConcurrentReferenceHashMap 中有的方法返回的結果是 GC 以後但尚未清理被回收元素以前的結果,什麼意思呢,咱們來看一個示例:
ConcurrentReferenceHashMap<String, String> referenceHashMap = new ConcurrentReferenceHashMap<>(16, 0.75f, 1, ConcurrentReferenceHashMap.ReferenceType.WEAK); referenceHashMap.put("key", "value"); // 通過 GC 標記以後,弱引用已經進入建立時指定的隊列中,這時能夠去輪詢隊列移除元素了 System.gc(); // isEmpty 和 size 方法返回的結果是尚未移除元素的結果 System.out.println(referenceHashMap.isEmpty()); // false System.out.println(referenceHashMap.size()); // 1 // get 方法中調用了移除元素的方法 System.out.println(referenceHashMap.get("key")); // null System.out.println(referenceHashMap.isEmpty()); // true System.out.println(referenceHashMap.size()); // 0
上面測試結果能夠看到,在 GC 標記以後調用 isEmpty 和 size 方法獲得的返回結果都代表集合中是還有元素,而調用 get 方法獲得的倒是個 null,而後再調用 isEmpty 和 size 方法獲得的結果表示集合爲空,這實際上是由於前面兩個方法裏面沒有作移除元素的操做,而 get 方法是先作了一次移除元素而後再去獲取值,這裏提醒下這個細節問題,避免覺得 ConcurrentReferenceHashMap 沒有實現移除元素的功能。
好了,上面都是利用弱引用特性再配合 ReentrantLock 實現了細粒度鎖,這裏就再順便看下利用弱引用特性配合 synchronized 關鍵字的實現方式吧。一樣,原理是同樣,只不過從 ReentrantLock 再回到 synchronized,前面說了這麼多的原理,就再也不贅述了,直接看代碼實現吧:
// 用於同步的對象 public class Mutex<T> { private final T key; public Mutex(T key) { this.key = key; } public static <T> Mutex<T> of(T key) { return new Mutex<>(key); } public T getKey() { return key; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Mutex<?> xMutex = (Mutex<?>) o; return Objects.equals(key, xMutex.key); } @Override public int hashCode() { return Objects.hash(key); } }
public class MutexFactory<T> { private static final int DEFAULT_INITIAL_CAPACITY = 16; private static final float DEFAULT_LOAD_FACTOR = 0.75f; private static final int DEFAULT_CONCURRENCY_LEVEL = 16; private static final ConcurrentReferenceHashMap.ReferenceType DEFAULT_REFERENCE_TYPE = ConcurrentReferenceHashMap.ReferenceType.WEAK; private final ConcurrentReferenceHashMap<T, Mutex<T>> referenceHashMap; public MutexFactory() { this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_REFERENCE_TYPE); } public MutexFactory(int concurrencyLevel, ConcurrentReferenceHashMap.ReferenceType referenceType) { this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, concurrencyLevel, referenceType); } public Mutex<T> getMutex(T key) { return this.referenceHashMap.computeIfAbsent(key, Mutex::new); } // 提供強制移除已經被回收的弱引用元素 public void purgeUnreferenced() { this.referenceHashMap.purgeUnreferencedEntries(); } }
因爲咱們通常實現的細粒度基本上是基於用戶或者其餘的須要同步的對象,上面是經過構建一個互斥對象做爲 ConcurrentReferenceHashMap 的 value,而後咱們就可使用 synchronized 關鍵字來鎖定該 value 對象達到同步的效果,使用方式以下:
MutexFactory<String> mutexFactory = new MutexFactory<>(); public void save(String userId) throws InterruptedException { synchronized (mutexFactory.getMutex(userId)){ // do something } }
這種同步方式業務代碼看起來簡單些,對於一些簡單的需求就能夠直接使用這種方式,固然若是須要提供 API 級別的加鎖方式或者須要構建帶條件的加鎖方式那仍是使用 ReentrantLock。
對於加鎖這一塊雖說了這麼多,也許你已經打算採用這些方式去實現你想要的效果了,但是呢隨着微服務大行其道,一個系統每每啓動了好幾個實例,每一個實例對應一個 JVM 虛擬機,而咱們前面說的這些都是在只有一個虛擬機的前提下才有用,這就意味着咱們前面說的這些加鎖方式基本上已經派不上用場了。
那隨之而來的解決方案就是咱們常常聽到而且感受很高大上,卻不多用到的分佈式鎖了,這一塊我雖然使用過,也去查閱過相關資料,但我自認爲沒有徹底真正掌握底層的原理,還須要進一步的實踐,只好再找機會整理整理後再輸出了。
微信公衆號:rookiedev,Java 後臺開發,勵志終身學習,堅持原創乾貨輸出,你可選擇如今就關注我,或者看看歷史文章再關注也不遲。長按二維碼關注,咱們一塊兒努力變得更優秀!