歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我微信「java_front」一塊兒交流學習java
在互聯網場景中緩存系統是一個重要系統,爲了防止流量頻繁訪問數據庫,通常會在數據庫層前設置一道緩存層做爲保護。面試
緩存是一個廣義概念,核心要義是將數據存放在離用戶更近的地方,或者是將數據存放在訪問更快的介質。redis
緩存對應到實際應用中能夠分爲內存緩存、遠程緩存。內存緩存常見工具例如Guava、Ecache等,遠程緩存常見系統例如Redis,memcache等。本文以遠程緩存Redis爲例進行講解。數據庫
緩存穿透和擊穿是高併發場景下必須面對的問題,這些問題會致使訪問請求繞過緩存直接打到數據庫,可能會形成數據庫掛掉或者系統雪崩,下面本文根據下圖提綱來分析這些問題的原理和解決方案。緩存
緩存穿透和擊穿從最終結果上來講都是流量繞過緩存打到了數據庫,可能會致使數據庫掛掉或者系統雪崩,可是仔細區分仍是有一些不一樣,咱們分析一張業務讀取緩存通常流程圖。安全
咱們用文字簡要描述這張圖:服務器
(1) 業務查詢數據時首先查詢緩存,若是緩存存在數據則返回,流程結束微信
(2) 若是緩存不存在數據則查詢數據庫,若是數據庫不存在數據則返回空數據,流程結束markdown
(3) 若是數據庫存在數據則將數據寫入緩存並返回數據給業務,流程結束架構
假設業務方要查詢A數據,緩存穿透是指數據庫根本不存在A數據,因此根本沒有數據能夠寫入緩存,致使緩存層失去意義,大量請求會頻繁訪問數據庫。
緩存擊穿是指請求在查詢數據庫前,首先查緩存看看是否存在,這是沒有問題的。可是併發量太大,致使第一個請求尚未來得及將數據寫入緩存,後續大量請求已經開始訪問緩存,這是數據在緩存中仍是不存在的,因此瞬時大量請求會打到數據庫。
如今咱們把緩存問題放一放,一塊兒來分析CAS這個概念的實例源碼,後面咱們編寫緩存工具須要借鑑這個思想。
咱們來看一道常見面試題,相信這個面試題你們並不會陌生:分析下面這段代碼輸出的值是多少。
class Data {
volatile int num = 0;
public void increase() {
num++;
}
}
public class VolatileTest {
public static void main(String[] args) {
Data data = new Data();
// 100個線程操做num累加
for (int i = 1; i <= 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
data.increase();
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
}).start();
}
// 等待上述線程執行完 -> 數值2表示只有主線程和GC線程在運行
while (Thread.activeCount() > 2) {
// 主線程讓出CPU時間片
Thread.yield();
}
System.out.println(data.num);
}
}
複製代碼
運行結果num值通常小於100,這是由於num++不是原子性,咱們編寫一段簡單代碼進行證實。
public class VolatileTest2 {
volatile int num = 0;
public void increase() {
num++;
}
}
複製代碼
執行下列命令獲取字節碼:
javac VolatileTest2.java
javap -c VolatileTest2.class
複製代碼
字節碼文件以下所示:
$ javap -c VolatileTest2.class
Compiled from "VolatileTest2.java"
public class com.java.front.test.VolatileTest2 {
volatile int num;
public com.java.front.test.VolatileTest2();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: iconst_0
6: putfield #2 // Field num:I
9: return
public void increase();
Code:
0: aload_0
1: dup
2: getfield #2 // Field num:I
5: iconst_1
6: iadd
7: putfield #2 // Field num:I
10: return
}
複製代碼
咱們觀察num++代碼片斷,發現其實分爲三個步驟:
(1) getfield
(2) iadd
(3) putfield
複製代碼
getfield讀取num值,iadd運算num+1,最後putfield將新值賦值給num。這就不難理解爲何num最終會小於100:由於線程A在執行到第二步後執行第三步前,還沒來得及將新值賦給num,數據就被線程B取走了,這時仍是沒有加1的舊值。
那麼怎麼解決上述問題呢?常見方案有兩種:加鎖方案和無鎖方案。
加鎖方案是對increase加上同步關鍵字,這樣就能夠保證同一時刻只有一個線程操做,這不是咱們這篇文章重點,不詳細展開了。
無鎖方案能夠採用JUC提供的AtomicInteger進行運算,咱們看一下改進後的代碼。
import java.util.concurrent.atomic.AtomicInteger;
class Data {
volatile AtomicInteger num = new AtomicInteger(0);
public void increase() {
num.incrementAndGet();
}
}
public class VolatileTest {
public static void main(String[] args) {
Data data = new Data();
for (int i = 1; i <= 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
data.increase();
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
}).start();
}
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(data.num);
}
}
複製代碼
這樣改寫以後結果正如咱們預期等於100,咱們並無加鎖,那麼爲何改用AtomicInteger就能夠達到預期效果呢?
本章節咱們以incrementAndGet方法做爲入口,進行CAS源碼分析。
class Data {
volatile AtomicInteger num = new AtomicInteger(0);
public void increase() {
num.incrementAndGet();
}
}
複製代碼
進入incrementAndGet方法:
import sun.misc.Unsafe;
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
}
複製代碼
咱們看到一個名爲Unsafe的類。這個類並不常見,到底有什麼用呢?Unsafe是位於sun.misc包下的一個類,具備操做底層資源的能力。例如能夠直接訪問操做系統,操做特定內存數據,提供許多CPU原語級別的API。
咱們繼續分析源碼跟進getAndAddInt方法:
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
複製代碼
咱們對參數進行說明:o表示待修改的對象,offset表示待修改字段在內存中的偏移量,delta表示本次修改的增量。
整個方法核心是一段do-while循環代碼,其中方法getIntVolatile比較好理解,就是獲取對象o偏移量爲offset的某個字段值。
咱們重點分析while中compareAndSwapInt方法:
public final native boolean compareAndSwapInt( Object o, long offset, int expected, int x);
複製代碼
其中o和offset含義不變,expected表示指望值,x表更新值,這就引出了CAS核心操做三個值:內存位置值、預期原值及新值。
執行CAS操做時,內存位置值會與預期原值比較。若是相匹配處理器會自動將該位置值更新爲新值,不然處理器不作任何操做。
Unsafe提供的CAS方法是一條CPU的原子指令,底層實現即爲CPU指令cmpxchg,不會形成數據不一致。
咱們再回過頭分析這段代碼:
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
複製代碼
代碼執行流程以下:
(1) 線程A執行累加,執行到getAndAddInt方法,首先根據內存地址獲取o對象offset偏移量的字段值v1
(2) while循環中compareAndSwapInt執行,這個方法將再次獲取o對象offset偏移量的字段值v2,此時判斷v1和v2是否相等,若是相等則自動將該位置值更新爲v1加上增量後的值,跳出循環
(3) 若是執行compareAndSwapInt時字段值已經被線程B改掉,則該方法會返回false,因此沒法跳出循環,繼續執行直至成功,這就是自旋設計思想
經過上述分析咱們知道,Unsafe類和自旋設計思想是CAS實現核心,其中自旋設計思想會在咱們緩存工具中體現。
在相同JVM進程中爲了保證同一段代碼塊在同一時刻只能被一個線程訪問,JAVA提供了鎖機制,例如咱們可使用synchroinzed、ReentrantLock進行併發控制。
若是在多個服務器的集羣環境,每一個服務器運行着一個JVM進程。若是但願對多個JVM進行併發控制,此時JVM鎖就不適用了。這時就須要引入分佈式鎖。顧名思義分佈式鎖是對分佈式場景下,多個JVM進程進行併發控制。
分佈式鎖在實現時當心踩坑:例如沒有設置超時時間,若是獲取到鎖的節點因爲某種緣由掛掉沒有釋放鎖,致使其它節點永遠拿不到鎖。
分佈式鎖有多種實現方式,能夠本身經過Redis或者Zookeeper進行實現,也能夠直接使用Redisson框架。本章節給出Redis分佈式鎖Lua腳本實現。本文不進行展開。
上述章節分析了CAS原理和分佈式鎖實現,如今咱們要將上述知識結合起來,實現一個能夠解決緩存擊穿問題的緩存工具。
緩存工具核心思想是若是發現緩存中無數據,利用分佈式鎖使得同一時刻只有一個JVM進程能夠訪問數據庫,並將數據寫入緩存。
那麼沒有搶到分佈式鎖的進程怎麼辦呢?咱們提供如下三種選擇:
方案一:直接返回空數據
方案二:自旋直到獲取到數據
方案三:自旋N次仍然沒有獲取到數據則返回空數據
複製代碼
緩存工具代碼以下:
/** * 業務回調 * * @author 微信公衆號「JAVA前線」 * */
public interface RedisBizCall {
/** * 業務回調方法 * * @return 序列化後數據值 */
String call();
}
/** * 安全緩存管理器 * * @author 微信公衆號「JAVA前線」 * */
@Service
public class SafeRedisManager {
@Resource
private RedisClient RedisClient;
@Resource
private RedisLockManager redisLockManager;
public String getDataSafeRetry(String key, int lockExpireSeconds, int dataExpireSeconds, RedisBizCall bizCall, int retryMaxTimes) {
try {
int currentTimes = 0;
boolean getLockSuccess = false;
while(currentTimes < retryMaxTimes) {
String value = redisClient.get(key);
if (StringUtils.isNotEmpty(value)) {
return value;
}
/** 競爭分佈式鎖 **/
if (getLockSuccess = redisLockManager.tryLock(key, lockExpireSeconds)) {
value = redisClient.get(key);
if (StringUtils.isNotEmpty(value)) {
return value;
}
/** 查詢數據庫 **/
value = bizCall.call();
/** 數據庫無數據則返回**/
if (StringUtils.isEmpty(value)) {
return null;
}
/** 數據存入緩存 **/
redisClient.setex(key, seconds, value);
return value;
} else {
Thread.sleep(100L);
logger.warn("嘗試從新獲取數據,key={}", key);
currentTimes++;
}
}
} catch (Exception ex) {
logger.error("getDistributeSafeRetryError", ex);
return null;
} finally {
if (getLockSuccess) {
redisLockManager.unLock(key);
}
}
}
}
複製代碼
在上面代碼中咱們採用分佈式鎖,對訪問數據庫資源的行爲進行了限制,同一時刻只有一個進程能夠訪問數據庫資源。若是有數據則放入緩存,解決了緩存擊穿問題。若是沒有數據則結束循環,解決了緩存穿透問題。使用方法以下:
/** * 緩存工具使用 * * @author 微信公衆號「JAVA前線」 * */
@Service
public class StudentService implements StudentService {
private static final String KEY_PREFIX = "stuKey_";
@Resource
private StudentDao studentDao;
@Resource
private SafeRedisManager safeRedisManager;
public Student getStudentInfo(String studentId) {
String studentJSON = safeRedisManager.getDataRetry(KEY_PREFIX + studentId, 30, 600, new RedisBizCall() {
public String call() {
StudentDO student = studentDao.getStudentById(studentId);
if (null == student) {
return StringUtils.EMPTY;
}
return JSON.toJSONString(student);
}, 5);
if(StringUtils.isEmpty(studentJSON) {
return null;
}
return JSON.toJSONString(studentJSON, Student.class);
}
}
}
複製代碼
本文到第五章節緩存擊穿問題從原理到解決方案已經講清楚了,這個章節我想引伸一個問題:究竟是先寫緩存仍是先寫數據庫,或者說數據庫與緩存一致性怎麼保證?
個人結論很是清晰明確:先寫數據庫再寫緩存。核心思想是數據庫和緩存之間追求最終一致性,如無必要則無需保證強一致性。
(1) 在緩存做爲提高系統性能手段的背景下,不須要保證數據庫和緩存的強一致性。若是非要保證兩者的強一致性,會增大系統的複雜度沒有必要
(2) 若是更新數據庫成功,再更新緩存。此時存在兩種狀況:更新緩存成功則萬事大吉。更新緩存失敗沒有關係,等待緩存失效,此處必定要合理設置失效時間
(3) 若是更新數據庫失敗,則操做失敗,重試或者等待用戶從新發起
(4) 數據庫是持久化數據,是操做成功仍是失敗的判斷依據。緩存是提高性能的手段,容許短期和數據庫的不一致
(5) 在互聯網架構中,通常不追求強一致性,而追求最終一致性。若是非要保證緩存和數據庫的一致性,本質上是在解決分佈式一致性問題
(6) 分佈式一致性問題解決方案有不少,能夠選擇好比兩階段提交、TCC、本地消息表、MQ事務性消息
本文介紹了緩存擊穿問題緣由和解決方案,其中參考率CAS源碼的自旋設計思想,結合分佈式鎖實現了緩存工具,但願文章對你們有所幫助。
歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我微信「java_front」一塊兒交流學習