流程圖+源碼深刻分析:緩存穿透和擊穿問題原理以及解決方案

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習java

1 文章概述

在互聯網場景中緩存系統是一個重要系統,爲了防止流量頻繁訪問數據庫,通常會在數據庫層前設置一道緩存層做爲保護。面試

緩存是一個廣義概念,核心要義是將數據存放在離用戶更近的地方,或者是將數據存放在訪問更快的介質。redis

緩存對應到實際應用中能夠分爲內存緩存、遠程緩存。內存緩存常見工具例如Guava、Ecache等,遠程緩存常見系統例如Redis,memcache等。本文以遠程緩存Redis爲例進行講解。數據庫

緩存穿透和擊穿是高併發場景下必須面對的問題,這些問題會致使訪問請求繞過緩存直接打到數據庫,可能會形成數據庫掛掉或者系統雪崩,下面本文根據下圖提綱來分析這些問題的原理和解決方案。緩存

流程圖+源碼深刻分析:緩存穿透和擊穿問題原理以及解決方案

 

2 緩存穿透與擊穿區分

緩存穿透和擊穿從最終結果上來講都是流量繞過緩存打到了數據庫,可能會致使數據庫掛掉或者系統雪崩,可是仔細區分仍是有一些不一樣,咱們分析一張業務讀取緩存通常流程圖。安全

流程圖+源碼深刻分析:緩存穿透和擊穿問題原理以及解決方案

 

咱們用文字簡要描述這張圖:服務器

(1) 業務查詢數據時首先查詢緩存,若是緩存存在數據則返回,流程結束微信

(2) 若是緩存不存在數據則查詢數據庫,若是數據庫不存在數據則返回空數據,流程結束架構

(3) 若是數據庫存在數據則將數據寫入緩存並返回數據給業務,流程結束併發

假設業務方要查詢A數據,緩存穿透是指數據庫根本不存在A數據,因此根本沒有數據能夠寫入緩存,致使緩存層失去意義,大量請求會頻繁訪問數據庫。

緩存擊穿是指請求在查詢數據庫前,首先查緩存看看是否存在,這是沒有問題的。可是併發量太大,致使第一個請求尚未來得及將數據寫入緩存,後續大量請求已經開始訪問緩存,這是數據在緩存中仍是不存在的,因此瞬時大量請求會打到數據庫。

3 CAS實例與源碼分析

如今咱們把緩存問題放一放,一塊兒來分析CAS這個概念的實例源碼,後面咱們編寫緩存工具須要借鑑這個思想。

3.1 一道面試題

咱們來看一道常見面試題,相信這個面試題你們並不會陌生:分析下面這段代碼輸出的值是多少:

class Data {
    volatile int num = 0;
    public void increase() {
        num++;
    }
}
public class VolatileTest {
    public static void main(String[] args) {
        Data data = new Data();
        
        // 100個線程操做num累加
        for (int i = 1; i <= 100; i++) {
            new Thread(new Runnable() {
                @Override public void run() {
                    try {
                        Thread.sleep(1000L);
                        data.increase();
                    } catch (Exception ex) {
                        System.out.println(ex.getMessage());
                    }
                }
            }).start();
        }
        
        // 等待上述線程執行完 -數值2表示只有主線程和GC線程在運行
        while (Thread.activeCount() 2) {
            // 主線程讓出CPU時間片
            Thread.yield();
        }
        System.out.println(data.num);
    }
}

運行結果num值通常小於100,這是由於num++不是原子性,咱們編寫一段簡單代碼進行證實。

public class VolatileTest2 {
    volatile int num = 0;
    public void increase() {
        num++;
    }
}

執行下列命令獲取字節碼:

javac VolatileTest2.java
javap -c VolatileTest2.class

字節碼文件以下所示:

$ javap -c VolatileTest2.class
Compiled from "VolatileTest2.java"
public class com.java.front.test.VolatileTest2 {
  volatile int num;
  public com.java.front.test.VolatileTest2();
    Code:
       0: aload_0
       1: invokespecial 1 // Method java/lang/Object."<init>":()V
       4: aload_0
       5: iconst_0
       6: putfield 2 // Field num:I
       9: return
  public void increase();
    Code:
       0: aload_0
       1: dup
       2: getfield 2 // Field num:I
       5: iconst_1
       6: iadd
       7: putfield 2 // Field num:I
      10: return
}

咱們觀察num++代碼片斷,發現其實分爲三個步驟:

(1) getfield 
(2) iadd  
(3) putfield

getfield讀取num值,iadd運算num+1,最後putfield將新值賦值給num。這就不難理解爲何num最終會小於100:由於線程A在執行到第二步後執行第三步前,還沒來得及將新值賦給num,數據就被線程B取走了,這時仍是沒有加1的舊值。

3.2 CAS實例分析

那麼怎麼解決上述問題呢?常見方案有兩種:加鎖方案和無鎖方案。

加鎖方案是對increase加上同步關鍵字,這樣就能夠保證同一時刻只有一個線程操做,這不是咱們這篇文章重點,不詳細展開了。

無鎖方案能夠採用JUC提供的AtomicInteger進行運算,咱們看一下改進後的代碼。

import java.util.concurrent.atomic.AtomicInteger;
class Data {
    volatile AtomicInteger num = new AtomicInteger(0);
    public void increase() {
        num.incrementAndGet();
    }
}
public class VolatileTest {
    public static void main(String[] args) {
        Data data = new Data();
        for (int i = 1; i <= 100; i++) {
            new Thread(new Runnable() {
                @Override public void run() {
                    try {
                        Thread.sleep(1000L);
                        data.increase();
                    } catch (Exception ex) {
                        System.out.println(ex.getMessage());
                    }
                }
            }).start();
        }
        while (Thread.activeCount() 2) {
            Thread.yield();
        }
        System.out.println(data.num);
    }
}

這樣改寫以後結果正如咱們預期等於100,咱們並無加鎖,那麼爲何改用AtomicInteger就能夠達到預期效果呢?

3.3 CAS源碼分析

本章節咱們以incrementAndGet方法做爲入口,進行CAS源碼分析。

class Data {
    volatile AtomicInteger num = new AtomicInteger(0);
    public void increase() {
        num.incrementAndGet();
    }
}

進入incrementAndGet方法:

import sun.misc.Unsafe;
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

咱們看到一個名爲Unsafe的類。這個類並不常見,到底有什麼用呢?Unsafe是位於sun.misc包下的一個類,具備操做底層資源的能力。例如能夠直接訪問操做系統,操做特定內存數據,提供許多CPU原語級別的API。

咱們繼續分析源碼跟進getAndAddInt方法:

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

咱們對參數進行說明:o表示待修改的對象,offset表示待修改字段在內存中的偏移量,delta表示本次修改的增量。

整個方法核心是一段do-while循環代碼,其中方法getIntVolatile比較好理解,就是獲取對象o偏移量爲offset的某個字段值。

咱們重點分析while中compareAndSwapInt方法:

public final native boolean compareAndSwapInt( Object o, long offset, int expected, int x);

其中o和offset含義不變,expected表示指望值,x表更新值,這就引出了CAS核心操做三個值:內存位置值、預期原值及新值。

執行CAS操做時,內存位置值會與預期原值比較。若是相匹配處理器會自動將該位置值更新爲新值,不然處理器不作任何操做。

Unsafe提供的CAS方法是一條CPU的原子指令,底層實現即爲CPU指令cmpxchg,不會形成數據不一致。

咱們再回過頭分析這段代碼:

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

代碼執行流程以下:

(1) 線程A執行累加,執行到getAndAddInt方法,首先根據內存地址獲取o對象offset偏移量的字段值v1
(2) while循環中compareAndSwapInt執行,這個方法將再次獲取o對象offset偏移量的字段值v2,此時判斷v1和v2是否相等,若是相等則自動將該位置值更新爲v1加上增量後的值,跳出循環
(3) 若是執行compareAndSwapInt時字段值已經被線程B改掉,則該方法會返回false,因此沒法跳出循環,繼續執行直至成功,這就是自旋設計思想

經過上述分析咱們知道,Unsafe類和自旋設計思想是CAS實現核心,其中自旋設計思想會在咱們緩存工具中體現。

4 分佈式鎖實例分析

在相同JVM進程中爲了保證同一段代碼塊在同一時刻只能被一個線程訪問,JAVA提供了鎖機制,例如咱們可使用synchroinzed、ReentrantLock進行併發控制。

若是在多個服務器的集羣環境,每一個服務器運行着一個JVM進程。若是但願對多個JVM進行併發控制,此時JVM鎖就不適用了。這時就須要引入分佈式鎖。顧名思義分佈式鎖是對分佈式場景下,多個JVM進程進行併發控制。

分佈式鎖在實現時當心踩坑:例如沒有設置超時時間,若是獲取到鎖的節點因爲某種緣由掛掉沒有釋放鎖,致使其它節點永遠拿不到鎖。

分佈式鎖有多種實現方式,能夠本身經過Redis或者Zookeeper進行實現,也能夠直接使用Redisson框架。本章節給出Redis分佈式鎖Lua腳本實現。

public class RedisLockManager {
    private static final String DEFAULT_VALUE = "lock";
    private static final String LOCK_SCRIPT =
        "\nlocal r = tonumber(redis.call('SETNX', KEYS[1], ARGV[1]));"
        + "\nif r == 1 then"
        + "\nredis.call('PEXPIRE',KEYS[1],ARGV[2]);"
        + "\nend"
        + "\nreturn r";
    private static final String UNLOCK_SCRIPT =
        "\nlocal v = redis.call('GET', KEYS[1]);"
        + "\nlocal r = 0;"
        + "\nif v == ARGV[1] then"
        + "\nr = redis.call('DEL',KEYS[1]);"
        + "\nend"
        + "\nreturn r";
    @Resource
    private RedisClient redisClient;
    public boolean tryLock(String key, int seconds) {
        try {
            String lockValue = executeLuaScript(key, lockSeconds);
            if (lockValue != null) {
                return true;
            }
            return false;
        } catch (Exception ex) {
            LOGGER.error("key={},lockSeconds={}", key, lockSeconds, ex);
            return false;
        }
    }
    public boolean unLock(String key) {
        try {
            Long r = (Long) redisClient.eval(UNLOCK_SCRIPT, 1, key, DEFAULT_VALUE);
            if (new Long(1).equals(r)) {
                return true;
            }
        } catch (Exception ex) {
            LOGGER.info("key={}", key, ex);
        }
        return false;
    }
    private String executeLuaScript(String key, int lockSeconds) {
        try {
            Long returnValue = (Long) redisClient.eval(LOCK_SCRIPT, 1, key, DEFAULT_VALUE, String.valueOf(lockSeconds));
            if (new Long(1).equals(returnValue)) {
                return DEFAULT_VALUE;
            }
        } catch (Exception ex) {
            LOGGER.error("key={},lockSeconds={}", key, lockSeconds, ex);
        }
        return null;
    }
}

5 緩存工具實例分析

上述章節分析了CAS原理和分佈式鎖實現,如今咱們要將上述知識結合起來,實現一個能夠解決緩存擊穿問題的緩存工具。

緩存工具核心思想是若是發現緩存中無數據,利用分佈式鎖使得同一時刻只有一個JVM進程能夠訪問數據庫,並將數據寫入緩存。

那麼沒有搶到分佈式鎖的進程怎麼辦呢?咱們提供如下三種選擇:

方案一:直接返回空數據

方案二:自旋直到獲取到數據

方案三:自旋N次仍然沒有獲取到數據,則返回空數據

緩存工具代碼以下:

/** * 業務回調 * * @author 今日頭條號「JAVA前線」 * */
public interface RedisBizCall {
    /** * 業務回調方法 * * @return 序列化後數據值 */
    String call();
}

/** * 安全緩存管理器 * * @author 今日頭條號「JAVA前線」 * */
@Service
public class SafeRedisManager {
    @Resource
    private RedisClient RedisClient;
    @Resource
    private RedisLockManager redisLockManager;
    public String getDataSafe(String key, int lockExpireSeconds, int dataExpireSeconds, RedisBizCall bizCall, boolean alwaysRetry) {
        try {
            boolean getLockSuccess = false;
            while(true) {
                String value = redisClient.get(key);
                if (StringUtils.isNotEmpty(value)) {
                    return value;
                }
                /** 競爭分佈式鎖 **/
                if (getLockSuccess = redisLockManager.tryLock(key, lockExpireSeconds)) {
                    value = redisClient.get(key);
                    if (StringUtils.isNotEmpty(value)) {
                        return value;
                    }
                    /** 查詢數據庫 **/
                    value = bizCall.call();
                    /** 數據庫無數據則返回**/
                    if (StringUtils.isEmpty(value)) {
                        return null;
                    }
                    /** 數據存入緩存 **/
                    redisClient.setex(key, dataExpireSeconds, value);
                    return value;
                } else {
                    if (!alwaysRetry) {
                        logger.warn("競爭分佈式鎖失敗,key={}", key);
                        return null;
                    }
                    Thread.sleep(100L);
                    logger.warn("嘗試從新獲取數據,key={}", key);
                }
            }
        } catch (Exception ex) {
            logger.error("getDistributeSafeError", ex);
            return null;
        } finally {
            if (getLockSuccess) {
                redisLockManager.unLock(key);
            }
        }
    }
    public String getDataSafeRetry(String key, int lockExpireSeconds, int dataExpireSeconds, RedisBizCall bizCall, int retryMaxTimes) {
        try {
            int currentTimes = 0;
            boolean getLockSuccess = false;
            while(currentTimes < retryMaxTimes) {
                String value = redisClient.get(key);
                if (StringUtils.isNotEmpty(value)) {
                    return value;
                }
                /** 競爭分佈式鎖 **/
                if (getLockSuccess = redisLockManager.tryLock(key, lockExpireSeconds)) {
                    value = redisClient.get(key);
                    if (StringUtils.isNotEmpty(value)) {
                        return value;
                    }
                    /** 查詢數據庫 **/
                    value = bizCall.call();
                    /** 數據庫無數據則返回**/
                    if (StringUtils.isEmpty(value)) {
                        return null;
                    }
                    /** 數據存入緩存 **/
                    redisClient.setex(key, seconds, value);
                    return value;
                } else {
                    Thread.sleep(100L);
                    logger.warn("嘗試從新獲取數據,key={}", key);
                    currentTimes++;
                }
            }
        } catch (Exception ex) {
            logger.error("getDistributeSafeRetryError", ex);
            return null;
        } finally {
            if (getLockSuccess) {
                redisLockManager.unLock(key);
            }
        }
    }
}

在上面代碼中咱們採用分佈式鎖,對訪問數據庫資源的行爲進行了限制,同一時刻只有一個進程能夠訪問數據庫資源。若是有數據則放入緩存,解決了緩存擊穿問題。若是沒有數據則結束循環,解決了緩存穿透問題。使用方法以下:

/** * 緩存工具使用 * * @author 今日頭條號「JAVA前線」 * */
@Service
public class StudentService implements StudentService {
    private static final String KEY_PREFIX = "stuKey_";
    @Resource
    private StudentDao studentDao;
    @Resource
    private SafeRedisManager safeRedisManager;
    public Student getStudentInfo(String studentId) {
        String studentJSON = safeRedisManager.getDataRetry(KEY_PREFIX + studentId, 30, 600, new RedisBizCall() {
            public String call() {
                StudentDO student = studentDao.getStudentById(studentId);
                if (null == student) {
                    return StringUtils.EMPTY;
                }
                return JSON.toJSONString(student);
            }, 5);
            if(StringUtils.isEmpty(studentJSON) {
                return null;
            }
            return JSON.toJSONString(studentJSON, Student.class);
        }
    }
}

6 數據庫與緩存一致性問題

本文到第五章節緩存擊穿問題從原理到解決方案已經講清楚了,這個章節我想引伸一個問題:究竟是先寫緩存仍是先寫數據庫,或者說數據庫與緩存一致性怎麼保證?

個人結論很是清晰明確:先寫數據庫再寫緩存。核心思想是數據庫和緩存之間追求最終一致性,如無必要則無需保證強一致性。

(1) 在緩存做爲提高系統性能手段的背景下,不須要保證數據庫和緩存的強一致性。若是非要保證兩者的強一致性,會增大系統的複雜度沒有必要

(2) 若是更新數據庫成功,再更新緩存。此時存在兩種狀況:更新緩存成功則萬事大吉。更新緩存失敗沒有關係,等待緩存失效,此處必定要合理設置失效時間

(3) 若是更新數據庫失敗,則操做失敗,重試或者等待用戶從新發起

(4) 數據庫是持久化數據,是操做成功仍是失敗的判斷依據。緩存是提高性能的手段,容許短期和數據庫的不一致

(5) 在互聯網架構中,通常不追求強一致性,而追求最終一致性。若是非要保證緩存和數據庫的一致性,本質上是在解決分佈式一致性問題

(6) 分佈式一致性問題解決方案有不少,能夠選擇好比兩階段提交、TCC、本地消息表、MQ事務性消息

7 文章總結

本文介紹了緩存擊穿問題緣由和解決方案,其中參考率CAS源碼的自旋設計思想,結合分佈式鎖實現了緩存工具,但願文章對你們有所幫助。

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習

相關文章
相關標籤/搜索