redis - 信號量

java中有信號量Semaphore控制特定資源的訪問數量,在多進程甚至跨服務器跨網絡的狀況下,咱們能夠用reids來實現。
java的Semaphore,查看源碼可知道經過設置state,每次被獲取state-1,釋放+1,等於0就等待,大於0就喚醒其餘的線程。在redis中沒有辦法去喚醒其餘的等待進程,因此能夠用while循環來判斷是否獲取到信號量。
在while循環中,有如下幾個步驟:java

  1. 加鎖,同一時間只能一個進程進行操做。
  2. 引用計數器,可否獲取到信號量,取決於計數器的大小。
  3. 將計數器保存在有序集合中,經過有序集合的排序,來決定是否獲取信號量。
  4. 將每一個獲取信號量元素另外保存其餘有序集合,用於判斷時間是否超時。
  5. 每次獲取信號量以前,優先移除超時的元素。移除超時元素的同時,也要移除信號量有序集合對應的元素。

交集計算

上面的第五點,是兩個集合的交集計算,先看看如下的例子。redis

並集

如下例子中,setkey1有三個元素,one、two、three,setkey2有兩個元素,one、two,因此取他們的交集就是one和two,對應的score是相加的。segmentfault

@Test
public void test() {
    Transaction trans = JedisUtils.multi();
    String dstkey = "dstkey:";
    String setkey1 = "setkey1:";
    String setkey2 = "setkey2:";
    trans.zadd(setkey1, 1, "one");
    trans.zadd(setkey1, 2, "two");
    trans.zadd(setkey1, 3, "three");
    trans.zadd(setkey2, 4, "one");
    trans.zadd(setkey2, 5, "two");
    // 1+4=5,2+5=7
 trans.zinterstore(dstkey, setkey1, setkey2);
    trans.exec();
    Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1);
    System.out.println(tuples);
}

運行結果以下:服務器

[[one,5.0], [two,7.0]]

權重

上面例子的權重很明顯是1:1,若是咱們想設置其餘值,就須要引入ZParams的wight方法。下面的例子權重是7,8,因此計算規則爲17+48=39,27+58=54。網絡

@Test
public void testWeight() {
    Transaction trans = JedisUtils.multi();
    ZParams params = new ZParams();
    params.weights(7, 8);
    String dstkey = "dstkey:";
    String setkey1 = "setkey1:";
    String setkey2 = "setkey2:";
    trans.zadd(setkey1, 1, "one");
    trans.zadd(setkey1, 2, "two");
    trans.zadd(setkey1, 3, "three");
    trans.zadd(setkey2, 4, "one");
    trans.zadd(setkey2, 5, "two");
    // 1*7+4*8=39,2*7+5*8=54
 trans.zinterstore(dstkey, params, setkey1, setkey2);
    trans.exec();
    Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1);
    System.out.println(tuples);
}

運行結果以下:dom

[[one,39.0], [two,54.0]]

最大值/最小值/平均值

除了權重也有最大值,最小值,平均值。這邊只演示最大值。下面例子的計算規則爲max(1,4)8=48=32,max(2,5)8=58=40。分佈式

@Test
public void testAggregate() {
    Transaction trans = JedisUtils.multi();
    ZParams params = new ZParams();
    params.aggregate(ZParams.Aggregate.MAX);
    params.weights(7, 8);
    String dstkey = "dstkey:";
    String setkey1 = "setkey1:";
    String setkey2 = "setkey2:";
    trans.zadd(setkey1, 1, "one");
    trans.zadd(setkey1, 2, "two");
    trans.zadd(setkey1, 3, "three");
    trans.zadd(setkey2, 4, "one");
    trans.zadd(setkey2, 5, "two");
    // max(1,4)*8=4*8=32,max(2,5)*8=5*8=40
 trans.zinterstore(dstkey, params, setkey1, setkey2);
    trans.exec();
    Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1);
    System.out.println(tuples);
}

運行結果以下:ide

[[one,32.0], [two,40.0]]

信號量

獲取鎖參考分佈式鎖的文章ui

static int cnt = 10;
static int timeOut = 1000;
static int limit = 2;
static CountDownLatch countDownLatch = new CountDownLatch(cnt);
static String count = "count:";
static String out = "out:";
static String semaphore = "semaphore:";

private static String acquire() {
    Transaction trans = JedisUtils.multi();
    // 移除過時的信號量
    trans.zremrangeByScore(out, Long.MIN_VALUE, System.currentTimeMillis() - timeOut);
    ZParams params = new ZParams();
    params.weights(1, 0);
    // 取交集,若是out的元素過時被移除了,則和semaphore交集後,再賦值給semaphore。
    // 相對於semaphore對應的元素也被移除了。
    // 若是沒有過時,經過out權重爲0,也不影響semaphore的分數
    trans.zinterstore(semaphore, params, semaphore, out);
    // 獲取自增加的計數器
    trans.incr(count);
    List<Object> results = trans.exec();
    // 獲取自增加的值
    int counter = ((Long) results.get(results.size() - 1)).intValue();

    trans = JedisUtils.multi();
    String uuid = UUID.randomUUID().toString();
    // 添加到信號量集合
    trans.zadd(semaphore, counter, uuid);
    // 添加到超時集合
    trans.zadd(out, System.currentTimeMillis(), uuid);
    // 查看排名
    trans.zrank(semaphore, uuid);
    results = trans.exec();
    // 獲取當前排名
    int result = ((Long) results.get(results.size() - 1)).intValue();
    // 在限制的數量之內,則返回uuid
    if (result < limit) {
        return uuid;
    }
    // 沒有在限制的數量之內,說明沒獲取到,則移除上面新增的值
    release(uuid);
    return null;
}

private static void release(String uuid) {
    Transaction trans = JedisUtils.multi();
    trans.zrem(semaphore, uuid);
    trans.zrem(out, uuid);
    trans.exec();
}

@Test
public void test() throws InterruptedException {
    // 鎖的時間,達到這個時間就釋放
    int lockTime = 1;
    // 鎖的超時時間,達到這個時間就放棄獲取
    long timeOut = 2500;
    for (int i = 0; i < cnt; i++) {
        new Thread(new LockThread(i, lockTime, timeOut)).start();
        countDownLatch.countDown();
    }
    TimeUnit.SECONDS.sleep(10);
}

static class LockThread implements Runnable {
    int lockTime;
    long timeOut;
    int idx;

    public LockThread(int idx, int lockTime, long timeOut) {
        this.idx = idx;
        this.lockTime = lockTime;
        this.timeOut = timeOut;
    }

    @Override
    public void run() {
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            // 獲取鎖
            String lock = TestDistributedLocking.lock(TestDistributedLocking.lockName, lockTime, timeOut);
            String uuid = acquire();
            // 釋放鎖
            JedisUtils.del(lock);
            if (null != uuid) {
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                System.out.println(formatter.format(new Date()) + "---" + idx + ":獲取到了信號量");
                try {
                    TimeUnit.SECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                release(uuid);
                break;
            }
            try {
                // 休眠10毫秒
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
相關文章
相關標籤/搜索