Java程序猿筆記——基於redis分佈式鎖實現「秒殺」

最近在項目中遇到了相似「秒殺」的業務場景,在本篇博客中,我將用一個很是簡單的demo,闡述實現所謂「秒殺」的基本思路。java

業務場景
所謂秒殺,從業務角度看,是短期內多個用戶「爭搶」資源,這裏的資源在大部分秒殺場景裏是商品;將業務抽象,技術角度看,秒殺就是多個線程對資源進行操做,因此實現秒殺,就必須控制線程對資源的爭搶,既要保證高效併發,也要保證操做的正確。程序員

一些可能的實現
剛纔提到過,實現秒殺的關鍵點是控制線程對資源的爭搶,根據基本的線程知識,能夠不加思索的想到下面的一些方法:web

一、秒殺在技術層面的抽象應該就是一個方法,在這個方法裏可能的操做是將商品庫存-1,將商品加入用戶的購物車等等,在不考慮緩存的狀況下應該是要操做數據庫的。那麼最簡單直接的實現就是在這個方法上加上synchronized關鍵字,通俗的講就是鎖住整個方法;面試

二、鎖住整個方法這個策略簡單方便,可是彷佛有點粗暴。能夠稍微優化一下,只鎖住秒殺的代碼塊,好比寫數據庫的部分;redis

三、既然有併發問題,那我就讓他「不併發」,將全部的線程用一個隊列管理起來,使之變成串行操做,天然不會有併發問題。spring

上面所述的方法都是有效的,可是都很差。爲何?第一和第二種方法本質上是「加鎖」,可是鎖粒度依然比較高。什麼意思?試想一下,若是兩個線程同時執行秒殺方法,這兩個線程操做的是不一樣的商品,從業務上講應該是能夠同時進行的,可是若是採用第一二種方法,這兩個線程也會去爭搶同一個鎖,這實際上是沒必要要的。第三種方法也沒有解決上面說的問題。數據庫

那麼如何將鎖控制在更細的粒度上呢?能夠考慮爲每一個商品設置一個互斥鎖,以和商品ID相關的字符串爲惟一標識,這樣就能夠作到只有爭搶同一件商品的線程互斥,不會致使全部的線程互斥。分佈式鎖剛好能夠幫助咱們解決這個問題。緩存

何爲分佈式鎖
分佈式鎖是控制分佈式系統之間同步訪問共享資源的一種方式。在分佈式系統中,經常須要協調他們的動做。若是不一樣的系統或是同一個系統的不一樣主機之間共享了一個或一組資源,那麼訪問這些資源的時候,每每須要互斥來防止彼此干擾來保證一致性,在這種狀況下,便須要使用到分佈式鎖。性能優化

咱們來假設一個最簡單的秒殺場景:數據庫裏有一張表,column分別是商品ID,和商品ID對應的庫存量,秒殺成功就將此商品庫存量-1。如今假設有1000個線程來秒殺兩件商品,500個線程秒殺第一個商品,500個線程秒殺第二個商品。咱們來根據這個簡單的業務場景來解釋一下分佈式鎖。架構

一般具備秒殺場景的業務系統都比較複雜,承載的業務量很是巨大,併發量也很高。這樣的系統每每採用分佈式的架構來均衡負載。那麼這1000個併發就會是從不一樣的地方過來,商品庫存就是共享的資源,也是這1000個併發爭搶的資源,這個時候咱們須要將併發互斥管理起來。這就是分佈式鎖的應用。

而key-value存儲系統,如redis,由於其一些特性,是實現分佈式鎖的重要工具。

具體的實現
先來看看一些redis的基本命令:

SETNX key value
若是key不存在,就設置key對應字符串value。在這種狀況下,該命令和SET同樣。當key已經存在時,就不作任何操做。SETNX是」SET if Not eXists」。

expire KEY seconds
設置key的過時時間。若是key已過時,將會被自動刪除。

del KEY
刪除key

因爲筆者的實現只用到這三個命令,就只介紹這三個命令,更多的命令以及redis的特性和使用,能夠參考redis官網。

須要考慮的問題
一、用什麼操做redis?幸好redis已經提供了jedis客戶端用於java應用程序,直接調用jedis API便可。

二、怎麼實現加鎖?「鎖」實際上是一個抽象的概念,將這個抽象概念變爲具體的東西,就是一個存儲在redis裏的key-value對,key是於商品ID相關的字符串來惟一標識,value其實並不重要,由於只要這個惟一的key-value存在,就表示這個商品已經上鎖。

三、如何釋放鎖?既然key-value對存在就表示上鎖,那麼釋放鎖就天然是在redis裏刪除key-value對。

四、阻塞仍是非阻塞?筆者採用了阻塞式的實現,若線程發現已經上鎖,會在特定時間內輪詢鎖。

五、如何處理異常狀況?好比一個線程把一個商品上了鎖,可是因爲各類緣由,沒有完成操做(在上面的業務場景裏就是沒有將庫存-1寫入數據庫),天然沒有釋放鎖,這個狀況筆者加入了鎖超時機制,利用redis的expire命令爲key設置超時時長,過了超時時間redis就會將這個key自動刪除,即強制釋放鎖(能夠認爲超時釋放鎖是一個異步操做,由redis完成,應用程序只須要根據系統特色設置超時時間便可)。

talk is cheap,show me the code

在代碼實現層面,註解有併發的方法和參數,經過動態代理獲取註解的方法和參數,在代理中加鎖,執行完被代理的方法後釋放鎖。

幾個註解定義:

cachelock是方法級的註解,用於註解會產生併發問題的方法:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
String lockedPrefix() default "";//redis 鎖key的前綴
long timeOut() default 2000;//輪詢鎖的時間
int expireTime() default 1000;//key在redis裏存在的時間,1000S
}
lockedObject是參數級的註解,用於註解商品ID等基本類型的參數:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
//不須要值
}
LockedComplexObject也是參數級的註解,用於註解自定義類型的參數:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
String field() default "";//含有成員變量的複雜對象中須要加鎖的成員變量,如一個商品對象的商品ID
}
CacheLockInterceptor實現InvocationHandler接口,在invoke方法中獲取註解的方法和參數,在執行註解的方法前加鎖,執行被註解的方法後釋放鎖:

public class CacheLockInterceptor implements InvocationHandler{
public static int ERROR_COUNT = 0;
private Object proxied;
public CacheLockInterceptor(Object proxied) {
this.proxied = proxied;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
CacheLock cacheLock = method.getAnnotation(CacheLock.class);
//沒有cacheLock註解,pass
if(null == cacheLock){
System.out.println("no cacheLock annotation");
return method.invoke(proxied, args);
}
//得到方法中參數的註解
Annotation[][] annotations = method.getParameterAnnotations();
//根據獲取到的參數註解和參數列表得到加鎖的參數
Object lockedObject = getLockedObject(annotations,args);
String objectValue = lockedObject.toString();
//新建一個鎖
RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
//加鎖
boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
if(!result){//取鎖失敗
ERROR_COUNT += 1;
throw new CacheLockException("get lock fail");
}
try{
//加鎖成功,執行方法
return method.invoke(proxied, args);
}finally{
lock.unlock();//釋放鎖
}
}
/**

  • @param annotations
  • @param args
  • @return
  • @throws CacheLockException

*/
private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
if(null == args || args.length == 0){
throw new CacheLockException("方法參數爲空,沒有被鎖定的對象");
}
if(null == annotations || annotations.length == 0){
throw new CacheLockException("沒有被註解的參數");
}
//不支持多個參數加鎖,只支持第一個註解爲lockedObject或者lockedComplexObject的參數
int index = -1;//標記參數的位置指針
for(int i = 0;i < annotations.length;i++){
for(int j = 0;j < annotations[i].length;j++){
if(annotationsi instanceof LockedComplexObject){//註解爲LockedComplexObject
index = i;
try {
return args[i].getClass().getField(((LockedComplexObject)annotationsi).field());
} catch (NoSuchFieldException | SecurityException e) {
throw new CacheLockException("註解對象中沒有該屬性" + ((LockedComplexObject)annotationsi).field());
}
}
if(annotationsi instanceof LockedObject){
index = i;
break;
}
}
//找到第一個後直接break,不支持多參數加鎖
if(index != -1){
break;
}
}
if(index == -1){
throw new CacheLockException("請指定被鎖定參數");
}
return args[index];
}
}
最關鍵的RedisLock類中的lock方法和unlock方法:

/**

  • 加鎖
  • 使用方式爲:
  • lock();
  • try{
  • executeMethod();
  • }finally{
  • unlock();
  • }
  • @param timeout timeout的時間範圍內輪詢鎖
  • @param expire 設置鎖超時時間
  • @return 成功 or 失敗

*/
public boolean lock(long timeout,int expire){
long nanoTime = System.nanoTime();
timeout *= MILLI_NANO_TIME;
try {
//在timeout的時間範圍內不斷輪詢鎖
while (System.nanoTime() - nanoTime < timeout) {
//鎖不存在的話,設置鎖並設置鎖過時時間,即加鎖
if (this.redisClient.setnx(this.key, LOCKED) == 1) {
this.redisClient.expire(key, expire);//設置鎖過時時間是爲了在沒有釋放
//鎖的狀況下鎖過時後消失,不會形成永久阻塞
this.lock = true;
return this.lock;
}
System.out.println("出現鎖等待");
//短暫休眠,避免可能的活鎖
Thread.sleep(3, RANDOM.nextInt(30));
}
} catch (Exception e) {
throw new RuntimeException("locking error",e);
}
return false;
}
public void unlock() {
try {
if(this.lock){
redisClient.delKey(key);//直接刪除
}
} catch (Throwable e) {
}
}
上述的代碼是框架性的代碼,如今來說解如何使用上面的簡單框架來寫一個秒殺函數。

先定義一個接口,接口裏定義了一個秒殺方法:

public interface SeckillInterface {
/**
*如今暫時只支持在接口方法上註解
*/
//cacheLock註解可能產生併發的方法
@CacheLock(lockedPrefix="TEST_PREFIX")
public void secKill(String userID,@LockedObject Long commidityID);//最簡單的秒殺方法,參數是用戶ID和商品ID。可能有多個線程爭搶一個商品,因此商品ID加上LockedObject註解
}
上述SeckillInterface接口的實現類,即秒殺的具體實現:

public class SecKillImpl implements SeckillInterface{
static Map<Long, Long> inventory ;
static{
inventory = new HashMap<>();
inventory.put(10000001L, 10000l);
inventory.put(10000002L, 10000l);
}
@Override
public void secKill(String arg1, Long arg2) {
//最簡單的秒殺,這裏僅做爲demo示例
reduceInventory(arg2);
}
//模擬秒殺操做,姑且認爲一個秒殺就是將庫存減一,實際情景要複雜的多
public Long reduceInventory(Long commodityId){
inventory.put(commodityId,inventory.get(commodityId) - 1);
return inventory.get(commodityId);
}
}
模擬秒殺場景,1000個線程來爭搶兩個商品:

@Test
public void testSecKill(){
int threadCount = 1000;
int splitPoint = 500;
CountDownLatch endCount = new CountDownLatch(threadCount);
CountDownLatch beginCount = new CountDownLatch(1);
SecKillImpl testClass = new SecKillImpl();
Thread[] threads = new Thread[threadCount];
//起500個線程,秒殺第一個商品
for(int i= 0;i < splitPoint;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一個信號量上,掛起
beginCount.await();
//用動態代理的方式調用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill("test", commidityId1);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();
}
//再起500個線程,秒殺第二件商品
for(int i= splitPoint;i < threadCount;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一個信號量上,掛起
beginCount.await();
//用動態代理的方式調用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill("test", commidityId2);
//testClass.testFunc("test", 10000001L);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();
}
long startTime = System.currentTimeMillis();
//主線程釋放開始信號量,並等待結束信號量,這樣作保證1000個線程作到徹底同時執行,保證測試的正確性
beginCount.countDown();
try {
//主線程等待結束信號量
endCount.await();
//觀察秒殺結果是否正確
System.out.println(SecKillImpl.inventory.get(commidityId1));
System.out.println(SecKillImpl.inventory.get(commidityId2));
System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
System.out.println("total cost " + (System.currentTimeMillis() - startTime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
在正確的預想下,應該每一個商品的庫存都減小了500,在屢次試驗後,實際狀況符合預想。若是不採用鎖機制,會出現庫存減小499,498的狀況。

這裏採用了動態代理的方法,利用註解和反射機制獲得分佈式鎖ID,進行加鎖和釋放鎖操做。固然也能夠直接在方法進行這些操做,採用動態代理也是爲了可以將鎖操做代碼集中在代理中,便於維護。

一般秒殺場景發生在web項目中,能夠考慮利用spring的AOP特性將鎖操做代碼置於切面中,固然AOP本質上也是動態代理。

小結
這篇文章從業務場景出發,從抽象到實現闡述瞭如何利用redis實現分佈式鎖,完成簡單的秒殺功能,也記錄了筆者思考的過程,但願能給閱讀到本篇文章的人一些啓發。如讀者有其餘看法歡迎留言。

程序員學習交流學習羣:878249276,羣裏有分享的視頻,面試指導,架構資料,還有思惟導圖、羣裏有視頻,都是乾貨的,你能夠下載來看。主要分享分佈式架構、高可擴展、高性能、高併發、性能優化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分佈式項目實戰學習架構師視頻。合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索