Java 併發實踐 — ConcurrentHashMap 與 CAS

最近在作接口限流時涉及到了一個有意思問題,牽扯出了關於concurrentHashMap的一些用法,以及CAS的一些概念。限流算法不少,我主要就以最簡單的計數器法來作引。先抽象化一下需求:統計每一個接口訪問的次數。一個接口對應一個url,也就是一個字符串,每調用一次對其進行加一處理。可能出現的問題主要有三個:html

  1. 多線程訪問,須要選擇合適的併發容器
  2. 分佈式下多個實例統計接口流量須要共享內存
  3. 流量統計應該儘量不損耗服務器性能

但此次的博客並非想描述怎麼去實現接口限流,而是主要想描述一下遇到的問題,因此,第二點暫時不考慮,即不使用Redis。java

說到併發的字符串統計,當即讓人聯想到的數據結構即是ConcurrentHashpMap<String,Long> urlCounter;
若是你剛剛接觸併發可能會寫出如代碼清單1的代碼面試

代碼清單1:算法

public class CounterDemo1 {
 
    private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
 
    //接口調用次數+1
    public long increase(String url) {
        Long oldValue = urlCounter.get(url);
        Long newValue = (oldValue == null) ? 1L : oldValue + 1;
        urlCounter.put(url, newValue);
        return newValue;
    }
 
    //獲取調用次數
    public Long getCount(String url){
        return urlCounter.get(url);
    }
 
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        final CounterDemo1 counterDemo = new CounterDemo1();
        int callTime = 100000;
        final String url = "http://localhost:8080/hello";
        CountDownLatch countDownLatch = new CountDownLatch(callTime);
        //模擬併發狀況下的接口調用統計
        for(int i=0;i<callTime;i++){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    counterDemo.increase(url);
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        //等待全部線程統計完成後輸出調用次數
        System.out.println("調用次數:"+counterDemo.getCount(url));
    }
}
 
console output:
調用次數:96526

都說concurrentHashMap是個線程安全的併發容器,因此沒有顯示加同步,實際效果呢並不如所願。shell

問題就出在increase方法,concurrentHashMap能保證的是每個操做(put,get,delete…)自己是線程安全的,可是咱們的increase方法,對concurrentHashMap的操做是一個組合,先get再put,因此多個線程的操做出現了覆蓋。若是對整個increase方法加鎖,那麼又違背了咱們使用併發容器的初衷,由於鎖的開銷很大。咱們有沒有方法改善統計方法呢?
代碼清單2羅列了concurrentHashMap父接口concurrentMap的一個很是有用可是又經常被忽略的方法。數據庫

代碼清單2:編程

/**
 * Replaces the entry for a key only if currently mapped to a given value.
 * This is equivalent to
 *  <pre> {@code
 * if (map.containsKey(key) && Objects.equals(map.get(key), oldValue)) {
 *   map.put(key, newValue);
 *   return true;
 * } else
 *   return false;
 * }</pre>
 *
 * except that the action is performed atomically.
 */
boolean replace(K key, V oldValue, V newValue);

這其實就是一個最典型的CAS操做,except that the action is performed atomically.這句話真是幫了大忙,咱們能夠保證比較和設置是一個原子操做,當A線程嘗試在increase時,舊值被修改的話就回致使replace失效,而咱們只須要用一個循環,不斷獲取最新值,直到成功replace一次,便可完成統計。安全

改進後的increase方法以下服務器

代碼清單3:數據結構

public long increase2(String url) {
        Long oldValue, newValue;
        while (true) {
            oldValue = urlCounter.get(url);
            if (oldValue == null) {
                newValue = 1l;
                //初始化成功,退出循環
                if (urlCounter.putIfAbsent(url, 1l) == null)
                    break;
                //若是初始化失敗,說明其餘線程已經初始化過了
            } else {
                newValue = oldValue + 1;
                //+1成功,退出循環
                if (urlCounter.replace(url, oldValue, newValue))
                    break;
                //若是+1失敗,說明其餘線程已經修改過了舊值
            }
        }
        return newValue;
    }
 
console output:
調用次數:100000

再次調用後得到了正確的結果,上述方案看上去比較繁瑣,由於第一次調用時須要進行一次初始化,因此多了一個判斷,也用到了另外一個CAS操做putIfAbsent,他的源代碼描述以下:

代碼清單4:

/**
     * If the specified key is not already associated
     * with a value, associate it with the given value.
     * This is equivalent to
     *  <pre> {@code
     * if (!map.containsKey(key))
     *   return map.put(key, value);
     * else
     *   return map.get(key);
     * }</pre>
     *
     * except that the action is performed atomically.
     *
     * @implNote This implementation intentionally re-abstracts the
     * inappropriate default provided in {@code Map}.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with the specified key, or
     *         {@code null} if there was no mapping for the key.
     *         (A {@code null} return can also indicate that the map
     *         previously associated {@code null} with the key,
     *         if the implementation supports null values.)
     * @throws UnsupportedOperationException if the {@code put} operation
     *         is not supported by this map
     * @throws ClassCastException if the class of the specified key or value
     *         prevents it from being stored in this map
     * @throws NullPointerException if the specified key or value is null,
     *         and this map does not permit null keys or values
     * @throws IllegalArgumentException if some property of the specified key
     *         or value prevents it from being stored in this map
     */
     V putIfAbsent(K key, V value);

簡單翻譯以下:「若是(調用該方法時)key-value 已經存在,則返回那個 value 值。若是調用時 map 裏沒有找到 key 的 mapping,返回一個 null 值」。值得注意點的一點就是concurrentHashMap的value是不能存在null值的。實際上呢,上述的方案也能夠把Long替換成AtomicLong,能夠簡化實現, ConcurrentHashMap

private AtomicLongMap<String> urlCounter3 = AtomicLongMap.create();
 
public long increase3(String url) {
    long newValue = urlCounter3.incrementAndGet(url);
    return newValue;
}
 
 
public Long getCount3(String url) {
    return urlCounter3.get(url);
}

看一下他的源碼就會發現,其實和代碼清單3思路差很少,只不過功能更完善了一點。

和CAS很像的操做,我以前的博客中提到過數據庫的樂觀鎖,用version字段來進行併發控制,其實也是一種compare and swap的思想。

雜談:網上不少對ConcurrentHashMap的介紹,衆所周知,這是一個用分段鎖實現的一個線程安全的map容器,可是真正對他的使用場景有介紹的少之又少。面試中能知道這個容器的人也確實很多,問出去,也就回答一個分段鎖就沒有下文了,但我以爲吧,有時候只知其一;不知其二反而會比不知道更可怕。

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

https://www.awaimai.com/348.html
網站大規模併發處理方案:電商秒殺與搶購
——————————
今天分享的主題最後的討論很好,最後部分同窗可能仍是會有疑惑,到底高併發或電商場景下該用樂觀鎖仍是悲觀鎖呢?
建議感興趣的同窗先看下上面那篇文章,對整個背景有個大致瞭解,再回來我們的問題:

悲觀鎖和樂觀鎖是數據庫用來保證數據併發安全防止更新丟失的兩種方法,PPT列舉的例子在select ... for update 前加個事務就能夠防止更新丟失。悲觀鎖和樂觀鎖大部分場景下差別不會不大,一些獨特場景下有一些差異,通常咱們能夠從以下幾個方面來判斷:

1.響應速度:若是須要很是高的響應速度,建議採用樂觀鎖方案,成功就執行,不成功就失敗,不須要等待其餘併發去釋放鎖;
2.衝突頻率:若是衝突頻率很是高,建議採用悲觀鎖,保證成功率,若是衝突頻率大,樂觀鎖會須要屢次重試才能成功,資源消耗代價比較大;
3.重試代價:若是重試代價大,建議採用悲觀鎖,好比付款的時候調用第三方外部接口;

總結起來就是:
秒殺活動是一個併發寫的過程,同時也是一個隨機性很高的事件,並不須要去關注事務失敗率高這個問題,因此採用樂觀鎖是合適的。但若是要保證事務成功率的話,顯然使用樂觀鎖是一個糟糕的方案。因此到底該用悲觀鎖仍是樂觀鎖仍是得看場景和業務需求,還有架構。

Refer:

[1] 非阻塞同步算法與CAS(Compare and Swap)無鎖算法

http://www.cnblogs.com/Mainz/p/3546347.html

     小白科普:悲觀鎖和樂觀鎖

     http://bit.ly/2isI7Jx

     併發一枝花之 ConcurrentLinkedQueue

     http://bit.ly/2hEmxlR

[2] ConcurrentHashMap使用示例

https://my.oschina.net/mononite/blog/144329

[3] 深度剖析ConcurrentHashMap源碼

http://blog.csdn.net/xiaoxian8023/article/details/49249091

[4] CAS下ABA問題及優化方案 | 架構師之路

     http://bit.ly/2w1Vfve

     庫存扣多了,到底怎麼整 | 架構師之路

     http://chuansong.me/n/1921434646119

     庫存扣減還有這麼多方案? | 架構師之路

     http://chuansong.me/n/1921434546720

[5] Java併發編程——鎖與可重入鎖

     http://www.jianshu.com/p/007bd7029faf

     java的可重入鎖用在哪些場合?

     https://www.zhihu.com/question/23284564

     java自旋鎖

     http://www.jianshu.com/p/dfbe0ebfec95

     java鎖的種類以及辨析(一):自旋鎖

     http://ifeve.com/java_lock_see1/

[6] Disruptor簡介

     http://blog.csdn.net/winwill2012/article/details/71718809

     高性能隊列——Disruptor

     https://zhuanlan.zhihu.com/p/23863915

     併發框架DISRUPTOR譯文

     http://coolshell.cn/articles/9169.html

[7] Java併發編程-原子性變量

http://www.jianshu.com/p/9e473657340a

相關文章
相關標籤/搜索