<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
Path Cache 用來監控一個 ZNode 的子節點。當一個子節點新增、更新或刪除時,Path Cache 會改變它的狀態,使其包含最新的子節點以及子節點的數據和狀態。(此處需要注意:它只會監聽一級子節點,不會遞迴監聽子節點下面的 child。)
session
實際使用時會涉及到四個類:
PathChildrenCache
PathChildrenCacheEvent
PathChildrenCacheListener
ChildData
通過下面的構造函數創建 Path Cache:
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
想使用cache,必須調用它的start
方法,不用之後調用close
方法。
start 方法有兩個重載,其中一個可以傳入 StartMode,用來爲初始的 cache 設置暖場方式(warm):
個人建議使用此構造方法,ExecutorService 使用自己構建的線程池,保證線程可控。
添加監控節點狀態的改變的Listener時,建議使用此方法,手動傳入咱們構建的線程池,保證線程可控。
childrenCache.getListenable().addListener(T listener, Executor executor);
爲何要自己傳入線程池呢?看下圖和源碼
// Three-argument constructor: delegates to the five-argument constructor, passing false for dataIsCompressed and a brand-new single-thread executor (wrapped in a CloseableExecutorService) created on every call.
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) { this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); }
// Single-argument overload: forwards to the two-argument addListener, supplying MoreExecutors.sameThreadExecutor() as the executor.
public void addListener(T listener) { addListener(listener, MoreExecutors.sameThreadExecutor()); }
因為在 zk 丟失連接的時候,默認不傳線程池的方法在每次重連時都會新 new 出來一個線程池,線程一直處於活躍狀態;如果 zk 服務端長時間未能恢復,會導致客戶端線程打滿。
設置/更新、移除其實是使用 client(CuratorFramework)來操作,不通過 PathChildrenCache 操作:
// Writes are issued on the CuratorFramework client: overwrite a node's data, create a node (creating missing parents), and delete a node.
client.setData().forPath(path, bytes); client.create().creatingParentsIfNeeded().forPath(path, bytes); client.delete().forPath(path);
而查詢緩存使用下面的方法:
// Walk the cache's current children and print "path = data" for each; the bytes are decoded with the platform default charset because no charset is passed to new String(...).
for (ChildData data : cache.getCurrentData()) { System.out.println(data.getPath() + " = " + new String(data.getData())); }
最後附上一段完整的代碼
import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.retry.ExponentialBackoffRetry; /** * ZKClient * * @author jiangzhixiong * @email xingxuan_jzx@foxmail.com * @date 2015年10月12日 下午5:18:21 */ public class ChildPathZkClient { // ip和端口url private String url; // 須要監聽的base path private String basePath; private static CuratorFramework client = null; private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 5l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy()); public void init() throws Throwable { if (basePath == null) { basePath = "o2o/zk/cache"; } client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 0)).build(); client.start(); /** * 監聽子節點的變化狀況 */ watchChild("/"); } protected static void watchChild(String path) throws Exception { PathChildrenCache childrenCache = new PathChildrenCache(client, path, true, false, executor); ZkPathListener listener = new ZkPathListener(); listener.setPathChildrenCache(childrenCache); childrenCache.getListenable().addListener(listener, executor); childrenCache.start(); } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getBasePath() { return basePath; } public void setBasePath(String basePath) { this.basePath = basePath; } public static void main(String[] args) throws Throwable { CountDownLatch latch = new CountDownLatch(1); client = 
CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000) .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); /** * 監聽子節點的變化狀況 */ watchChild("/"); latch.await(); } }
import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; import com.jd.o2o.web.product.constants.zk.ZkSwitchEnum; import com.jd.o2o.web.product.constants.zk.ZkValueEnum; import com.jd.o2o.zk.cache.serializer.ZooKeeperSerializer; import com.jd.o2o.zk.cache.serializer.impl.JdkSerializationZooKeeperSerializer; /** * ZK監聽器 * * @author jiangzhixiong * @email xingxuan_jzx@foxmail.com * @date 2015年10月12日 下午5:18:38 */ public class ZkPathListener implements PathChildrenCacheListener { private final Logger logger = LogManager.getLogger(ZkPathListener .class); private final ZooKeeperSerializer<?> defaultSerializer = new JdkSerializationZooKeeperSerializer(); private PathChildrenCache pathChildrenCache; @Override public void childEvent(CuratorFramework paramCuratorFramework, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: // TODO System.out.println(defaultSerializer.deserialize(event.getData().getData())); break; case CHILD_UPDATED: // TODO System.out.println(defaultSerializer.deserialize(event.getData().getData())); break; case CHILD_REMOVED: // TODO System.out.println(defaultSerializer.deserialize(event.getData().getData())); break; default: break; } } }
Node Cache用來監控一個ZNode. 當節點的數據修改或者刪除時,Node Cache能更新它的狀態包含最新的改變。
涉及到下面的三個類:
NodeCache
NodeCacheListener
ChildData
public NodeCache(CuratorFramework client, String path)
想使用cache,依然要調用它的start
方法,不用以後調用close
方法。
添加監控節點狀態的改變的Listener時,建議使用此方法,手動傳入咱們構建的線程池,保證線程可控。
NodeCache.getListenable().addListener(T listener, Executor executor);
同樣附一段完整的代碼
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; /** * ZK連接 * * @author jiangzhixiong * @email xingxuan_jzx@foxmail.com * @date 2015年10月12日 下午5:18:21 */ public class ZkNodeClient { // ip和端口url private String url; // 須要監聽的base path private String basePath; private static NodeCache nodeCache; private static CuratorFramework client = null; private final static ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor(); public void init() throws Throwable { if (basePath == null) { basePath = "o2o/zk/cache"; } client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 0)).build(); client.start(); nodeCache = new NodeCache(client, "/"); /** * 監聽子節點的變化狀況 */ watchChild(); } protected static void watchChild() throws Exception { nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println(nodeCache.getCurrentData().getPath() + ":" + nodeCache.getCurrentData().getData()); } }, EXECUTOR_SERVICE); nodeCache.start(); } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getBasePath() { return basePath; } public void setBasePath(String basePath) { this.basePath = basePath; } public static void main(String[] args) throws Throwable { CountDownLatch latch = new CountDownLatch(1); client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000) .retryPolicy(new 
ExponentialBackoffRetry(1000, 3)).build(); client.start(); /** * 監聽子節點的變化狀況 */ watchChild("/"); latch.await(); } }
這種類型的 cache 既可以監控節點的狀態,還可以監控節點的子節點的狀態,類似上面兩種 cache 的組合。這也就是 Tree 的概念:它監控整個樹中節點的狀態。(只要是所監聽的路徑下的所有葉子節點都會監聽)
涉及到下面四個類。
TreeCache
TreeCacheListener
TreeCacheEvent
ChildData
而關鍵的TreeCache的構造函數爲
public TreeCache(CuratorFramework client, String path)
添加監控節點狀態的改變的Listener時,建議使用此方法,手動傳入咱們構建的線程池,保證線程可控。
TreeCache.getListenable().addListener(T listener, Executor executor);
想使用cache,依然要調用它的start
方法,不用以後調用close
方法。
getCurrentChildren()
返回cache的狀態,類型爲Map。 而getCurrentData()
返回監控的path的數據。
最後同樣附一段完整代碼
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.retry.ExponentialBackoffRetry; /** * ZK連接 * * @author jiangzhixiong * @email xingxuan_jzx@foxmail.com * @date 2015年10月12日 下午5:18:21 */ public class TreeClient { // ip和端口url private String url; // 須要監聽的base path private String basePath; private static CuratorFramework client = null; private static TreeCache cache = null; private static ZkTreeListener listener = new ZkTreeListener(); private static ExecutorService executorService = Executors.newSingleThreadExecutor(); public void init() throws Throwable { if (basePath == null) { basePath = "o2o/zk/cache"; } // 修改重連次數,使用最初的線程進行重連監聽,不從新新建線程 ExponentialBackoffRetry(1000, 0) client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build(); client.start(); /** * 監聽子節點的變化狀況 */ watchChild("/product"); watchChild("/switch"); } protected static void watchChild(String path) throws Exception { // 改用TreeCacheListener,免除循環監聽子節點的問題 cache = new TreeCache(client, path); cache.getListenable().addListener(listener, executorService); cache.start(); } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getBasePath() { return basePath; } public void setBasePath(String basePath) { this.basePath = basePath; } public static void main(String[] args) throws Throwable { CountDownLatch latch = new CountDownLatch(1); client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000) .retryPolicy(new ExponentialBackoffRetry(1000, 0)).build(); client.start(); /** 
* 監聽子節點的變化狀況 */ watchChild("/product"); watchChild("/switch"); latch.await(); } }
import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; import com.jd.o2o.web.product.constants.zk.ZkSwitchEnum; import com.jd.o2o.web.product.constants.zk.ZkValueEnum; import com.jd.o2o.zk.cache.serializer.ZooKeeperSerializer; import com.jd.o2o.zk.cache.serializer.impl.JdkSerializationZooKeeperSerializer; /** * TreeCache ZK監聽器 * * @author jiangzhixiong * @email xingxuan_jzx@foxmail.com * @date 2015年10月12日 下午5:18:38 */ public class ZkTreeListener implements TreeCacheListener { private final Logger logger = LogManager.getLogger(TreeCacheListener .class); private final ZooKeeperSerializer<?> defaultSerializer = new JdkSerializationZooKeeperSerializer(); @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println(event.getData().getPath()); switch (event.getType()) { case NODE_ADDED: if (event.getData().getData() == null) { break; } //TODO break; case NODE_UPDATED: if (event.getData().getData() == null) { break; } //TODO break; default: break; } } }