Redisson 分佈式鎖場景和使用

分佈式鎖的使用場景

在傳統架構單機單服務中,對一些併發場景讀取公共資源時,好比加減庫存、銀行卡消費等,經過同步或者加鎖就能夠進行資源控制。node

這裏主要經過線程鎖實現:經過給方法、代碼塊加鎖。當某個方法或代碼使用鎖,在同一時刻僅有一個線程能夠執行該方法或該代碼段。線程鎖只在同一JVM中有效果,由於線程鎖的實如今根本上是依靠線程之間共享內存實現的,好比Synchronized、Lock等。redis

當應用從單進程多線程拆分爲分佈式系統集羣部署的多進程多線程後,以前的解決方案就沒法知足需求了,業界經常使用的解決方案一般是藉助於一個第三方組件並利用它自身的排他性來達到多進程間的互斥、資源隔離。spring

  • 基於zk的臨時順序節點
  • 基於數據庫的惟一主鍵
  • 基於redis的setnx setex

zk和DB在高併發的場景下可能會有性能問題,經過redis實現分佈式鎖不論是從實現難度和性能方面都比較合適。數據庫

redis分佈式鎖方案比較

在對比redis分佈式鎖方案以前,先列舉下分佈式鎖的特色:編程

  • 互斥性:任何一時刻只有一個線程獲取到鎖
  • 可重入性:同一線程在獲取鎖以後可重複獲取該鎖
  • 鎖超時:設置超時時間,避免死鎖
  • 安全性:鎖只能被持有者刪除

對於Redis分佈式鎖的實現方式,網上的文章大部分都是單純使用setnx命令的基礎上進行一個簡單封裝,且少有文章分析這樣設計的缺陷。在這個博客滿天飛,代碼隨便貼的時代,這樣的局面無形之中給了你們一個假象,就是Redis分佈式鎖只能是以這樣簡單的形式存在,即使有缺陷也只能在業務代碼裏規避。那麼爲何不換位思考一下,即用稍微複雜點的設計來彌補它的不足,從而換取業務上的靈活性呢?再介紹redisson以前,咱們先了解一下單純使用setnx命令封裝的分佈式鎖有哪些不足。安全

  1. 不具有可重入性markdown

    在執行setnx命令時,一般採用業務上指定的名稱做爲key名,用時間或隨機值做爲value來實現。這樣的實現方式不具有追蹤請求線程的能力,同時也不具有統計重入次數的能力,甚至有些實現方式都不具有操做的原子性。當遇到業務上須要在多個地方用到一樣一個鎖的時候,很顯然使用不具備可重入的鎖會很容易發生死鎖的現象。特別是在有遞歸邏輯的場景裏,發生死鎖的概率會更高。Java併發工具包裏的Lock對象和Synchronized語塊都具備可重入性,對於常用這些工具的人來講,每每會很容易忽略setnx的這個缺陷。多線程

  2. 不支持續約架構

    在分佈式環境中,爲了保證鎖的活性和避免程序宕機形成的死鎖現象,分佈式鎖每每會引入一個失效時間,超過這個時間則認爲自動解鎖。這樣的設計前提是開發人員對這個自動解鎖時間的粒度有一個很好的把握,過短了可能會出現任務沒作完鎖就失效了,而太長了在出現程序宕機或業務節點掛掉時,其它節點須要等很長時間才能恢復,而難以保證業務的SLA(正常運行時間保障)。setnx的設計缺少一個延續有效期的續約機制,沒法保證業務可以先工做作完再解鎖,也不能確保在某個程序宕機或業務節點掛掉的時候,其它節點可以很快的恢復業務處理能力。併發

  3. 不具有阻塞的能力

    阻塞性是指的在有競爭的狀況下,未獲取到資源的線程會中止繼續操做,直到成功獲取到資源或取消操做。很顯然setnx命令只提供了互斥的特性,卻沒有提供阻塞的能力。雖然在業務代碼裏能夠引入自旋機制來進行再次獲取,但這僅僅是把本來應該在鎖裏實現的功能搬到了業務代碼裏,經過增長業務代碼的複雜程度來簡化鎖的實現彷佛顯得有點南轅北轍。

Redisson的分佈式鎖在知足以上三個基本要求的同時還增長了線程安全的特色。利用Redis的Hash結構做爲儲存單元,將業務指定的名稱做爲key,將隨機UUID和線程ID做爲field,最後將加鎖的次數做爲value來儲存。同時UUID做爲鎖的實例變量保存在客戶端。將UUID和線程ID做爲標籤在運行多個線程同時使用同一個鎖的實例時,仍然保證了操做的獨立性,知足了線程安全的要求。

擁抱開源,不重複造輪子

redisson介紹

Redisson是架設在Redis基礎上的一個Java駐內存數據網格(In-Memory Data Grid)。充分的利用了Redis鍵值數據庫提供的一系列優點,基於Java實用工具包中經常使用接口,爲使用者提供了一系列具備分佈式特性的經常使用工具類。使得本來做爲協調單機多線程併發程序的工具包得到了協調分佈式多機多線程併發系統的能力,大大下降了設計和研發大規模分佈式系統的難度。同時結合各富特點的分佈式服務,更進一步簡化了分佈式環境中程序相互之間的協做。

jedis客戶端:採用同步編程模型的客戶端,須要隨時確保併發線程數與鏈接數一對一

redisson客戶端:採用Netty異步編程框架,使用了與Redis服務端結構相似的事件循環(EventLoop)式的線程池,並結合鏈接池的方式彈性管理鏈接。最終作到了使用少許的鏈接就能夠知足對大量線程的要求,從根本上緩解線程之間的競爭關係。

項目集成配置

引入依賴

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.10.6</version>
</dependency>
複製代碼

可經過JSON、YAML和Spring XML文件配置集羣模式,這裏經過yaml方式進行配置,增長文件redisson.yaml,以下:

clusterServersConfig:
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  slaveSubscriptionConnectionMinimumIdleSize: 1
  slaveSubscriptionConnectionPoolSize: 50
  slaveConnectionMinimumIdleSize: 32
  slaveConnectionPoolSize: 64
  masterConnectionMinimumIdleSize: 32
  masterConnectionPoolSize: 64
  readMode: "SLAVE"
  nodeAddresses:
  - "redis://127.0.0.1:7004"
  - "redis://127.0.0.1:7001"
  - "redis://127.0.0.1:7000"
  scanInterval: 1000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.JsonJacksonCodec> {}
"transportMode":"NIO"
複製代碼

在application.yml中增長redis配置

spring.redis.redisson:
    config: classpath:redisson.yaml
複製代碼

結合apollo的配置

因爲apollo會在客戶端保存一份配置文件的備份,與classpath下的redisson.yaml是兩個文件,因此沒法經過以上方式加載,經過查看RedissonAutoConfiguration中RedissonClient的初始化代碼,經過自定義Properties類,加載配置文件生成org.redisson.config.config進行RedissonClient的建立。

@Slf4j
@ConfigurationProperties(
    prefix = "spring.redisson"
)
@Data
public class RedissonProperties implements InitializingBean {

    private SentinelServersConfig sentinelServersConfig;
    private MasterSlaveServersConfig masterSlaveServersConfig;
    private SingleServerConfig singleServerConfig;
    private Map<String,Object> clusterServersConfig;
    private ReplicatedServersConfig replicatedServersConfig;
    private ConnectionManager connectionManager;
    private int threads = 16;
    private int nettyThreads = 32;
    private JSONObject codec;
    private ExecutorService executor;
    private boolean referenceEnabled = true;
    private TransportMode transportMode;
    private EventLoopGroup eventLoopGroup;
    private long lockWatchdogTimeout;
    private boolean keepPubSubOrder;
    private boolean decodeInExecutor;
    private boolean useScriptCache;
    private int minCleanUpDelay;
    private int maxCleanUpDelay;
    private JSONObject addressResolverGroupFactory;

    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,String> nodeAddressesMap = (Map<String,String>)clusterServersConfig.get("nodeAddresses");
        clusterServersConfig.put("nodeAddresses",nodeAddressesMap.values());
        log.debug(JSON.toJSONString(this));
    }
}
複製代碼
public class YamlPropertySourceFactory implements PropertySourceFactory {

    @Override
    public PropertySource<?> createPropertySource(@Nullable String name, EncodedResource resource) throws IOException {
        Properties propertiesFromYaml = loadYamlIntoProperties(resource);
        String sourceName = name != null ? name : resource.getResource().getFilename();
        return new PropertiesPropertySource(sourceName, propertiesFromYaml);
    }

    private Properties loadYamlIntoProperties(EncodedResource resource) throws FileNotFoundException {
        try {
            YamlPropertiesFactoryBean factory = new YamlPropertiesFactoryBean();
            factory.setResources(resource.getResource());
            factory.afterPropertiesSet();
            return factory.getObject();
        } catch (IllegalStateException e) {
            // for ignoreResourceNotFound
            Throwable cause = e.getCause();
            if (cause instanceof FileNotFoundException)
                throw (FileNotFoundException) e.getCause();
            throw e;
        }
    }
}
複製代碼
@Configuration
@PropertySource(factory = YamlPropertySourceFactory.class, value = "classpath:redisson.yaml")
@EnableConfigurationProperties({RedissonProperties.class})
public class RedissonConfig {

    @Autowired
    private RedissonProperties redissonProperties;

    @Bean(
            destroyMethod = "shutdown"
    )
    public RedissonClient redisson() throws IOException {
        String redissonCondig = JSON.toJSONString(redissonProperties);
        Config config = Config.fromJSON(redissonCondig);
        return Redisson.create(config);
    }
}
複製代碼

分佈式鎖使用案例

普通加鎖

RLock lock = redisson.getLock("anyLock");
lock.lock();
複製代碼

設置等待時間、釋放時間加鎖

// 加鎖之後10秒鐘自動解鎖
// 無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);

// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}
複製代碼

公平鎖:當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程

RLock fairLock = redisson.getFairLock("anyLock");
// 最多見的使用方法
fairLock.lock();
複製代碼

聯合鎖

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 爲加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
複製代碼

讀寫鎖(讀寫互斥,讀讀不互斥)

redisson.getReadWriteLock("anyRWLock");
RLock lock = rwlock.readLock();
// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
// 或
RLock lock = rwlock.writeLock();
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
複製代碼

閉鎖

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其餘線程或其餘JVM裏
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
複製代碼

redisson分佈式鎖還能夠應用於保證冪等,控制mq消費等。

源碼分析

加鎖lua腳本

//KEYS[1]:鎖的名稱,getLock(name)的name
//ARGV[2]:id(UUID)+ ":" +threadId
//ARGV[1]:leaseTime

//檢查鎖是否存在,若是不存在,則直接設置hash對象和超時時間
if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('hset', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
//若是鎖存在,則判斷hash裏的ARGV[2]是否與本線程相同,相同則重入,更新失效時間
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
//返回鎖的失效時間
return redis.call('pttl', KEYS[1]);
複製代碼

解鎖lua腳本

//KEYS[1]:鎖的名稱,getLock(name)的name
//KEYS[2]:"redisson_lock__channel"+":{" + KEYS[1] + "}" || "redisson_lock__channel"+":" + KEYS[1]
//ARGV[1]:0L
//ARGV[2]:leaseTime
//ARGV[3]:id(UUID)+ ":" +threadId

//判斷是不是加鎖的線程,不是則直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
    return nil;
end; 
//獲取加鎖次數-1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
//若是加鎖次數>0,則更新失效時間後返回;反之則刪除鎖後發佈解鎖消息
if (counter > 0) then 
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1; 
end; 
return nil;
複製代碼

watchdog

//異步執行加鎖lua
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e == null) {
        if (ttlRemaining) {
            //新建子線程定時設置失效時間
            this.scheduleExpirationRenewal(threadId);
        }
    }
});
private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        //建立一個HashedWheelTimeout實例,利用HashedWheelTimer實現定時任務
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                RedissonLock.this.renewExpiration();
                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

//定時任務執行的腳本
//KEYS[1]:鎖的名稱,getLock(name)的name
//ARGV[1]:leaseTime
//ARGV[2]:id(UUID)+ ":" +threadId
//判斷是不是加鎖的線程,更新失效時間
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return 1;
end; 
return 0;

複製代碼
相關文章
相關標籤/搜索