在集羣下,常常會由於同時處理髮生資源爭搶和併發問題,可是咱們都知道同步鎖
synchronized
、cas
、ReentrankLock
這些鎖的做用範圍都是JVM
,說白了在集羣下沒啥用。這時咱們就須要能在多臺JVM
之間決定執行順序的鎖了,如今分佈式鎖主要有redis
、Zookeeper
實現的,還有數據庫的方式,不過性能太差,也就是須要一個第三方的監管。java
最近在作一個消費Kafka
消息的時候發現,因爲線上的消費者過多,常常會遇到,多個機器同時處理一個主鍵類型的數據的狀況發生,若是最後是執行更新操做的話,也就是一個更新順序的問題,可是若是剛好都須要插入數據的時候,會出現主鍵重複的問題。這是生產上不被容許的(由於公司有異常監管的機制,扣分啥的),這是就須要個分佈式鎖了,斟酌後用了Redis
的實現方式(由於網上例子多)redis
redis
實現的分佈式鎖,實現原理是set
方法,由於多個線程同時請求的時候,只有一個線程能夠成功並返回結果,還能夠設置有效期,來避免死鎖的發生,一切都是這麼的完美,不過有個問題,在set
的時候,會直接返回結果,成功或者失敗,不具備阻塞效果,須要咱們本身對失敗的線程進程處理,有兩種方式數據庫
redis
的List
類型實現等待序列的做用直接上代碼 其實直接redis的工具類就能夠解決了數組
package com.test
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.List;
/**
* @desc redis隊列實現方式
* @anthor
* @date
**/
public class RedisUcUitl {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
private RedisUcUitl() {
}
/**
* logger
**/
/**
* 存儲redis隊列順序存儲 在隊列首部存入
*
* @param key 字節類型
* @param value 字節類型
*/
public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {
return jedis.lpush(key, value);
}
/**
* 移除列表中最後一個元素 並將改元素添加入另外一個列表中 ,當列表爲空時 將阻塞鏈接 直到等待超時
*
* @param srckey
* @param dstkey
* @param timeout 0 表示永不超時
* @return
*/
public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {
return jedis.brpoplpush(srckey, dstkey, timeout);
}
/**
* 返回制定的key,起始位置的redis數據
* @param redisKey
* @param start
* @param end -1 表示到最後
* @return
*/
public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
return jedis.lrange(redisKey, start, end);
}
/**
* 刪除key
* @param redisKey
*/
public static void delete(Jedis jedis, final byte[] redisKey) {
return jedis.del(redisKey);
}
/**
* 嘗試加鎖
* @param lockKey key名稱
* @param requestId 身份標識
* @param expireTime 過時時間
* @return
*/
public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
return LOCK_SUCCESS.equals(result);
}
/**
* 釋放鎖
* @param lockKey key名稱
* @param requestId 身份標識
* @return
*/
public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return RELEASE_SUCCESS.equals(result);
}
}
複製代碼
業務邏輯主要代碼以下bash
// 1.先消耗隊列中的
while(true){
// 消費隊列
try{
// 被放入redis隊列的數據 序列化後的
byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
if(bytes == null || bytes.isEmpty()){
// 隊列中沒數據時退出
break;
}
// 反序列化對象
Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
// 塞入惟一的值 防止被其餘線程誤解鎖
String requestId = UUID.randomUUID().toString();
boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
if(lockGetFlag){
// 成功獲取鎖 進行業務處理
//TODO
// 處理完畢釋放鎖
boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);
}else{
// 未能得到鎖放入等待隊列
RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
}
}catch(Exception e){
break;
}
}
// 2.處理最新接到的數據
// 一樣是走嘗試獲取鎖,獲取不到放入隊列的流程
複製代碼
通常序列化用
fastJson
之列的就能夠了,這裏用的是JDK
自帶的,工具類以下併發
public class ObjectSerialUtil {
private ObjectSerialUtil() {
// 工具類
}
/**
* 將Object對象序列化爲byte[]
*
* @param obj 對象
* @return byte數組
* @throws Exception
*/
public static byte[] objectToBytes(Object obj) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
byte[] bytes = bos.toByteArray();
bos.close();
oos.close();
return bytes;
}
/**
* 將bytes數組還原爲對象
*
* @param bytes
* @return
* @throws Exception
*/
public static Object bytesToObject(byte[] bytes) {
try {
ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bin);
return ois.readObject();
} catch (Exception e) {
throw new BaseException("反序列化出錯!", e);
}
}
}
複製代碼