Redis之上的分佈式Java隊列

最近學習的勢頭大漲,碼了不少乾貨。分享給你們參考學習!

經過優銳課的java學習筆記中,瞭解到關於讓咱們使用Redisson Java框架討論六種不一樣類型的基於Redis的分佈式隊列。java

 

一、在Redis中使用隊列

Redis是一個功能強大的工具,支持從字符串和列表到映射和流的許多不一樣類型的數據結構。 開發人員將Redis用於多種目的,包括用於數據庫,緩存和消息代理。redis

像任何消息代理同樣,Redis須要以正確的順序發送消息。 能夠根據消息的年齡或某些其餘預約義的優先級等級發送消息。數據庫

爲了存儲這些未決消息,Redis開發人員須要隊列數據結構。 Redisson是使用Redis和Java進行分佈式編程的框架,它提供了許多分佈式數據結構(包括隊列)的實現。編程

Redisson經過提供Java API使Redis開發更加容易。 Redisson不須要開發人員學習Redis命令,而是包括全部衆所周知的Java接口,例如Queue和BlockingQueue。 Redisson還處理Redis中繁瑣的幕後工做,例如鏈接管理,故障轉移處理和數據序列化。緩存

二、基於Redis的分佈式Java隊列

Redisson提供了Java中基本隊列數據結構的多個基於Redis的實現,每種實現都有不一樣的功能。 這使能夠選擇最適合目的的隊列類型。服務器

下面,咱們將使用Redisson Java框架討論六種不一樣類型的基於Redis的分佈式隊列。數據結構

三、隊列

Redisson中的RQueue對象實現了java.util.Queue接口。 隊列用於須要從最先的最先的元素開始處理(也稱爲「先進先出」或FIFO)的狀況。架構

與普通Java同樣,可使用peek()方法檢查RQueue的第一個元素,或者使用poll()方法檢查和刪除RQueue的第一個元素:框架

1 RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
2 
3 queue.add(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();

 

四、阻塞隊列

Redisson中的RBlockingQueue對象實現了java.util.BlockingQueue接口。分佈式

BlockingQueues是阻塞線程的隊列,這些線程試圖從空隊列中進行輪詢,或者試圖在已滿的隊列中插入元素。 該線程將被阻塞,直到另外一個線程將一個元素插入到空隊列中,或從完整隊列中輪詢第一個元素爲止。

下面的示例代碼演示了RBlockingQueue的正確實例化和使用。 特別是,可使用參數指定對象將等待線程變得可用的時間來調用poll()方法:

1 RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
2 
3 queue.offer(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();
8 
9 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

在故障轉移或從新鏈接到Redis服務器的過程當中,將自動從新預訂poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()Java方法。

五、BoundedBlockingQueue

Redisson中的RBoundedBlockingQueue對象實現了有界的阻塞隊列結構。 有界阻塞隊列是容量已受限制(即有限)的阻塞隊列。

如下代碼演示瞭如何在Redisson中實例化和使用RBoundedBlockingQueue。 trySetCapacity()方法用於嘗試設置阻塞隊列的容量。 trySetCapacity()返回布爾值「 true」或「 false」,這取決因而否成功設置了容量或是否已經設置了容量:

 1 RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
 2 
 3 queue.trySetCapacity(2);
 4 
 5 queue.offer(new SomeObject(1));
 6 
 7 queue.offer(new SomeObject(2));
 8 
 9 // will be blocked until free space available in queue
10 
11 queue.put(new SomeObject());
12 
13 SomeObject obj = queue.peek();
14 
15 SomeObject someObj = queue.poll();
16 
17 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

六、延遲排隊

Redisson中的RDelayedQueue對象容許Redis中實現延遲隊列。 當使用諸如指數補償的策略將消息傳遞給消費者時,這可能會頗有用。 每次嘗試發送郵件失敗後,重試之間的時間將成倍增長。

在與元素一塊兒指定的延遲以後,延遲隊列中的每一個元素將被轉移到目標隊列。 此目標隊列能夠是實現RQueue接口的任何隊列,例如RBlockingQueue或RBoundedBlockingQueue。

 1 RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
 2 
 3 RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
 4 
 5 // move object to destinationQueue in 10 seconds
 6 
 7 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
 8 
 9 // move object to destinationQueue in 1 minute
10 
11 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

 

在再也不須要隊列以後,經過使用destroy()方法銷燬延遲的隊列是一個好主意。 可是,若是要關閉Redisson,則沒有必要。

七、PriorityQueue

Redisson中的RPriorityQueue對象實現了java.util.Queue接口。 優先級隊列是否是按元素的使用期限而是按照與每一個元素相關聯的優先級排序的隊列。

以下面的示例代碼所示,RPriorityQueue使用比較器對隊列中的元素進行排序:

 1 RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.poll();

 

八、PriorityBlockingQueue

Redisson中的RPriorityBlockingQueue對象結合了RPriorityQueue和RBlockingQueue的功能。 與RPriorityQueue同樣,RPriorityBlockingQueue也使用Comparator對隊列中的元素進行排序。

 1 RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.take();

 

在故障轉移或從新鏈接到Redis服務器的過程當中,將自動從新預訂poll(),pollLastAndOfferFirstTo()和take()Java方法。

 文章分享到這裏,若有不足之處,歡迎補充評論!

抽絲剝繭,細說架構那些事!

相關文章
相關標籤/搜索