Redis實現分佈式鎖和等待序列

在集羣下,常常會由於同時處理髮生資源爭搶和併發問題,可是咱們都知道同步鎖synchronizedcasReentrankLock這些鎖的做用範圍都是JVM,說白了在集羣下沒啥用。這時咱們就須要能在多臺JVM之間決定執行順序的鎖了,如今分佈式鎖主要有redisZookeeper實現的,還有數據庫的方式,不過性能太差,也就是須要一個第三方的監管。java

背景

最近在作一個消費Kafka消息的時候發現,因爲線上的消費者過多,常常會遇到,多個機器同時處理一個主鍵類型的數據的狀況發生,若是最後是執行更新操做的話,也就是一個更新順序的問題,可是若是剛好都須要插入數據的時候,會出現主鍵重複的問題。這是生產上不被容許的(由於公司有異常監管的機制,扣分啥的),這是就須要個分佈式鎖了,斟酌後用了Redis的實現方式(由於網上例子多)redis

分析

redis實現的分佈式鎖,實現原理是set方法,由於多個線程同時請求的時候,只有一個線程能夠成功並返回結果,還能夠設置有效期,來避免死鎖的發生,一切都是這麼的完美,不過有個問題,在set的時候,會直接返回結果,成功或者失敗,不具備阻塞效果,須要咱們本身對失敗的線程進程處理,有兩種方式數據庫

  • 丟棄
  • 等待重試 因爲咱們的系統須要這些數據,那麼只能從新嘗試獲取。這裏使用redisList類型實現等待序列的做用

代碼

直接上代碼 其實直接redis的工具類就能夠解決了數組

package com.test
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.List;

/**
 * @desc redis隊列實現方式
 * @anthor 
 * @date 
 **/
public class RedisUcUitl {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    private static final Long RELEASE_SUCCESS = 1L;

    private RedisUcUitl() {

    }
    /**
     * logger
     **/

    /**
     * 存儲redis隊列順序存儲 在隊列首部存入
     *
     * @param key   字節類型
     * @param value 字節類型
     */
    public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {

        return jedis.lpush(key, value);
    
    }

    /**
     * 移除列表中最後一個元素 並將改元素添加入另外一個列表中 ,當列表爲空時 將阻塞鏈接 直到等待超時
     *
     * @param srckey
     * @param dstkey
     * @param timeout 0 表示永不超時
     * @return
     */
    public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {

        return jedis.brpoplpush(srckey, dstkey, timeout);

    }

    /**
     * 返回制定的key,起始位置的redis數據
     * @param redisKey
     * @param start
     * @param end -1 表示到最後
     * @return
     */
    public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
        
        return jedis.lrange(redisKey, start, end);
    }

    /**
     * 刪除key
     * @param redisKey
     */
    public static void delete(Jedis jedis, final byte[] redisKey) {
        
         return jedis.del(redisKey);
    }

    /**
     * 嘗試加鎖
     * @param lockKey key名稱
     * @param requestId 身份標識
     * @param expireTime 過時時間
     * @return
     */
    public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
        String result =  jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        return LOCK_SUCCESS.equals(result);

    }

    /**
     * 釋放鎖
     * @param lockKey key名稱
     * @param requestId 身份標識
     * @return
     */
    public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
        final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        return RELEASE_SUCCESS.equals(result);

    }
}

複製代碼

業務邏輯主要代碼以下bash

// 1.先消耗隊列中的
while(true){
    // 消費隊列
    try{
        // 被放入redis隊列的數據 序列化後的
        byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
        if(bytes == null || bytes.isEmpty()){
           // 隊列中沒數據時退出
            break;
        }
        // 反序列化對象
        Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
        // 塞入惟一的值 防止被其餘線程誤解鎖
        String requestId = UUID.randomUUID().toString();
        boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
        if(lockGetFlag){
            // 成功獲取鎖 進行業務處理
            //TODO
            // 處理完畢釋放鎖 
            boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);

        }else{
            // 未能得到鎖放入等待隊列
          RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
    
        }
        
    }catch(Exception e){
        break;
    }
    
}

// 2.處理最新接到的數據
// 一樣是走嘗試獲取鎖,獲取不到放入隊列的流程

複製代碼

通常序列化用fastJson之列的就能夠了,這裏用的是JDK自帶的,工具類以下併發

public class ObjectSerialUtil {

    private ObjectSerialUtil() {
//        工具類
    }

    /**
     * 將Object對象序列化爲byte[]
     *
     * @param obj 對象
     * @return byte數組
     * @throws Exception
     */
    public static byte[] objectToBytes(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        byte[] bytes = bos.toByteArray();
        bos.close();
        oos.close();
        return bytes;
    }


    /**
     * 將bytes數組還原爲對象
     *
     * @param bytes
     * @return
     * @throws Exception
     */
    public static Object bytesToObject(byte[] bytes) {
        try {
            ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bin);
            return ois.readObject();
        } catch (Exception e) {
            throw new BaseException("反序列化出錯!", e);
        }
    }
}
複製代碼
相關文章
相關標籤/搜索