在互聯網企業中,限購的作法,多種多樣,有的別出心裁,有的因循守舊,可是種種作法皆想達到的目的,無外乎幾種,商品賣的完,系統抗的住,庫存不超限。雖然短短數語,卻有着說不完,道不盡,輕者如釋重負,重者涕淚橫流的架構體驗。 可是,在實際開發過程當中,庫存超限,做爲其中最核心的一員,到底該怎麼作,如何作纔會是最合適的呢?python
今天這篇文章,我將會展現給你們庫存限購的五種常見的作法,並對其利弊一一探討,因爲這五種作法,有的在設計之初當作提案被否認掉的,有的在線上跑着,可是在沒有任何單元測試和壓測狀況下,這幾種超限控制的作法也許是不符合你的業務的,因此不建議直接用於生產環境。我這裏權當是作拋磚引玉,期待你們更好的作法。mysql
工欲善其事必先利其器,在這裏,咱們將利用一臺測試環境的redis服務器當作庫存超限控制的主戰場,先設置庫存量爲10進去,而後根據此庫存量,一一展開,設置庫存代碼以下:程序員
1: def set_storage():
2: conn = redis_conn()
3: key = "storage_seckill"
4: current_storage = conn.get(key)
5: if current_storage == None:
6: conn.set(key, 10)
複製代碼
爲了方便性,我這裏使用了python語言來書寫邏輯,可是今天咱們只是講解思想,語言這類的,你們能夠本身嘗試轉一下。redis
上面就是咱們的設置庫存到redis中的作法,很簡單,就是在redis中設置一個storage_seckill的庫存key,而後裏面放上庫存量10.sql
超限限制作法一:先獲取當前庫存值進行比對,而後進行扣減操做bash
1: def storage_scenario_one():
2: conn = redis_conn()
3: key = "storage_seckill"
4: current_storage = conn.get(key)
5: current_storage_int = int(current_storage)
6: if current_storage_int<=0 :
7: return 0
8: result = conn.decr(key)
9: return result
複製代碼
首先,咱們拿到當前的庫存值,而後看看是否已經扣減到了零,若是扣減到了零,則不繼續扣減,直接返回;若是庫存還有,則利用decr原子操做進行扣減,同時返回扣減後的庫存值。服務器
此種作法在小併發量下訪問,問題不大;在對庫存量控制不嚴格的業務中,問題也不大。可是若是併發量比較大一些,同時業務要求嚴格控制庫存,那麼此種作法是很是不合適的,緣由在於,在高併發狀況下,get命令,decr命令,都是分開發給redis的,這樣會致使比對的時候,很容易出現限制不住的狀況,也就是會形成第六行的比對失效。網絡
設想以下一個場景,AB兩個請求進來,A獲取的庫存值爲1,B獲取的庫存值爲1,而後兩個請求都被髮到redis中進行扣減操做,而後這種場景下,A最後獲得的庫存值爲0;可是B最後獲得的庫存值爲-1,超限。架構
因此此種場景,因爲在高併發下,get和decr操做不是一組原子性操做,會引起超限問題,被直接pass。併發
超限限制作法二:先扣減庫存,而後比對,最後根據狀況回滾
1: def storage_scenario_two():
2: conn = redis_conn()
3: key = "storage_seckill"
4: current = conn.decr(key)
5: if current>=0:
6: return current
7: else:
8: #回滾庫存
9: conn.incr(key)
10: return 0
複製代碼
首先,請求進來,直接對庫存值進行扣減,而後獲得當前的庫存值;而後,對此庫存值進行校驗,若是庫存還有,則返回庫存值,若是庫存沒有了,則回滾庫存,以便於防止負庫存量的存在。
此作法,相比作法一,要稍微可靠一些,因爲redis的decr操做直接返回真實的庫存值,因此每一個請求進來,只要執行了decr操做,拿到的確定是當前最準確的庫存值。而後進行比對,若是庫存值大於等於零,返回當前庫存值,若是小於零,則將庫存進行回滾。
此種作法,最大的一個問題就是,若是大批量的併發請求過來,redis承受的寫操做的量,相對於方法一來講,是加倍的,由於回滾庫存的存在致使的。因此這種狀況下,高併發量進來,極有可能將redis的寫操做打出極限值,而後會出現不少redis寫失敗的錯誤警告。 另外一個問題和作法一是同樣的,就是第五行的比對在高併發下,也是限不住的,具體的壓測結果請看個人這篇stackoverflow的提問:Will redis incr command can be limitation to specific number?
因此此種場景,雖然在高併發狀況下避免了redis命令的分開操做,可是卻大大增長了redis的寫併發量,被pass。
超限限制作法三:先遞減庫存,而後經過整數溢出控制,最後根據狀況回滾
1: def storage_scenario_three():
2: conn = redis_conn()
3: key = "storage_seckill"
4: current = conn.decr(key)
5: #經過整數控制溢出的作法
6: if storage_overflow_checker(current):
7: return current
8: else:
9: #回滾庫存
10: conn.incr(key)
11: return 0
12:
13: def storage_overflow_checker(current_storage):
14: #若是當前庫存未被遞減到0,則check_number爲int類型,isinstance方法檢測結果爲true
15: #若是當前庫存已被遞減到負數,則check_number爲long類型,isinstance方法檢測結果爲false
16: check_number = sys.maxint - current_storage
17: check_result = isinstance(check_number,int)
18: return check_result
複製代碼
說明一下,當前庫存,若是爲負數,則利用python的isinstance(check_number,int)檢測的時候,check_result返回是false;若是爲非負數,則檢測的時候,check_result返回的是true,上面的storage_overflow_checker的作法,和下面的C#語言的作法是同樣的,利用C#語言描述,你們可能對上面的代碼更清晰一些:
1: /**
2: * 經過讓Integer溢出的方式來控制數量超賣(遞減致使溢出)
3: * @param current
4: * @return
5: */
6: public boolean StorageOverFillChecker(long current) {
7: try {
8: //當前數值的結果計算
9: Long value = Integer.MAX_VALUE - current;
10: //嘗試轉變爲Inter類型,若是超賣,則轉換會出錯;若是未超賣,則轉換不會出錯
11: Integer.parseInt(value.toString());
12: } catch (Exception ex) {
13: //值溢出
14: return true;
15: }
16:
17: return false;
18: }
複製代碼
能夠看出,此種作法和方法二很類似,只是比對部分由,直接和零比對,變成了經過檢測integer是否溢出的方式來進行。這樣就完全解決了高併發狀況下,直接和零比對,限制不住的問題了。
雖然此種作法,相對於作法二說來,要靠譜不少,可是仍然解決不了在高併發狀況下,redis寫併發量加倍的問題,極有可能某個促銷活動,在開始的那一刻,直接將redis的寫操做打出問題來。
超限限制作法四:共享鎖
1: def storage_scenario_four():
2: conn = redis_conn()
3: key = "storage_seckill"
4: key_lock = key + "_lock"
5: if conn.setnx(key_lock, "1"):
6: #客戶端掛掉,設置過時時間,防止其不釋放鎖
7: conn.pexpire(key_lock, 5)
8: current_storage = conn.get(key)
9: if int(current_storage)<=0 :
10: return 0
11: result = conn.decr(key)
12: #客戶端正常,刪除共享鎖,提升性能
13: conn.delete(key_lock)
14: return result
15: else :
16: return "someone in it"
複製代碼
前面三種,因爲在高併發下都有問題,因此本作法,主要是經過setnx設置共享鎖,而後請求到鎖的用戶請求,正常進行庫存扣減操做;請求不到鎖的用戶請求,則直接提示有其餘人在操做庫存。
因爲setnx的特殊性,當客戶端掛掉的時候,是不會釋放這個鎖的,因此當請求進來的時候,首先經過pexpire命令,爲鎖設置過時時間,防止死鎖不釋放。而後執行正常的庫存扣減操做,當操做完畢,刪掉共享鎖,能夠極大的提升程序性能,不然只能等待鎖慢慢過時了。
此種作法相對於上面的三種操做,經過採用共享鎖,犧牲了部分性能,來規避了高併發的問題,比較推薦,可是因爲redis操做命令仍是不少,而且每條都要發送到redis端執行,因此在網絡傳輸上,耗費的時間開銷是不小的。這是後面須要着力優化的方向。
看了上面四種作法,都不是很完美,其中最大的問題在於,高併發狀況下,多條redis命令分開操做庫存,極容易發生庫存限不住的問題;同時,因爲加了rollback庫存操做,極容易因爲redis寫命令的操做數加倍致使壓垮redis的風險。加了鎖,雖然犧牲了部分性能,規避了高併發問題,可是redis命令操做量過多。
其實我上面一直在強調高併發,高併發。上面的四個場景,只有在高併發的狀況下,纔會出現問題,若是你的用戶請求量沒有那麼多,那麼採用上面四種方式之一,也不是不能夠。可是如何才能知道採用起來沒問題呢?其實最簡單的一個方式,就是在大家本身的集羣機器上,模擬活動的真實用戶量,進行壓測,看看會不會超限就好了,不超限的話,上面四種作法徹底知足需求。
那麼,就沒有比較好一些的解決方案了嗎?
也不是,雖然解決這個問題,沒有絕對好用的銀彈,可是有相對好用的大蒜和聖水。下面的講解,會涉及到Redisson的Redlock的源碼實現,固然也會涉及一點lua方面的知識,還請提早預備一下。
偶然在研究分佈式鎖的時候,嘗試翻閱過Redisson的Redlock的實現,並對其底層的實現方式有所記錄,咱們先來看看其加鎖過程的源碼實現:
從上面的方法中,咱們能夠看到,分佈式鎖的上鎖過程,是首先判斷一個key存不存在,若是不存在,則設置這個key,而後pexpire設置一個過時時間,來防止客戶端訪問的時候,掛掉了後,不釋放鎖的問題。爲何這段lua代碼就能實現分佈式鎖的核心呢? 緣由就是,這段代碼放到一個lua腳本中,那麼這段lua腳本就是一個原子性的操做。redis在執行這段lua腳本的過程當中,不會摻雜任何其餘的命令。因此從根本上避免了併發操做命令的問題。
咱們都知道,一個key若是設置了過時時間,key過時後,redis是不會刪掉這個key的,只有用戶訪問纔會刪除掉這個key,因此,當使用分佈式鎖的時候,若是設置的pexpire過時時間爲5ms,那麼一秒鐘只能處理200個併發,性能很是低。如何解決這種性能問題呢?來看來解鎖的操做:
從上面解鎖的方法中,咱們能夠看到,若是這個鎖用完了以後,Redisson的作法是是直接刪除掉的。這樣能夠提升很多的性能。(源碼參閱,屬於我本身的理解,若有謬誤,還請指教)
那麼按照上面這種設計思路,新的超限作法就出來了。
超限作法五:基於lua的共享鎖
1: def storage_scenario_five():
2: conn = redis_conn()
3: key = "storage_seckill"
4: key_lock = key + "_lock"
5: key_val = "get_the_key"
6: lua = """ 7: local key = KEYS[1] 8: local expire = KEYS[2] 9: local value = KEYS[3] 10: 11: local result = redis.call('setnx',key,value) 12: if result == 1 then 13: redis.call('pexpire', key, expire) 14: end 15: return result 16: """
17: locked = conn.eval(lua, 3, key_lock, 5, key_val)
18: print (locked == 1)
19: if locked == 1:
20: val = storage_scenario_one()
21: print("val:"+str(val))
22: #刪掉共享key,用以提升性能, 不然只能默默的等其過時
23: conn.delete(key_lock)
24: return val
25: else:
26: return "someone in it"
複製代碼
這種作法,實際上是作法四的衍生優化版本,優化的地方在於,將多條redis操做命令屢次發送,改爲了將多條redis操做命令放在了一個原子性操做事務中一次性執行完畢,省去了不少的網絡請求。若是能夠,其實你也能夠將業務邏輯糅合到上面的lua代碼中,這樣一來,性能固然會更好。
上面這種作法,若是 storage_scenario_one()這種操做是直接操做的mysql庫存,則很是推薦這種作法,可是若是storage_scenario_one()這種操做直接操做的redis中的虛擬庫存,則不是很推薦這種作法,不如直接用限流操做。
超限作法六: All In Lua
1: def storage_scenario_six():
2: conn = redis_conn()
3: lua = """ 4: local storage = redis.call('get','storage_seckill') 5: if storage ~= false then 6: if tonumber(storage) > 0 then 7: return redis.call('decr','storage_seckill') 8: else 9: return 'storage is zero now, can't perform decr action' 10: end 11: else 12: return redis.call('set','storage_seckill',10) 13: end 14: """
15: result = conn.eval(lua,0)
16: print(result)
複製代碼
此種作法是當前最好的作法,全部的庫存扣減操做都放在lua腳本中進行,造成一個原子性操做,redis在執行上面的lua腳本的時候,是不會摻雜任何其餘的執行命令的。因此這樣從根本上避免了高併發下,多條命令執行帶來的問題。並且上面的redis命令執行,都直接在redis服務器上,省去了網絡傳輸時間,也沒有共享鎖的限制,從性能上而言,是最好的。可是,業務邏輯的lua化,相對而言是比較麻煩的,因此對於追求極限庫存控制的業務,能夠考慮這種作法。
好了,這就是我今天爲你們帶來的六種庫存超限的作法,每種作法都有本身的優缺點,好使的限不住,限的住的性能不行,性能好的又須要引入lua,真心不知道如何選擇了。
聲明:上面六種庫存超限作法,有些屬於本人的推理,線上並未實際用過,若是你貿然使用而未通過壓測,由此形成的損失,找老闆去討論吧。
歡迎工做一到五年的Java工程師朋友們加入Java程序員開發: 854393687 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!