java中有信號量Semaphore控制特定資源的訪問數量,在多進程甚至跨服務器跨網絡的狀況下,咱們能夠用reids來實現。
java的Semaphore,查看源碼可知道經過設置state,每次被獲取state-1,釋放+1,等於0就等待,大於0就喚醒其餘的線程。在redis中沒有辦法去喚醒其餘的等待進程,因此能夠用while循環來判斷是否獲取到信號量。
在while循環中,有如下幾個步驟:java
上面的第五點,是兩個集合的交集計算,先看看如下的例子。redis
如下例子中,setkey1有三個元素,one、two、three,setkey2有兩個元素,one、two,因此取他們的交集就是one和two,對應的score是相加的。segmentfault
@Test public void test() { Transaction trans = JedisUtils.multi(); String dstkey = "dstkey:"; String setkey1 = "setkey1:"; String setkey2 = "setkey2:"; trans.zadd(setkey1, 1, "one"); trans.zadd(setkey1, 2, "two"); trans.zadd(setkey1, 3, "three"); trans.zadd(setkey2, 4, "one"); trans.zadd(setkey2, 5, "two"); // 1+4=5,2+5=7 trans.zinterstore(dstkey, setkey1, setkey2); trans.exec(); Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1); System.out.println(tuples); }
運行結果以下:服務器
[[one,5.0], [two,7.0]]
上面例子的權重很明顯是1:1,若是咱們想設置其餘值,就須要引入ZParams的wight方法。下面的例子權重是7,8,因此計算規則爲17+48=39,27+58=54。網絡
@Test public void testWeight() { Transaction trans = JedisUtils.multi(); ZParams params = new ZParams(); params.weights(7, 8); String dstkey = "dstkey:"; String setkey1 = "setkey1:"; String setkey2 = "setkey2:"; trans.zadd(setkey1, 1, "one"); trans.zadd(setkey1, 2, "two"); trans.zadd(setkey1, 3, "three"); trans.zadd(setkey2, 4, "one"); trans.zadd(setkey2, 5, "two"); // 1*7+4*8=39,2*7+5*8=54 trans.zinterstore(dstkey, params, setkey1, setkey2); trans.exec(); Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1); System.out.println(tuples); }
運行結果以下:dom
[[one,39.0], [two,54.0]]
除了權重也有最大值,最小值,平均值。這邊只演示最大值。下面例子的計算規則爲max(1,4)8=48=32,max(2,5)8=58=40。分佈式
@Test public void testAggregate() { Transaction trans = JedisUtils.multi(); ZParams params = new ZParams(); params.aggregate(ZParams.Aggregate.MAX); params.weights(7, 8); String dstkey = "dstkey:"; String setkey1 = "setkey1:"; String setkey2 = "setkey2:"; trans.zadd(setkey1, 1, "one"); trans.zadd(setkey1, 2, "two"); trans.zadd(setkey1, 3, "three"); trans.zadd(setkey2, 4, "one"); trans.zadd(setkey2, 5, "two"); // max(1,4)*8=4*8=32,max(2,5)*8=5*8=40 trans.zinterstore(dstkey, params, setkey1, setkey2); trans.exec(); Set<Tuple> tuples = JedisUtils.zrangeWithScores(dstkey, 0, -1); System.out.println(tuples); }
運行結果以下:ide
[[one,32.0], [two,40.0]]
獲取鎖參考分佈式鎖的文章ui
static int cnt = 10; static int timeOut = 1000; static int limit = 2; static CountDownLatch countDownLatch = new CountDownLatch(cnt); static String count = "count:"; static String out = "out:"; static String semaphore = "semaphore:"; private static String acquire() { Transaction trans = JedisUtils.multi(); // 移除過時的信號量 trans.zremrangeByScore(out, Long.MIN_VALUE, System.currentTimeMillis() - timeOut); ZParams params = new ZParams(); params.weights(1, 0); // 取交集,若是out的元素過時被移除了,則和semaphore交集後,再賦值給semaphore。 // 相對於semaphore對應的元素也被移除了。 // 若是沒有過時,經過out權重爲0,也不影響semaphore的分數 trans.zinterstore(semaphore, params, semaphore, out); // 獲取自增加的計數器 trans.incr(count); List<Object> results = trans.exec(); // 獲取自增加的值 int counter = ((Long) results.get(results.size() - 1)).intValue(); trans = JedisUtils.multi(); String uuid = UUID.randomUUID().toString(); // 添加到信號量集合 trans.zadd(semaphore, counter, uuid); // 添加到超時集合 trans.zadd(out, System.currentTimeMillis(), uuid); // 查看排名 trans.zrank(semaphore, uuid); results = trans.exec(); // 獲取當前排名 int result = ((Long) results.get(results.size() - 1)).intValue(); // 在限制的數量之內,則返回uuid if (result < limit) { return uuid; } // 沒有在限制的數量之內,說明沒獲取到,則移除上面新增的值 release(uuid); return null; } private static void release(String uuid) { Transaction trans = JedisUtils.multi(); trans.zrem(semaphore, uuid); trans.zrem(out, uuid); trans.exec(); } @Test public void test() throws InterruptedException { // 鎖的時間,達到這個時間就釋放 int lockTime = 1; // 鎖的超時時間,達到這個時間就放棄獲取 long timeOut = 2500; for (int i = 0; i < cnt; i++) { new Thread(new LockThread(i, lockTime, timeOut)).start(); countDownLatch.countDown(); } TimeUnit.SECONDS.sleep(10); } static class LockThread implements Runnable { int lockTime; long timeOut; int idx; public LockThread(int idx, int lockTime, long timeOut) { this.idx = idx; this.lockTime = lockTime; this.timeOut = timeOut; } @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { // 獲取鎖 String lock = TestDistributedLocking.lock(TestDistributedLocking.lockName, lockTime, timeOut); String uuid = acquire(); // 釋放鎖 JedisUtils.del(lock); if (null != uuid) { SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(formatter.format(new Date()) + "---" + idx + ":獲取到了信號量"); try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } release(uuid); break; } try { // 休眠10毫秒 TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }