Spring Cloud 分佈式環境下,同一個服務都是部署在不一樣的機器上,這種狀況沒法像單體架構下數據一致性問題採用加鎖就實現數據一致性問題,在高併發狀況下,對於分佈式架構顯然是不合適的,針對這種狀況咱們就須要用到分佈式鎖了。git
場景一:比較敏感的數據好比金額修改,同一時間只能有一我的操做,想象下2我的同時修改金額,一個加金額一個減金額,爲了防止同時操做形成數據不一致,須要鎖,若是是數據庫須要的就是行鎖或表鎖,若是是在集羣裏,多個客戶端同時修改一個共享的數據就須要分佈式鎖。github
場景二:好比多臺機器均可以定時執行某個任務,若是限制任務每次只能被一臺機器執行,不能重複執行,就能夠用分佈式鎖來作標記。redis
場景三:好比秒殺場景,要求併發量很高,那麼同一件商品只能被一個用戶搶到,那麼就可使用分佈式鎖實現。算法
數據庫是單點?搞兩個數據庫,數據以前雙向同步。一旦掛掉快速切換到備庫上。spring
沒有失效時間?只要作一個定時任務,每隔必定時間把數據庫中的超時數據清理一遍。sql
非阻塞的?搞一個while循環,直到insert成功再返回成功。數據庫
非重入的?在數據庫表中加個字段,記錄當前得到鎖的機器的主機信息和線程信息,那麼下次再獲取鎖的時候先查詢數據庫,若是當前機器的主機信息和線程信息在數據庫能夠查到的話,直接把鎖分配給他就能夠了。瀏覽器
大量請求下數據庫每每是系統的瓶頸,大量鏈接,而後sql查詢,幾乎全部時間都浪費到這些上面,因此每每狀況下能內存操做就在內存操做,使用基於內存操做的Redis實現分佈式鎖,也能夠根據需求選擇ZooKeeper 來實現。緩存
經過 Redis 的 Redlock 和 ZooKeeper 來加鎖,性能有了比較大的提高,通常狀況咱們根據實際場景選擇使用。安全
Redis實現分佈式鎖利用 SETNX
和 SETEX
基本命令主要有:
當且僅當 key 不存在,將 key 的值設爲 value ,並返回1;若給定的 key 已經存在,則 SETNX 不作任何動做,並返回0。
分佈式鎖
分佈式鎖其實大白話,本質上要實現的目標(客戶端)在redis中佔一個位置,等到這個客戶試用,別的人進來就必須得等着,等我試用完了,走了,你再來。 感受跟多線程鎖同樣,意思大體是同樣的,多線程是針對單機的,在同一個Jvm中,可是分佈式石鎖,是跨機器的,多個進程不一樣機器上發來得請求,去對同一個數據進行操做。
好比,分佈式架構下的秒殺系統,幾萬人對10個商品進行搶購,10個商品存在redis中,就是表示10個位置,第一我的進來了,商品就剩9個了,第二我的進來就剩8個,在第一我的進來的時候,其餘人必須等到10個商品數量成功減去1以後你才能進來。
這個過程當中第一我的進來的時候還沒操做減1而後異常了,沒有釋放鎖,而後後面人一直等待着,這就是死鎖。真對這種狀況能夠設置超時時間,若是超過10s中仍是沒出來,就讓他超時失效。
redis中提供了 setnx(set if not exists)
指令
> setnx lock:codehole true -- 鎖定
OK
... do something xxxx... 數量減1
> del lock:codehole -- 釋放鎖
(integer) 1 --成功
若是在減1期間發生異常 del 指令沒有被調用 而後就一直等着,鎖永遠不會釋放。
redis Redis 2.8 版本中提供了 setex(set if not exists) 指令 setnx 和 expire 兩個指令構成一個原子操做 給鎖加上一個過時時間
> setex lock:codehole true
OK
> expire lock:codehole 5
... do something xxxx ...
> del lock:codehole
(integer) 1
SETEX 實現原理
經過 SETNX 設置 Key-Value 來得到鎖,隨即進入死循環,每次循環判斷,若是存在 Key 則繼續循環,若是不存在 Key,則跳出循環,當前任務執行完成後,刪除 Key 以釋放鎖。
實現步驟
pom.xml 導入Redis依賴
<!-- redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
添加配置文件 application.yml:
server:
port: 8080
spring:
profiles: dev
data:
redis:
# Redis數據庫索引(默認爲0)
database: 0
# Redis服務器地址
host: 127.0.0.1
# Redis服務器鏈接端口
port: 6379
# Redis服務器鏈接密碼(默認爲空)
password:
全局鎖類
@Data
public class Lock {
/**
* key名
/
private String name;
/*
* value值
*/
private String value;
public Lock(String name, String value) {
this.name = name;
this.value = value;
}
}
分佈式鎖類
@Slf4j
@Component
public class DistributedLockConfig {
/**
* 單個業務持有鎖的時間30s,防止死鎖
/
private final static long LOCK_EXPIRE = 30 * 1000L;
/*
* 默認30ms嘗試一次
/
private final static long LOCK_TRY_INTERVAL = 30L;
/*
* 默認嘗試20s
*/
private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;
private RedisTemplate template;
public void setTemplate(RedisTemplate template) {
this.template = template;
}
/**
* 嘗試獲取全局鎖
*
* @param lock 鎖的名稱
* @return true 獲取成功,false獲取失敗
*/
public boolean tryLock(Lock lock) {
return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}
/**
* 嘗試獲取全局鎖
* SETEX:能夠設置超時時間
*
* @param lock 鎖的名稱
* @param timeout 獲取超時時間 單位ms
* @return true 獲取成功,false獲取失敗
*/
public boolean tryLock(Lock lock, long timeout) {
return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}
/**
* 嘗試獲取全局鎖
*
* @param lock 鎖的名稱
* @param timeout 獲取鎖的超時時間
* @param tryInterval 多少毫秒嘗試獲取一次
* @return true 獲取成功,false獲取失敗
*/
public boolean tryLock(Lock lock, long timeout, long tryInterval) {
return getLock(lock, timeout, tryInterval, LOCK_EXPIRE);
}
/**
* 嘗試獲取全局鎖
*
* @param lock 鎖的名稱
* @param timeout 獲取鎖的超時時間
* @param tryInterval 多少毫秒嘗試獲取一次
* @param lockExpireTime 鎖的過時
* @return true 獲取成功,false獲取失敗
*/
public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {
return getLock(lock, timeout, tryInterval, lockExpireTime);
}
/**
* 操做redis獲取全局鎖
*
* @param lock 鎖的名稱
* @param timeout 獲取的超時時間
* @param tryInterval 多少ms嘗試一次
* @param lockExpireTime 獲取成功後鎖的過時時間
* @return true 獲取成功,false獲取失敗
*/
public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {
try {
if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) {
return false;
}
long startTime = System.currentTimeMillis();
do {
if (!template.hasKey(lock.getName())) {
ValueOperations<String, String> ops = template.opsForValue();
ops.set(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS);
return true;
} else {
//存在鎖
log.debug("lock is exist!!!");
}
//嘗試超過了設定值以後直接跳出循環
if (System.currentTimeMillis() - startTime > timeout) {
return false;
}
//每隔多長時間嘗試獲取
Thread.sleep(tryInterval);
}
while (template.hasKey(lock.getName()));
} catch (InterruptedException e) {
log.error(e.getMessage());
return false;
}
return false;
}
/**
* 獲取鎖
* SETNX(SET If Not Exists):當且僅當 Key 不存在時,則能夠設置,不然不作任何動做。
*/
public Boolean getLockNoTime(Lock lock) {
if (!StringUtils.isEmpty(lock.getName())) {
return false;
}
// setIfAbsent 底層封裝命令 是 setNX()
boolean falg = template.opsForValue().setIfAbsent(lock.getName(), lock.getValue());
return false;
}
/**
* 釋放鎖
*/
public void releaseLock(Lock lock) {
if (!StringUtils.isEmpty(lock.getName())) {
template.delete(lock.getName());
}
}
}
測試方法
@RequestMapping("test")
public String index() {
distributedLockConfig.setTemplate(redisTemplate);
Lock lock = new Lock("test", "test");
if (distributedLockConfig.tryLock(lock)) {
try {
//爲了演示鎖的效果,這裏睡眠5000毫秒
System.out.println("執行方法");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
distributedLockConfig.releaseLock(lock);
}
return "hello world!";
}
開啓兩個瀏覽器窗口,執行方法,咱們能夠看到兩個瀏覽器在等待執行,當一個返回 hello world! 以後,若是沒超時執行另外一個也會返回hello world! 兩個方法彼此前後返回,說明分佈式鎖執行成功。
可是存在一個問題:
這段方法是先去查詢key是否存在redis中,若是存在走循環,而後根據間隔時間去等待嘗試獲取,若是不存在則進行獲取鎖,若是等待時間超過超時時間返回false。
若是在集羣環境下也會存在問題
假如在哨兵模式中 主節點獲取到鎖以後,數據沒有同步到從節點主節點掛掉了,這樣數據完整性不能保證,另外一個客戶端請求過來,就會一把鎖被兩個客戶端持有,會致使數據一致性出問題。
對此Redis中還提供了另一種實現分佈式鎖的方法 Redlock
Redlock是redis官方提出的實現分佈式鎖管理器的算法。這個算法會比通常的普通方法更加安全可靠。
爲何選擇紅鎖? 在集羣中須要半數以上的節點贊成才能得到鎖,保證了數據的完整性,不會由於主節點數據存在,主節點掛了以後沒有同步到從節點,致使數據丟失。
Redlock 算法
使用場景
對於Redis集羣模式儘可能採用這種分佈式鎖,保證高可用,數據一致性,就使用Redlock 分佈式鎖。
pom.xml 增長依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.7.0</version> </dependency>
獲取鎖後須要處理的邏輯
/**
* 獲取鎖後須要處理的邏輯
*/
public interface AquiredLockWorker<T> {
T invokeAfterLockAquire() throws Exception;
}
獲取鎖管理類
/**
* 獲取鎖管理類
*/
public interface DistributedLocker {
/**
* 獲取鎖
* @param resourceName 鎖的名稱
* @param worker 獲取鎖後的處理類
* @param <T>
* @return 處理完具體的業務邏輯要返回的數據
* @throws UnableToAquireLockException
* @throws Exception
*/
<T> T lock(String resourceName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception;
<T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception;
}
異常類
/**
* 異常類
*/
public class UnableToAquireLockException extends RuntimeException {
public UnableToAquireLockException() {
}
public UnableToAquireLockException(String message) {
super(message);
}
public UnableToAquireLockException(String message, Throwable cause) {
super(message, cause);
}
}
獲取RedissonClient鏈接類
/**
* 獲取RedissonClient鏈接類
*/
@Component
public class RedissonConnector {
RedissonClient redisson;
@PostConstruct
public void init(){
redisson = Redisson.create();
}
public RedissonClient getClient(){
return redisson;
}
}
分佈式鎖實現
@Component
public class RedisLocker implements DistributedLocker{
private final static String LOCKER_PREFIX = "lock:";
@Autowired
RedissonConnector redissonConnector;
@Override
public <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception {
return lock(resourceName, worker, 100);
}
@Override
public <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception {
RedissonClient redisson= redissonConnector.getClient();
RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName);
// Wait for 100 seconds seconds and automatically unlock it after lockTime seconds
boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS);
if (success) {
try {
return worker.invokeAfterLockAquire();
} finally {
lock.unlock();
}
}
throw new UnableToAquireLockException();
}
}
測試方法
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
for (int i = 0; i < 50; i++) {
scheduledExecutorService.execute(new Worker());
}
scheduledExecutorService.shutdown();
//任務
class Worker implements Runnable {
public Worker() {
}
@Override
public void run() {
try {
redisLocker.lock("tizz1100", new AquiredLockWorker<Object>() {
@Override
public Object invokeAfterLockAquire() {
doTask();
return null;
}
});
} catch (Exception e) {
}
}
void doTask() {
System.out.println(Thread.currentThread().getName() + " ---------- " + LocalDateTime.now());
System.out.println(Thread.currentThread().getName() + " start");
Random random = new Random();
int _int = random.nextInt(200);
System.out.println(Thread.currentThread().getName() + " sleep " + _int + "millis");
try {
Thread.sleep(_int);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " end");
}
}
參考資料: