Curator is an open-source ZooKeeper client framework from Netflix. It takes care of many of the low-level details of ZooKeeper client development, including connection retry, repeated Watcher registration, and handling NodeExistsException. Patrick Hunt (a ZooKeeper committer) paid Curator a high compliment: "Guava is to Java what Curator is to ZooKeeper."
The origin of the ZooKeeper name is rather amusing. The following passage is excerpted from the book *From Paxos to ZooKeeper: Principles and Practice of Distributed Consistency*:
ZooKeeper originated in a research group at Yahoo! Research. At the time, the researchers found that many large internal systems at Yahoo relied on similar systems for distributed coordination, but those systems tended to suffer from distributed single points of failure. So Yahoo's developers set out to build a general-purpose distributed coordination framework with no single point of failure. Early in the project, noting that many projects were named after animals (the famous Pig project, for example), Yahoo's engineers wanted an animal name for this one too. Raghu Ramakrishnan, then chief scientist of the research institute, joked: "At this rate, this place will turn into a zoo." At that, everyone agreed it should be called the zookeeper: with all the animal-named distributed components put together, Yahoo's whole distributed system looked like a large zoo, and ZooKeeper was exactly what would coordinate this distributed environment. Thus ZooKeeper got its name.
Curator is without doubt the Swiss Army knife among ZooKeeper clients. The word translates as "curator" or "custodian"; whether or not the developers intended it, one suspects the name is meant to say that Curator is ZooKeeper's keeper (stretching a little: Curator is the zoo's director).
- curator-framework: wrappers around ZooKeeper's low-level APIs
- curator-client: client-side utilities, such as retry policies
- curator-recipes: higher-level features, such as cache-based event listening, leader election, distributed locks, distributed counters, and distributed barriers
```xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
```
Note: this article uses Curator version 2.12.0, which corresponds to ZooKeeper 3.4.x. Mixing incompatible versions causes compatibility problems and is very likely to make node operations fail.
```java
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
        connectionInfo,
        5000,
        3000,
        retryPolicy);
```
The newClient static factory method takes four main parameters:
Parameter | Description
---|---
connectionString | Server list, in the format host1:port1,host2:port2,...
retryPolicy | Retry policy; four policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs | Session timeout in milliseconds, default 60000 ms
connectionTimeoutMs | Connection timeout in milliseconds, default 15000 ms
The built-in retry policies are:

- RetryUntilElapsed: reconnects at sleepMsBetweenRetries intervals until maxElapsedTimeMs has elapsed
- RetryNTimes: retries a specified number of times
- RetryOneTime: retries exactly once, simple and crude
- ExponentialBackoffRetry: retries with an exponentially growing sleep between attempts, with the constructors:

```java
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
```

The sleep before each retry is computed as:

```java
sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
```
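To get a feel for the formula, the sketch below (an illustration only, not Curator source; the class and method names are made up) computes the worst-case sleep before each retry. Because `random.nextInt(1 << (retryCount + 1))` returns at most `2^(retryCount+1) - 1`, the worst-case sleep roughly doubles with each retry:

```java
import java.util.Random;

public class BackoffMath {
    // Worst-case sleep for a given retry, per the formula above:
    // baseSleepTimeMs * max(1, nextInt(1 << (retryCount + 1))),
    // where nextInt(n) can return at most n - 1.
    static long maxSleepMs(long baseSleepTimeMs, int retryCount) {
        return baseSleepTimeMs * Math.max(1, (1L << (retryCount + 1)) - 1);
    }

    // One concrete sample draw, exactly as the formula computes it.
    static long sampleSleepMs(long baseSleepTimeMs, int retryCount, Random random) {
        return baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry
                    + ": sample=" + sampleSleepMs(1000, retry, random) + "ms"
                    + ", worst case=" + maxSleepMs(1000, retry) + "ms");
        }
    }
}
```

With baseSleepTimeMs = 1000, the worst-case sleeps for retries 0, 1, 2 are 1000 ms, 3000 ms, and 7000 ms; the three-argument constructor's maxSleepMs parameter caps this growth.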
With the Fluent style, the core parameters become chained setters:
```java
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();
```
To isolate different ZooKeeper-based applications from each other, assign each application an independent namespace, i.e. a ZooKeeper root path (in official terms, ZooKeeper's "chroot" feature). In the example below, the client specifies the namespace /base, so every data-node operation that client performs against ZooKeeper is relative to that path. Chroot maps a client application onto one subtree of the ZooKeeper server; when multiple applications share one ZooKeeper cluster, this is very useful for keeping them isolated from one another.
```java
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .namespace("base")
        .build();
```
Once the session is created successfully and you have the client instance, call its start() method directly:

```java
client.start();
```
ZooKeeper node creation modes: PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, and EPHEMERAL_SEQUENTIAL.
Create a node with empty initial content:

```java
client.create().forPath("path");
```
Note: if no node attributes are set, the creation mode defaults to persistent and the content defaults to empty.
Create a node with initial content:

```java
client.create().forPath("path", "init".getBytes());
```
Create a node with a specified creation mode (ephemeral) and empty content:

```java
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
```
Create a node with a specified creation mode (ephemeral) and initial content:

```java
client.create().withMode(CreateMode.EPHEMERAL).forPath("path", "init".getBytes());
```
Create a node with a specified creation mode (ephemeral), initial content, and automatic recursive creation of parent nodes:

```java
client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path", "init".getBytes());
```
The creatingParentContainersIfNeeded() call is extremely useful: normally a developer must check whether a child node's parent exists before creating the child, because creating it directly throws NoNodeException when the parent is missing. With creatingParentContainersIfNeeded(), Curator recursively creates all required parent nodes automatically.
Delete a node:

```java
client.delete().forPath("path");
```
Note: this method can only delete leaf nodes; otherwise it throws an exception.
Delete a node and recursively delete all of its children:

```java
client.delete().deletingChildrenIfNeeded().forPath("path");
```
Delete a node, forcing a specific version:

```java
client.delete().withVersion(10086).forPath("path");
```
Delete a node with guaranteed deletion:

```java
client.delete().guaranteed().forPath("path");
```
guaranteed() is a safety net: as long as the client session remains valid, Curator keeps retrying the deletion in the background until the node is successfully deleted.
Note: the fluent calls above can be freely combined, for example:

```java
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
```
Read a node's data:

```java
client.getData().forPath("path");
```
Note: the return value of this method is byte[].
Read a node's data and also retrieve the node's Stat. A Stat carries node metadata, as shown by zkCli's stat command:

```
[zk: 10.15.107.155:3352(CONNECTED) 111] stat /c/hfx/sw1
cZxid = 0x2000007df
ctime = Thu Mar 14 11:08:28 CST 2013
mZxid = 0x200000a87
mtime = Thu Mar 14 15:00:34 CST 2013
pZxid = 0x2000007df
cversion = 0
dataVersion = 608
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
```

```java
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
```
Update a node's data:

```java
client.setData().forPath("path", "data".getBytes());
```
Note: this call returns a Stat instance.
Update a node's data, forcing a specific version:

```java
client.setData().withVersion(10086).forPath("path", "data".getBytes());
```
Check whether a node exists:

```java
client.checkExists().forPath("path");
```

Note: this method returns a Stat instance (null if the node does not exist). You can chain additional calls (watch or background processing) and finish with forPath() to specify the target ZNode.
Get a node's children:

```java
client.getChildren().forPath("path");
```

Note: the return value is List&lt;String&gt;, the list of child node paths. You can chain additional calls (watch, background processing, or stat retrieval) and finish with forPath() to specify the parent ZNode.
A CuratorFramework instance exposes the inTransaction() method, which opens a ZooKeeper transaction. You can compose create, setData, check, and/or delete operations and then call commit() to submit them as one atomic operation. For example:

```java
client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path", "data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path", "data2".getBytes())
      .and()
      .commit();
```
The create, delete, update, and read methods above are all synchronous. Curator also provides asynchronous variants, introducing the BackgroundCallback interface for handling the result the server returns after an asynchronous call. The key callback value in BackgroundCallback is a CuratorEvent, which carries the event type, the response code, and the node's details.
CuratorEventType

Event type | Corresponding CuratorFramework method
---|---
CREATE | #create()
DELETE | #delete()
EXISTS | #checkExists()
GET_DATA | #getData()
SET_DATA | #setData()
CHILDREN | #getChildren()
SYNC | #sync(String, Object)
GET_ACL | #getACL()
SET_ACL | #setACL()
WATCHED | #usingWatcher(Watcher)
CLOSING | #close()
Response codes (#getResultCode())

Code | Meaning
---|---
0 | OK, the call succeeded
-4 | ConnectionLoss, the client lost its connection to the server
-110 | NodeExists, the node already exists
-112 | SessionExpired, the session has expired
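As a quick illustration of the table, a callback can branch on the value of getResultCode(). The helper below is hypothetical (not part of Curator); it just encodes the mapping above:

```java
public class ResultCodes {
    // Maps the ZooKeeper result codes from the table above to a short description.
    static String describe(int resultCode) {
        switch (resultCode) {
            case 0:    return "OK";
            case -4:   return "ConnectionLoss";
            case -110: return "NodeExists";
            case -112: return "SessionExpired";
            default:   return "Other(" + resultCode + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(0));    // OK
        System.out.println(describe(-110)); // NodeExists
    }
}
```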
In code, asynchronous operations are performed through the following API:

```java
Backgroundable<T>
public T inBackground();
public T inBackground(Object context);
public T inBackground(BackgroundCallback callback);
public T inBackground(BackgroundCallback callback, Object context);
public T inBackground(BackgroundCallback callback, Executor executor);
public T inBackground(BackgroundCallback callback, Object context, Executor executor);
```
Of these overloads, the executor parameter deserves particular attention. In ZooKeeper, all asynchronous notification events are processed by a single EventThread, which serializes event handling. This serialization guarantees ordered processing in most scenarios, but it has a downside: one expensive handler can take a long time and delay the processing of every other event. The inBackground overloads therefore let you pass in an Executor, so that heavyweight event handling can be moved onto a dedicated thread pool such as Executors.newFixedThreadPool(2).
```java
Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s",
                  curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");
```
Note: if #inBackground() is called without an executor, Curator's EventThread handles the callback by default.
Reminder: you must first add the curator-recipes dependency. The rest of this article only explains and demonstrates some of the recipes features; it does not attempt a source-level discussion.
```xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
```
Important: it is strongly recommended to monitor the connection state with a ConnectionStateListener. When the connection state is LOST, all APIs under curator-recipes become invalid or expired, even though none of the examples below use a ConnectionStateListener.
ZooKeeper natively supports event listening by registering Watchers, but the developer must re-register them repeatedly (a Watcher fires only once per registration). Cache is Curator's wrapper around event listening; it can be seen as a local cached view of the events, and it automatically re-registers listeners for you. Curator provides three kinds of watcher caches for observing node changes.
Path Cache monitors the children of a ZNode. When a child is added, updated, or deleted, the Path Cache updates its state to contain the latest set of children, along with their data and stats, and state changes are delivered through a PathChildrenCacheListener.
In practice four classes are involved: PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener, and ChildData.
Create a Path Cache with the constructor:

```java
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
```
To use the cache, you must call its `start()` method, and call `close()` when you are done. You can pass a StartMode to control how the cache initializes. StartMode has the following values:

- `NORMAL`: normal asynchronous initialization.
- `BUILD_INITIAL_CACHE`: `rebuild()` is called before `start()` returns, so the cache is primed synchronously.
- `POST_INITIALIZED_EVENT`: initializes asynchronously like NORMAL, then posts a `PathChildrenCacheEvent.Type.INITIALIZED` event once done.

`public void addListener(PathChildrenCacheListener listener)` registers a listener for cache changes.

The `getCurrentData()` method returns a `List<ChildData>` that you can iterate to inspect all children.
Note that creates, updates, and removals are performed with the client (CuratorFramework), not through the PathChildrenCache itself:
```java
public class PathCacheDemo {
    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("event type: " + event.getType());
            if (null != event.getData()) {
                System.out.println("node data: " + event.getData().getPath() + " = "
                        + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData: " + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}
```
Note: if the cacheData argument in new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example return null; the cache does not cache node data.
Note: the Thread.sleep(10) calls in the example can be commented out, but then not every event fires the listener. This is probably related to how PathChildrenCache is implemented: events must not be triggered too frequently.
Node Cache is similar to Path Cache, except that it watches one specific node. It involves three classes:

- `NodeCache`: the Node Cache implementation
- `NodeCacheListener`: the node listener
- `ChildData`: the node data

Note: as with Path Cache, you must still call `start()` to use the cache and `close()` when finished.

`getCurrentData()` returns the node's current state, from which you can read the current value.
```java
public class NodeCacheDemo {
    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("node data: " + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("node deleted!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}
```
Note: the Thread.sleep calls in the example can be commented out, but then not every event fires the listener. This is probably related to how NodeCache is implemented: events must not be triggered too frequently.
Note: NodeCache can only watch the state changes of a single node.
Tree Cache monitors all nodes of an entire subtree, something like a combination of PathCache and NodeCache. It mainly involves four classes: TreeCache, TreeCacheListener, TreeCacheEvent, and ChildData.
```java
public class TreeCacheDemo {
    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("event type: " + event.getType()
                        + " | path: " + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}
```
Note: this example uses no Thread.sleep(10) between operations, yet the events still fire the correct number of times.
Note: when TreeCache is initialized (on the `start()` call), it invokes the `TreeCacheListener` with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that point `event.getData().getPath()` is very likely to throw a NullPointerException; you should handle and avoid this case explicitly.
In distributed computing, leader election is an important capability. The process goes like this: a process is designated as the organizer and distributes tasks to the nodes. Before the task starts, no node knows who the leader (or coordinator) is. Once the election algorithm runs, every node eventually agrees on one unique node as the task leader. Elections also commonly occur when the leader crashes unexpectedly and a new leader must be chosen.
In a ZooKeeper cluster, the leader handles write operations and synchronizes the followers via the Zab protocol; both the leader and the followers can handle reads.
Curator has two leader election recipes: LeaderSelector and LeaderLatch. With the former, all live clients take turns being leader. With the latter, once a leader is elected it keeps the leadership until a client going away triggers a new election.
LeaderLatch has two constructors:

```java
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)
```
Starting a LeaderLatch:

```java
leaderLatch.start();
```
Once started, the LeaderLatch negotiates with the other LeaderLatch instances that use the same latch path, and one of them is eventually elected leader. Use the `hasLeadership()` method to check whether a LeaderLatch instance is the leader:

```java
leaderLatch.hasLeadership(); // returns true if this instance is the leader
```

Similar to the JDK's CountDownLatch, LeaderLatch blocks while waiting for leadership. Once you no longer need a LeaderLatch, you must call its `close()` method; if it is the leader, it releases the leadership and the remaining participants elect a new leader.
```java
public void await() throws InterruptedException, EOFException
/* Causes the current thread to wait until this instance acquires leadership
   unless the thread is interrupted or closed. */
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
```
Exception handling: a LeaderLatch instance can register a ConnectionStateListener to watch for connection problems. On SUSPENDED or LOST, the leader must no longer assume it is still the leader. If a LOST connection is later RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. LeaderLatch users must account for connection issues that can cause leadership to be lost. Again, using a ConnectionStateListener is strongly recommended.
An example of LeaderLatch usage:
```java
public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                        new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();
            Thread.sleep(5000);
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState()) {
                    CloseableUtils.closeQuietly(latch);
                }
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}
```
You can add the test module dependency for convenient testing without starting a real ZooKeeper server:
```xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.12.0</version>
</dependency>
```
First we create 10 LeaderLatch instances; once started, one of them is elected leader. Because the election takes some time, the leader is not available immediately after start. `hasLeadership()` reports whether this instance is the leader. `getLeader().getId()` returns the ID of the current leader. Leadership can only be released through `close()`. `await()` is a blocking method that waits for leadership, but the instance will not necessarily be promoted.
Using LeaderSelector mainly involves the following classes: LeaderSelector, LeaderSelectorListener, LeaderSelectorListenerAdapter, and CancelLeadershipException. The core class is LeaderSelector, whose constructors are:

```java
public LeaderSelector(CuratorFramework client, String mutexPath, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
```
Like LeaderLatch, a LeaderSelector must be started:

```java
leaderSelector.start();
```

Once started, your listener's `takeLeadership()` method is invoked when the instance obtains leadership, and `takeLeadership()` returns only when the leadership is released. When you no longer need a LeaderSelector instance, call its close() method.
Exception handling: LeaderSelectorListener extends ConnectionStateListener, so a LeaderSelector must watch connection state changes carefully. If the instance becomes leader, it should react to SUSPENDED and LOST. On SUSPENDED, the instance must assume it may no longer be leader until the connection is successfully re-established. On LOST, the instance is no longer leader and its takeLeadership method returns.
Important: the recommended handling is to throw CancelLeadershipException when SUSPENDED or LOST is received. This interrupts the LeaderSelector instance and cancels the executing takeLeadership method. This is very important, so you should consider extending LeaderSelectorListenerAdapter, which provides exactly this recommended handling.
The following example is taken from the official documentation:
```java
public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}
```
`leaderSelector.autoRequeue();` ensures that this instance may acquire leadership again after it relinquishes it. Here an AtomicInteger records how many times this client has been leader; the rotation is "fair": every client has an equal chance to become leader.
```java
public class LeaderSelectorDemo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                        new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}
```
For LeaderLatch, leadership is released only by calling `close()`. LeaderSelector, by contrast, lets you control leadership through the `LeaderSelectorListener` and give it up at the appropriate moment, so that every node has a chance to lead. LeaderSelector is therefore more flexible and controllable, and it is the recommended choice for leader election scenarios.
Reminders:

1. It is recommended to monitor the connection state with a ConnectionStateListener, because when the connection is LOST you no longer hold the lock.
2. These distributed locks are globally synchronized: at any point in time, no two clients hold the same lock.
Shared means the lock is globally visible and any client can request it. Reentrant is like the JDK's ReentrantLock: the client that holds the lock can acquire it again without blocking. It is implemented by the class `InterProcessMutex`, whose constructor is:

```java
public InterProcessMutex(CuratorFramework client, String path)
```
Acquire the lock with `acquire()`, which also has a timeout variant:

```java
public void acquire()
// Acquire the mutex - blocking until it's available.
// Note: the same thread can call acquire re-entrantly.
// Each call to acquire must be balanced by a call to release().

public boolean acquire(long time, TimeUnit unit)
// Acquire the mutex - blocks until it's available or the given time expires.
// Note: the same thread can call acquire re-entrantly. Each call to acquire
// that returns true must be balanced by a call to release().
// Parameters: time - time to wait; unit - time unit
// Returns: true if the mutex was acquired, false if not
```
Release the lock with `release()`. An InterProcessMutex instance can be reused.
Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism. To make a mutex revocable, call:

```java
public void makeRevocable(RevocationListener<T> listener)
// Makes the lock revocable. The listener is called when another
// process or thread wants you to release the lock.
// Parameters: listener - the listener
```
To request revocation of the current lock, call `attemptRevoke()`; note that the `RevocationListener` is called back when the lock is released.

```java
public static void attemptRevoke(CuratorFramework client, String path) throws Exception
// Utility to mark a lock for revocation. Assuming that the lock has been registered
// with a RevocationListener, it will get called and the lock should be released.
// Note, however, that revocation is cooperative.
// Parameters: client - the client
//             path - the path of the lock, usually from something like
//                    InterProcessMutex.getParticipantNodes()
```
A second reminder on error handling: it is still strongly recommended to handle connection state changes with a `ConnectionStateListener`. When the connection is LOST, you no longer hold the lock.
First, let's create a simulated shared resource that is expected to be accessed by only one thread at a time; otherwise there will be concurrency problems.
```java
public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // In a real environment we would access/maintain a shared resource here.
        // With the lock held, this example never throws IllegalStateException,
        // but without the lock the sleep below makes the exception very likely.
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}
```
Then create an `InterProcessMutexDemo` class responsible for the full cycle of requesting the lock, using the resource, and releasing the lock.
```java
public class InterProcessMutexDemo {
    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); // access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                                new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}
```
The code is simple: it creates 5 clients, each in its own thread, and each repeats the acquire-lock, use-resource, release-lock cycle 50 times. The output shows that the lock is used exclusively by one instance at a time, in random order.
Since the lock is reentrant, you can call `acquire()` multiple times in one thread; while the thread holds the lock, it always returns true. You should not use the same `InterProcessMutex` across multiple threads; instead, create a new InterProcessMutex instance in each thread with the same path, so that they share the same lock.
Compared with `InterProcessMutex` above, this lock simply lacks the Reentrant capability, which means it cannot be re-acquired within the same thread. The class is `InterProcessSemaphoreMutex`, and it is used in the same way as `InterProcessMutex`:
```java
public class InterProcessSemaphoreMutexDemo {
    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); // access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                                new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}
```
When you run it, you will find that exactly one client acquires the first lock (its first `acquire()` returns true) and then blocks on its second `acquire()`, which times out; all other clients block and time out on their first `acquire()` and throw exceptions. This verifies that the lock implemented by `InterProcessSemaphoreMutex` is not reentrant.
Similar to the JDK's ReentrantReadWriteLock: a read-write lock manages a pair of related locks, one for read operations and one for write operations. The read lock can be held by multiple processes concurrently as long as the write lock is not in use, while the write lock is exclusive and blocks readers.
This lock is reentrant. A thread holding the write lock can re-enter the read lock, but the read lock cannot enter the write lock. This also means the write lock can be downgraded to a read lock: acquire the write lock, then acquire the read lock, then release the write lock (keeping the read lock). Upgrading from a read lock to a write lock is not possible.
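Since the semantics mirror the JDK's in-process ReentrantReadWriteLock, the downgrade sequence can be demonstrated with the JDK class alone (a local analogy only; InterProcessReadWriteLock applies the same rule across processes):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DowngradeDemo {
    // Acquire write -> acquire read -> release write: ends up holding only the read lock.
    static boolean downgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();   // exclusive write lock
        rw.readLock().lock();    // a write-lock holder may re-enter the read lock
        rw.writeLock().unlock(); // downgrade: keep only the read lock
        boolean downgraded = rw.getReadHoldCount() == 1 && !rw.isWriteLockedByCurrentThread();
        rw.readLock().unlock();
        return downgraded;
    }

    public static void main(String[] args) {
        System.out.println("downgraded to read lock: " + downgrade());
    }
}
```

Attempting the reverse (holding the read lock, then calling writeLock().lock()) deadlocks the thread with the JDK class, which is why lock upgrading is disallowed.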
The reentrant read-write lock mainly involves two classes: `InterProcessReadWriteLock` and `InterProcessMutex`. To use it, first create an `InterProcessReadWriteLock` instance, then obtain the read lock or the write lock as needed; both are of type `InterProcessMutex`.
```java
public class ReentrantReadWriteLockDemo {
    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: you must acquire the write lock before the read lock, not the other way around!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                                new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}
```
A counting semaphore, similar to the JDK's Semaphore. The JDK's Semaphore maintains a set of permits; Curator calls them leases. There are two ways to set the semaphore's maximum number of leases: either the user supplies a path and a maximum lease count, or the user supplies a path and a `SharedCountReader`. Without a SharedCountReader, you must ensure that every instance across all processes uses the same maximum lease count; otherwise process A's instances might use a maximum of 10 while process B's use 20, and the leases become meaningless.
This time, a call to `acquire()` returns a lease object. The client must close these leases in a finally block, or they are lost. However, if the client session is dropped for some reason, e.g. a crash, the leases held by that client are automatically closed so that other clients can use them. Leases can also be returned with:
```java
public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
```
Note that you can request several leases at once; if the semaphore does not have enough leases available, the requesting thread blocks. There are also timeout overloads:
```java
public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
```
Shared Semaphore mainly uses the following classes:

- `InterProcessSemaphoreV2`
- `Lease`
- `SharedCountReader`
```java
public class InterProcessSemaphoreDemo {
    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                    new ExponentialBackoffRetry(1000, 3));
            client.start();
            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");
            resource.use();
            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);
            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}
```
First we acquire 5 leases, which we eventually return to the semaphore. We then request one more lease; the semaphore still has 5 leases, so the request succeeds and returns one lease, leaving 4. Next we request 5 more leases; there are not enough left, so the call blocks until the timeout and still cannot be satisfied, returning null (when leases are insufficient, acquire blocks until the timeout and then returns null rather than throwing an exception; without a timeout it blocks forever).
All the locks described above are fair. From ZooKeeper's point of view, each client obtains the lock in the order it requested it; there is no unfair preemption.
Multi Shared Lock is a container of locks. When `acquire()` is called, all the contained locks are acquired; if that fails, every acquired lock is released. Likewise, calling release releases all the locks in the container (failures are ignored). Essentially, it is a proxy for a group of locks: acquire and release requests on it are forwarded to every lock it contains.
It mainly involves two classes:

- `InterProcessMultiLock`
- `InterProcessLock`
Its constructor takes either a collection of locks or a list of ZooKeeper paths:

```java
public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
```
Usage is the same as Shared Lock.
```java
public class MultiSharedLockDemo {
    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),
                    new ExponentialBackoffRetry(1000, 3));
            client.start();
            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);
            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
            try {
                resource.use(); // access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}
```
We create a new InterProcessMultiLock containing one reentrant lock and one non-reentrant lock. After calling acquire(), the thread holds both locks at once; after calling release(), both locks are released.
One final reminder: it is strongly recommended to use a ConnectionStateListener to monitor the connection state; when the connection state becomes LOST, the lock is lost.
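That recommendation can be sketched as follows. This is a minimal sketch, not from the original text: the class name and the lockMayBeHeld flag are hypothetical; only the Curator listener API itself is real.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

public class LockStateWatcher {
    // Hypothetical flag consulted by code that believes it holds a lock.
    private final AtomicBoolean lockMayBeHeld = new AtomicBoolean(true);

    public void watch(CuratorFramework client) {
        ConnectionStateListener listener = (c, newState) -> {
            // LOST means the session (and the ephemeral lock nodes created
            // under it) is gone, so any lock held through this client must
            // be considered lost.
            if (newState == ConnectionState.LOST) {
                lockMayBeHeld.set(false);
            }
        };
        client.getConnectionStateListenable().addListener(listener);
    }

    public boolean lockMayBeHeld() {
        return lockMayBeHeld.get();
    }
}
```

Code guarded by the lock should check the flag (or a similar signal) before acting, instead of assuming the lock is still valid after a long network pause.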
As the name implies, a counter is used for counting; with ZooKeeper you can implement a cluster-wide shared counter. Any client using the same path sees the latest counter value, which is guaranteed by ZooKeeper's consistency. Curator has two counters: one counts with an int (SharedCount), the other with a long (DistributedAtomicLong).
This class counts with an int. Three classes are involved.
SharedCount represents the counter. You can add a SharedCountListener to it, which is notified whenever the counter changes, and a SharedCountReader can read the latest value, both the plain value and the versioned value (VersionedValue).
public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}
In this example we use baseCount to listen for changes to the count (the addListener method adds a SharedCountListener). Any SharedCount using the same path can read this count. We then use 5 threads, each adding a random number under 10 to the count. Whenever a SharedCount on the same path changes the value, baseCount's SharedCountListener is called back.
count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))
Here we use trySetCount to set the counter. The first argument supplies the current VersionedValue; if another client has updated the count in the meantime, your update may fail, but your client is refreshed with the latest value, so after a failure you can simply try again. setCount, in contrast, forcibly updates the counter's value.
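The retry-on-failure pattern described above can be put together as follows. This is a minimal sketch: incrementBy and MAX_TRIES are illustrative names chosen here, not part of the Curator API; it assumes a started SharedCount as in the demo above.

```java
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;

public class SharedCountRetry {
    // A bounded retry count is our own choice; an unbounded loop could spin
    // indefinitely on a heavily contended counter.
    static final int MAX_TRIES = 10;

    // Optimistic update: read the versioned value, attempt a compare-and-set
    // via trySetCount, and retry with the refreshed value whenever another
    // client won the race.
    public static boolean incrementBy(SharedCount count, int delta) throws Exception {
        for (int i = 0; i < MAX_TRIES; i++) {
            VersionedValue<Integer> current = count.getVersionedValue();
            if (count.trySetCount(current, current.getValue() + delta)) {
                return true; // our version matched, so the update was applied
            }
            // another client updated the counter first; loop and try again
        }
        return false; // still contended after MAX_TRIES attempts
    }
}
```

The caller decides what a false return means; falling back to setCount would forcibly overwrite whatever value the other clients wrote.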
Note that the counter must be started with start(), and close() must be called when you are done with it.
Using a ConnectionStateListener is strongly recommended. In this example, SharedCountListener extends ConnectionStateListener.
Now let's look at a Long counter. Besides having a larger range than SharedCount, it first tries to set the counter optimistically; if that fails (for example, because the counter was updated by another client in the meantime), it falls back to an InterProcessMutex to update the value.
This can be seen in its internal implementation, DistributedAtomicValue.trySet():
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception {
    MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

    tryOptimistic(result, makeValue);
    if ( !result.succeeded() && (mutex != null) ) {
        tryWithMutex(result, makeValue);
    }

    return result;
}
This counter supports a series of operations: get(), increment(), decrement(), add(delta), subtract(delta), trySet(newValue), forceSet(newValue) and compareAndSet(expectedValue, newValue).
You must check succeeded() on the returned result; it indicates whether the operation succeeded. If it did, preValue() is the value before the operation and postValue() the value after.
public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}
Curator also simplifies working with ephemeral nodes, and it provides a ZK-recipe implementation of a distributed queue. Using ZK's PERSISTENT_SEQUENTIAL nodes guarantees that items placed in the queue are ordered. If a single consumer takes data from the queue, delivery is first-in-first-out, which is the defining property of a queue. If you strictly require ordering, you must use a single consumer; you can use leader election to make the leader the sole consumer.
However, according to the Netflix author of Curator, ZooKeeper really is not suited to serve as a queue; that is, ZK does not make for a good queue implementation, for five reasons:
Nevertheless, Curator still provides several queue implementations. If the queue holds neither too many items nor too-large items, they can still be used, with due consideration.
DistributedQueue is the most basic queue. It involves the following four classes: QueueBuilder, QueueConsumer, QueueSerializer and DistributedQueue.
QueueConsumer is the consumer; it receives data from the queue. The logic for processing queue items goes in QueueConsumer.consumeMessage().
Normally a message is first removed from the queue and then handed to the consumer. But these are two steps, and they are not atomic. You can call lockPath() on the Builder to make the consumer take a lock: while a consumer is processing a message it holds the lock, so other consumers cannot consume that message. If consumption fails or the process dies, the message can be handed to another process. This costs some performance; it is still best to use the queue in single-consumer mode.
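A sketch of enabling that lock, assuming the client, PATH, createQueueConsumer and createQueueSerializer helpers from the demo below; the lock path /example/queue-lock is an arbitrary choice for illustration:

```java
QueueBuilder<String> builder = QueueBuilder.builder(client, createQueueConsumer("A"), createQueueSerializer(), PATH);
// With lockPath set, a consumer holds a distributed lock while it processes a
// message, so the message is redelivered to another consumer if this one dies
// mid-processing. The extra lock round-trips are the performance cost noted above.
DistributedQueue<String> queue = builder
        .lockPath("/example/queue-lock")
        .buildQueue();
queue.start();
```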
public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();

        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();

        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }

        Thread.sleep(1000 * 10); // wait for consumption to finish
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer.
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer.
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection state changed: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume message (" + name + "): " + message);
            }
        };
    }
}
The example defines two distributed queues and two consumers; because the PATH is the same, the consumers compete to consume the messages.
DistributedIdQueue is similar to the queue above, but you can assign an ID to every element in the queue, and any element can be removed from the queue by its ID. It involves the same QueueBuilder, QueueConsumer and QueueSerializer classes, plus DistributedIdQueue.
Create it with:
builder.buildIdQueue()
To add an element:
queue.put(aMessage, messageId);
To remove an element:
int numberRemoved = queue.remove(messageId);
In this example, some elements are removed before the consumer has consumed them, so the consumer never receives the deleted messages.
public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();

            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);
        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    private static QueueConsumer<String> createQueueConsumer() {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }
        };
    }
}
A priority queue orders its elements by priority: the smaller the priority value, the closer the element is to the front and the sooner it is consumed. It involves the same QueueBuilder, QueueConsumer and QueueSerializer classes, plus DistributedPriorityQueue.
Create it with the builder.buildPriorityQueue(minItemsBeforeRefresh) method. When the priority queue is notified of element additions or deletions, it pauses processing the current element queue and then refreshes it. minItemsBeforeRefresh specifies the minimum number of elements in the currently active queue before a refresh happens; set it to the amount of out-of-order processing your program can tolerate.
When enqueuing, you must specify a priority:
queue.put(aMessage, priority);
Example:
public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();

            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);
        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    private static QueueConsumer<String> createQueueConsumer() {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }
        };
    }
}
Sometimes you may get the impression that the priority setting has no effect. That is because priority only matters for elements backlogged in the queue: if consumption is fast enough, the previous element may already be consumed before the next one is enqueued, in which case a DistributedPriorityQueue degenerates into a DistributedQueue.
You may be familiar with the JDK's DelayQueue. DistributedDelayQueue provides similar functionality: each element has a delay value, and the consumer only receives an element after its delay has elapsed. It involves the same QueueBuilder, QueueConsumer and QueueSerializer classes, plus DistributedDelayQueue.
Create it with:
QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();
When putting an element, you can specify delayUntilEpoch:
queue.put(aMessage, delayUntilEpoch);
Note that delayUntilEpoch is not an interval from now (such as 20 milliseconds) but a future timestamp, e.g. System.currentTimeMillis() + 10 seconds. If delayUntilEpoch is already in the past, the message is received by the consumer immediately.
public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();

            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");

            Thread.sleep(20000);
        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    private static QueueConsumer<String> createQueueConsumer() {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }
        };
    }
}
前面雖然實現了各類隊列,可是你注意到沒有,這些隊列並無實現相似JDK同樣的接口。 SimpleDistributedQueue
提供了和JDK基本一致的接口(可是沒有實現Queue接口)。 建立很簡單:
public SimpleDistributedQueue(CuratorFramework client,String path)
增長元素:
public boolean offer(byte[] data) throws Exception
To remove an element:
public byte[] take() throws Exception
Other methods are also provided:
public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception
There is no add method; instead there is a take method.
take blocks until it succeeds, whereas poll returns null immediately when the queue is empty.
public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();

            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();

            Thread.sleep(20000);
        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("sent message: " + "zjc-" + i);
                    } else {
                        System.out.println("failed to send message: " + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("consumed message: " + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
In practice, 100 messages are sent but only the first one is consumed; note that the Consumer thread in this demo calls take() only once rather than in a loop, so it exits after a single message. The demo recommended by the official documentation uses the following APIs:
Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client, String path)

Parameters:
client - the client
path - path to store queue nodes

Add to the queue

public boolean offer(byte[] data) throws Exception

Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added

Take from the queue

public byte[] take() throws Exception

Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue

NOTE: see the Javadoc for additional methods
But actual use still shows the consumption-blocking problem.
A distributed barrier is a class that blocks the waiting processes on all nodes until a certain condition is satisfied, at which point all nodes proceed together.
For example, in a horse race, the horses gather at the starting line one by one; at the starting signal, all of them dash off.
The DistributedBarrier class implements the barrier. Its constructor is:
public DistributedBarrier(CuratorFramework client, String barrierPath)

Parameters:
client - client
barrierPath - path to use as the barrier
First you set the barrier; it will block the threads waiting on it:
setBarrier();
Then the threads that need to block call this method to wait for the release condition:
public void waitOnBarrier()
When the condition is met, remove the barrier, and all waiting threads continue executing:
removeBarrier();
Exception handling: DistributedBarrier monitors the connection state; when the connection drops, waitOnBarrier() throws an exception.
public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}
This example creates a controlBarrier to set and remove the barrier. We create 5 threads that wait on the barrier; only after the barrier is removed do all the threads continue executing. If you do not set the barrier at the start, none of the threads will block.
A double barrier lets clients synchronize at both the start and the end of a computation. When enough processes have joined the double barrier, the computation starts; when it finishes, they leave the barrier. The double barrier class is DistributedDoubleBarrier. Its constructor is:
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)

Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier
memberQty is the number of members. When enter() is called, the caller blocks until all members have called enter(); likewise, when leave() is called, the calling thread blocks until all members have called leave(). It is like a 100-meter race: the starting gun fires, all the runners start, and the race only ends once every runner has crossed the finish line.
DistributedDoubleBarrier monitors the connection state; when the connection drops, enter() and leave() throw exceptions.
public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}