(1)redis如何實現分佈式鎖?java
(2)redis分佈式鎖有哪些優勢?node
(3)redis分佈式鎖有哪些缺點?mysql
(4)redis實現分佈式鎖有沒有現成的輪子可使用?git
Redis(全稱:Remote Dictionary Server 遠程字典服務)是一個開源的使用ANSI C語言編寫、支持網絡、可基於內存亦可持久化的日誌型、Key-Value數據庫,並提供多種語言的API。github
本章咱們將介紹如何基於redis實現分佈式鎖,並把其實現的進化史從頭至尾講明白,以便你們在面試的時候能講清楚redis分佈式鎖的來(忽)龍(悠)去(考)脈(官)。面試
基於前面關於鎖(分佈式鎖)的學習,咱們知道實現鎖的條件有三個:redis
(1)狀態(共享)變量,它是有狀態的,這個狀態的值標識了是否已經被加鎖,在ReentrantLock中是經過控制state的值實現的,在ZookeeperLock中是經過控制子節點來實現的;算法
(2)隊列,它是用來存放排隊的線程,在ReentrantLock中是經過AQS的隊列實現的,在ZookeeperLock中是經過子節點的有序性實現的;spring
(3)喚醒,上一個線程釋放鎖以後喚醒下一個等待的線程,在ReentrantLock中結合AQS的隊列釋放時自動喚醒下一個線程,在ZookeeperLock中是經過其監聽機制來實現的;sql
那麼上面三個條件是否是必要的呢?
其實否則,實現鎖的必要條件只有第一個,對共享變量的控制,若是共享變量的值爲null就給他設置個值(java中可使用CAS操做進程內共享變量),若是共享變量有值則不斷重複檢查其是否有值(重試),待鎖內邏輯執行完畢再把共享變量的值設置回null。
說白了,只要有個地方存這個共享變量就好了,並且要保證整個系統(多個進程)內只有這一份便可。
這也是redis實現分佈式鎖的關鍵【本篇文章由公衆號「彤哥讀源碼」原創】。
既然上面說了實現分佈式鎖只須要對共享變量控制到位便可,那麼redis咱們怎麼控制這個共享變量呢?
首先,咱們知道redis的基礎命令有get/set/del,經過這三個命令能夠實現分佈式鎖嗎?固然能夠。
在獲取鎖以前先get lock_user_1
看這個鎖存不存在,若是不存在則再set lock_user_1 value
,若是存在則等待一段時間後再重試,最後使用完成了再刪除這個鎖del lock_user_1
便可。
可是,這種方案有個問題,若是一開始這個鎖是不存在的,兩個線程去同時get,這個時候返回的都是null(nil),而後這兩個線程都去set,這時候就出問題了,兩個線程均可以set成功,至關於兩個線程都獲取到同一個鎖了。
因此,這種方案不可行!
上面的方案不可行的主要緣由是多個線程同時set都是能夠成功的,因此後來有了setnx
這個命令,它是set if not exist
的縮寫,也就是若是不存在就set。
能夠看到,當重複對同一個key進行setnx的時候,只有第一次是能夠成功的。
所以,方案二就是先使用setnx lock_user_1 value
命令,若是返回1則表示加鎖成功,若是返回0則表示其它線程先執行成功了,那就等待一段時間後重試,最後同樣使用del lock_user_1
釋放鎖。
可是,這種方案也有個問題,若是獲取鎖的這個客戶端斷線了怎麼辦?這個鎖不是一直都不會釋放嗎?是的,是這樣的。
因此,這種方案也不可行!
上面的方案不可行的主要緣由是獲取鎖以後客戶端斷線了沒法釋放鎖的問題,那麼,我在setnx以後立馬再執行setex能夠嗎?
答案是能夠的,2.6.12以前的版本使用redis實現分佈式鎖你們都是這麼玩的。
所以,方案三就是先使用setnx lock_user_1 value
命令拿到鎖,再當即使用setex lock_user_1 30 value
設置過時時間,最後使用del lock_user_1
釋放鎖。
在setnx獲取到鎖以後再執行setex設置過時時間,這樣就很大機率地解決了獲取鎖以後客戶端斷線不會釋放鎖的問題。
可是,這種方案依然有問題,若是setnx以後setex以前這個客戶端就斷線了呢?嗯~,彷佛無解,不過這種機率實在是很是小,因此2.6.12以前的版本你們也都這麼用,幾乎沒出現過什麼問題。
因此,這種方案基本可用,只是不太好!
上面的方案不太好的主要緣由是setnx/setex是兩條獨立的命令,沒法解決前者成功以後客戶端斷線的問題,那麼,把兩條命令合在一塊兒不就好了嗎?
是的,redis官方也意識到這個問題了,因此2.6.12版本給set命令加了一些參數:
SET key value [EX seconds] [PX milliseconds] [NX|XX]
EX,過時時間,單位秒
PX,過時時間,單位毫秒
NX,not exist,若是不存在才設置成功
XX,exist exist?若是存在才設置成功
經過這個命令咱們就不再怕客戶端無端斷線了【本篇文章由公衆號「彤哥讀源碼」原創】。
所以,方案四就是先使用set lock_user_1 value nx ex 30
獲取鎖,獲取鎖以後使用,使用完成了最後del lock_user_1
釋放鎖。
然而,這種方案就沒有問題嗎?
固然有問題,其實這裏的釋放鎖只要簡單地執行del lock_user_1
便可,並不會檢查這個鎖是否是當前客戶端獲取到的。
因此,這種方案還不是很完美。
上面的方案不完美的主要緣由是釋放鎖這裏控制的還不是很到位,那麼有沒有其它方法能夠控制釋放鎖的線程和加鎖的線程必定是同一個客戶端呢?
redis官方給出的方案是這樣的:
// 加鎖 SET resource_name my_random_value NX PX 30000 // 釋放鎖 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
加鎖的時候,設置隨機值,保證這個隨機值只有當前客戶端本身知道。
釋放鎖的時候,執行一段lua腳本,把這段lua腳本當成一個完整的命令,先檢查這個鎖對應的值是否是上面設置的隨機值,若是是再執行del釋放鎖,不然直接返回釋放鎖失敗。
咱們知道,redis是單線程的,因此這段lua腳本中的get和del不會存在併發問題,可是不能在java中先get再del,這樣會當成兩個命令,會有併發問題,lua腳本至關因而一個命令一塊兒傳輸給redis的。
這種方案算是比較完美了,可是還有一點小缺陷,就是這個過時時間設置成多少合適呢?
設置的太小,有可能上一個線程還沒執行完鎖內邏輯,鎖就自動釋放了,致使另外一個線程能夠獲取鎖了,就出現併發問題了;
設置的過大,就要考慮客戶端斷線了,這個鎖要等待很長一段時間。
因此,這裏又衍生出一個新的問題,過時時間我設置小一點,可是快到期了它能自動續期就行了。
上面方案的缺陷是過時時間很差把握,雖然也能夠本身啓一個監聽線程來處理續期,可是代碼實在不太好寫,好在現成的輪子redisson已經幫咱們把這個邏輯都實現好了,咱們拿過來直接用就能夠了。
並且,redisson充分考慮了redis演化過程當中留下的各類問題,單機模式、哨兵模式、集羣模式,它通通都處理好了,無論是從單機進化到集羣仍是從哨兵進化到集羣,都只須要簡單地修改下配置就能夠了,不用改動任何代碼,能夠說是非(業)常(界)方(良)便(心)。
redisson實現的分佈式鎖內部使用的是Redlock算法,這是官方推薦的一種算法。
另外,redisson還提供了不少分佈式對象(分佈式的原子類)、分佈式集合(分佈式的Map/List/Set/Queue等)、分佈式同步器(分佈式的CountDownLatch/Semaphore等)、分佈式鎖(分佈式的公平鎖/非公平鎖/讀寫鎖等),有興趣的能夠去看看,下面貼出連接:
Redlock介紹:https://redis.io/topics/distlock
redisson介紹:https://github.com/redisson/redisson/wiki
由於前面五種方案都已通過時,因此彤哥這裏偷個懶,就不去一一實現的,咱們直接看最後一種redisson的實現方式。
添加spring redis及redisson的依賴,我這裏使用的是springboot 2.1.6版本,springboot 1.x版本的本身注意下,查看上面的github能夠找到方法。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-data-21</artifactId> <version>3.11.0</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.0</version> </dependency>
配置redis的鏈接信息,彤哥這裏給出了三種方式。
spring: redis: # 單機模式 #host: 192.168.1.102 #port: 6379 # password: <your passowrd> timeout: 6000ms # 鏈接超時時長(毫秒) # 哨兵模式 【本篇文章由公衆號「彤哥讀源碼」原創】 # sentinel: # master: <your master> # nodes: 192.168.1.101:6379,192.168.1.102:6379,192.168.1.103:6379 # 集羣模式(三主三從僞集羣) cluster: nodes: - 192.168.1.102:30001 - 192.168.1.102:30002 - 192.168.1.102:30003 - 192.168.1.102:30004 - 192.168.1.102:30005 - 192.168.1.102:30006
定義Locker接口。
public interface Locker { void lock(String key, Runnable command); }
直接使用RedissonClient獲取鎖,注意這裏不須要再單獨配置RedissonClient這個bean,redisson框架會根據配置自動生成RedissonClient的實例,咱們後面說它是怎麼實現的。
@Component public class RedisLocker implements Locker { @Autowired private RedissonClient redissonClient; @Override public void lock(String key, Runnable command) { RLock lock = redissonClient.getLock(key); try { // 【本篇文章由公衆號「彤哥讀源碼」原創】 lock.lock(); command.run(); } finally { lock.unlock(); } } }
啓動1000個線程,每一個線程內部打印一句話,而後睡眠1秒。
@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class RedisLockerTest { @Autowired private Locker locker; @Test public void testRedisLocker() throws IOException { for (int i = 0; i < 1000; i++) { new Thread(()->{ locker.lock("lock", ()-> { // 可重入鎖測試 locker.lock("lock", ()-> { System.out.println(String.format("time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName())); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); }); }, "Thread-"+i).start(); } System.in.read(); } }
運行結果:
能夠看到穩定在1000ms左右打印一句話,說明這個鎖是可用的,並且是可重入的。
time: 1570100167046, threadName: Thread-756 time: 1570100168067, threadName: Thread-670 time: 1570100169080, threadName: Thread-949 time: 1570100170093, threadName: Thread-721 time: 1570100171106, threadName: Thread-937 time: 1570100172124, threadName: Thread-796 time: 1570100173134, threadName: Thread-944 time: 1570100174142, threadName: Thread-974 time: 1570100175167, threadName: Thread-462 time: 1570100176180, threadName: Thread-407 time: 1570100177194, threadName: Thread-983 time: 1570100178206, threadName: Thread-982 ...
剛纔說RedissonClient不須要配置,其實它是在RedissonAutoConfiguration中自動配置的,咱們簡單看下它的源碼,主要看redisson()這個方法:
@Configuration @ConditionalOnClass({Redisson.class, RedisOperations.class}) @AutoConfigureBefore(RedisAutoConfiguration.class) @EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class}) public class RedissonAutoConfiguration { @Autowired private RedissonProperties redissonProperties; @Autowired private RedisProperties redisProperties; @Autowired private ApplicationContext ctx; @Bean @ConditionalOnMissingBean(name = "redisTemplate") public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<Object, Object>(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean @ConditionalOnMissingBean(StringRedisTemplate.class) public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) { StringRedisTemplate template = new StringRedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean @ConditionalOnMissingBean(RedisConnectionFactory.class) public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) { return new RedissonConnectionFactory(redisson); } @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(RedissonClient.class) public RedissonClient redisson() throws IOException { Config config = null; Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster"); Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout"); Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties); int timeout; if(null == timeoutValue){ // 超時未設置則爲0 timeout = 0; }else if (!(timeoutValue instanceof Integer)) { // 轉毫秒 Method millisMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis"); timeout = ((Long) ReflectionUtils.invokeMethod(millisMethod, timeoutValue)).intValue(); } else { timeout = (Integer)timeoutValue; } // 看下是否給redisson單獨寫了一個配置文件 if (redissonProperties.getConfig() != null) { try { InputStream is = getConfigStream(); config = Config.fromJSON(is); } catch (IOException e) { // trying next format try { InputStream is = getConfigStream(); config = Config.fromYAML(is); } catch (IOException e1) { throw new IllegalArgumentException("Can't parse config", e1); } } } else if (redisProperties.getSentinel() != null) { // 若是是哨兵模式 Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes"); Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel()); String[] nodes; // 看sentinel.nodes這個節點是列表配置仍是逗號隔開的配置 if (nodesValue instanceof String) { nodes = convert(Arrays.asList(((String)nodesValue).split(","))); } else { nodes = convert((List<String>)nodesValue); } // 生成哨兵模式的配置 config = new Config(); config.useSentinelServers() .setMasterName(redisProperties.getSentinel().getMaster()) .addSentinelAddress(nodes) .setDatabase(redisProperties.getDatabase()) .setConnectTimeout(timeout) .setPassword(redisProperties.getPassword()); } else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) { // 若是是集羣模式 Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, redisProperties); Method nodesMethod = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes"); // 集羣模式的cluster.nodes是列表配置 List<String> nodesObject = (List) ReflectionUtils.invokeMethod(nodesMethod, clusterObject); String[] nodes = convert(nodesObject); // 生成集羣模式的配置 config = new Config(); config.useClusterServers() .addNodeAddress(nodes) .setConnectTimeout(timeout) .setPassword(redisProperties.getPassword()); } else { // 單機模式的配置 config = new Config(); String prefix = "redis://"; Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl"); // 判斷是否走ssl if (method != null && (Boolean)ReflectionUtils.invokeMethod(method, redisProperties)) { prefix = "rediss://"; } // 生成單機模式的配置 config.useSingleServer() .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort()) .setConnectTimeout(timeout) .setDatabase(redisProperties.getDatabase()) .setPassword(redisProperties.getPassword()); } return Redisson.create(config); } private String[] convert(List<String> nodesObject) { // 將哨兵或集羣模式的nodes轉換成標準配置 List<String> nodes = new ArrayList<String>(nodesObject.size()); for (String node : nodesObject) { if (!node.startsWith("redis://") && !node.startsWith("rediss://")) { nodes.add("redis://" + node); } else { nodes.add(node); } } return nodes.toArray(new String[nodes.size()]); } private InputStream getConfigStream() throws IOException { // 讀取redisson配置文件 Resource resource = ctx.getResource(redissonProperties.getConfig()); InputStream is = resource.getInputStream(); return is; } }
網上查到的資料中不少配置都是多餘的(多是版本問題),看下源碼很清楚,這也是看源碼的一個好處。
(1)redis因爲歷史緣由致使有三種模式:單機、哨兵、集羣;
(2)redis實現分佈式鎖的進化史:set -> setnx -> setnx + setex -> set nx ex(或px) -> set nx ex(或px) + lua script -> redisson;
(3)redis分佈式鎖有現成的輪子redisson可使用;
(4)redisson還提供了不少有用的組件,好比分佈式集合、分佈式同步器、分佈式對象;
redis分佈式鎖有哪些優勢?
答:1)大部分系統都依賴於redis作緩存,不須要額外依賴其它組件(相對於zookeeper來講);
2)redis能夠集羣部署,相對於mysql的單點更可靠;
3)不會佔用mysql的鏈接數,不會增長mysql的壓力;
4)redis社區相對活躍,redisson的實現更是穩定可靠;
5)利用過時機制解決客戶端斷線的問題,雖然不太及時;
6)有現成的輪子redisson可使用,鎖的種類比較齊全;
redis分佈式鎖有哪些缺點?
答:1)集羣模式下會在全部master節點執行加鎖命令,大部分(2N+1)成功了則得到鎖,節點越多,加鎖的過程越慢;
2)高併發狀況下,未得到鎖的線程會睡眠重試,若是同一把鎖競爭很是激烈,會佔用很是多的系統資源;
3)歷史緣由致使的坑挺多的,本身很難實現出來健壯的redis分佈式鎖;
總之,redis分佈式鎖的優勢是大於缺點的,並且社區活躍,這也是咱們大部分系統使用redis做爲分佈式鎖的緣由。
三、死磕 java同步系列之JMM(Java Memory Model)
八、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖
九、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖
十、死磕 java同步系列之ReentrantLock VS synchronized
十一、死磕 java同步系列之ReentrantReadWriteLock源碼解析
1三、死磕 java同步系列之CountDownLatch源碼解析
1五、死磕 java同步系列之StampedLock源碼解析
1六、死磕 java同步系列之CyclicBarrier源碼解析
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。