在分佈式系統中,以前單一的用synchronized或lock已經不適用了。分佈式鎖通常有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分佈式鎖;3. 基於ZooKeeper的分佈式鎖。本博客討論爲第二種 java
代碼實現redis
現象:模擬多個線程去運算同一個數據 能夠發現數據計算是不規則的spring
package com.zhcx.dispatch.test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadTest { private static int max = 10; public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<15;i++){ threadPool.submit(new Runnable() { public void run() { int current = getMax(); if(current>0){ max--; } } }); System.out.println(max); } } private static Integer getMax(){ return max; } }
輸出值爲混亂的數據庫
建立redis鎖類 dom
package com.zhcx.dispatch.test; import java.util.Collections; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @Component public class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private static JedisPool pool = null; public static Jedis getSource(){ JedisPoolConfig config = new JedisPoolConfig(); // 設置最大鏈接數 config.setMaxTotal(200); // 設置最大空閒數 config.setMaxIdle(8); // 設置最大等待時間 config.setMaxWaitMillis(1000 * 100); // 在borrow一個jedis實例時,是否須要驗證,若爲true,則全部jedis實例均是可用的 config.setTestOnBorrow(true); pool = new JedisPool(config, "127.0.0.0", 6379, 3000); return pool.getResource(); } /** * * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean getLock(Jedis jedis,String lockKey,String requestId, int expireTime){ String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if(LOCK_SUCCESS.equals(result)){ return true; } return false; } /** * 釋放分佈式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } }
能夠看到,咱們加鎖就一行代碼:jedis.set(String key, String value, String nxxx, String expx, int time)
,這個set()方法一共有五個形參:分佈式
第一個爲key,咱們使用key來當鎖,由於key是惟一的。spa
第二個爲value,咱們傳的是requestId,經過給value賦值爲requestId,咱們就知道這把鎖是哪一個請求加的了,在解鎖的時候就能夠有依據。requestId可使用UUID.randomUUID().toString()
方法生成。線程
第三個爲nxxx,這個參數咱們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,咱們進行set操做;若key已經存在,則不作任何操做;3d
第四個爲expx,這個參數咱們傳的是PX,意思是咱們要給這個key加一個過時的設置,具體時間由第五個參數決定。code
第五個爲time,與第四個參數相呼應,表明key的過時時間。
解鎖代碼,寫了簡單的Lua腳本代碼。咱們將Lua代碼傳到方法裏,並使參數KEYS[1]賦值爲lockKey,ARGV[1]賦值爲requestId。eval()方法是將Lua代碼交給Redis服務端執行。
簡單來講,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,而且直到eval命令執行完成,Redis纔會執行其餘命令。
咱們加入到以前的線程中
jedis.eval()
package com.zhcx.dispatch.test; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class ThreadTest { private static int max = 10; synchronized public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<15;i++){ String requestId = UUID.randomUUID().toString(); String lockKey = "lock:"; RedisLock.getLock(RedisLock.getSource(), lockKey, requestId, 30); threadPool.submit(new Runnable() { public void run() { int current = getMax(); if(current>0){ max--; } } }); RedisLock.releaseDistributedLock(RedisLock.getSource(), lockKey, requestId); System.out.println(max); } } private static Integer getMax(){ return max; } }
查看輸出: