針對項目中使用的分佈式鎖進行簡單的示例配置以及源碼解析,並列舉源碼中使用到的一些基礎知識點,可是沒有對redisson中使用到的netty知識進行解析。node
本篇主要是對如下幾個方面進行了探索redis
Maven配置服務器
1併發 2異步 3分佈式 4ide 5源碼分析 6post 7測試 8 9 10 |
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>2.2.12</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.6.0</version> </dependency> |
RedissonLock簡單示例
redission支持4種鏈接redis方式,分別爲單機、主從、Sentinel、Cluster 集羣,項目中使用的鏈接方式是Sentinel。
redis服務器不在本地的同窗請注意權限問題。
Sentinel配置
1 2 3 |
Config config = new Config(); config.useSentinelServers().addSentinelAddress("127.0.0.1:6479", "127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0); RedissonClient redisson = Redisson.create(config); |
簡單使用
1 2 3 4 5 6 7 8 9 10 |
RLock lock = redisson.getLock("test_lock"); try{ boolean isLock=lock.tryLock(); if(isLock){ doBusiness(); } }catch(exception e){ }finally{ lock.unlock(); } |
源碼中使用到的Redis命令
分佈式鎖主要須要如下redis命令,這裏列舉一下。在源碼分析部分能夠繼續參照命令的操做含義。
源碼中使用到的lua腳本語義
Redisson源碼中,執行redis命令的是lua腳本,其中主要用到以下幾個概念。
須要注意的是,在redis執行lua腳本時,至關於一個redis級別的鎖,不能執行其餘操做,相似於原子操做,也是redisson實現的一個關鍵點。
另外,若是lua腳本執行過程當中出現了異常或者redis服務器直接宕掉了,執行redis的根據日誌回覆的命令,會將腳本中已經執行的命令在日誌中刪除。
源碼分析
RLOCK結構
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public interface RLock extends Lock, RExpirable { void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException; boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; void lock(long leaseTime, TimeUnit unit); void forceUnlock(); boolean isLocked(); boolean isHeldByCurrentThread(); int getHoldCount(); Future<Void> unlockAsync(); Future<Boolean> tryLockAsync(); Future<Void> lockAsync(); Future<Void> lockAsync(long leaseTime, TimeUnit unit); Future<Boolean> tryLockAsync(long waitTime, TimeUnit unit); Future<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit); } |
該接口主要繼承了Lock接口, 並擴展了部分方法, 好比:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設置鎖的過時時間, 若是超過leaseTime尚未解鎖的話, redis就強制解鎖. leaseTime的默認時間是30s
RedissonLock獲取鎖 tryLock源碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } |
其中
KEYS[1] 表示的是 getName() ,表明的是鎖名 test_lock
ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s
ARGV[2] 表示的是 getLockName(threadId) 表明的是 id:threadId 用鎖對象id+線程id, 表示當前訪問線程,用於區分不一樣服務器上的線程.
逐句分析:
1 2 3 4 5 |
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; |
1 2 3 4 5 |
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; |
1 |
return redis.call('pttl', KEYS[1]); |
RedissonLock解鎖 unlock源碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public void unlock() { Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } } |
其中
KEYS[1] 表是的是getName() 表明鎖名test_lock
KEYS[2] 表示getChanelName() 表示的是發佈訂閱過程當中使用的Chanel
ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際表明的是數字 0,表明解鎖消息
ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s
ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當前鎖id+線程id
語義分析:
1 2 3 4 |
if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end; |
1 2 3 |
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; |
1 2 3 4 5 6 7 8 9 |
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; |
1 |
return nil; |
1 2 3 4 |
if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } |
RedissonLock強制解鎖源碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public void forceUnlock() { get(forceUnlockAsync()); } Future<Boolean> forceUnlockAsync() { cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage); } |
總結
這裏只是簡單的一個redisson分佈式鎖的測試用例,並分析了執行lua腳本這部分,若是要繼續分析執行結束以後的操做,須要進行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。