ZooKeeper客戶端Curator(監聽篇)

maven依賴

<dependency>java

            <groupId>org.apache.curator</groupId>node

            <artifactId>curator-recipes</artifactId>web

            <version>2.8.0</version>   apache

        </dependency>緩存

Path Cache

Path Cache用來監控一個ZNode的子節點. 當一個子節點增長, 更新,刪除時, Path Cache會改變它的狀態, 會包含最新的子節點, 子節點的數據和狀態。(此處須要注意,他只會監聽一級子節點,不會循環監聽子節點下面的child)
session

實際使用時會涉及到四個類:maven

  • PathChildrenCacheide

  • PathChildrenCacheEvent函數

  • PathChildrenCacheListenerui

  • ChildData

建立Path Cache

經過下面的構造函數建立Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)

想使用cache,必須調用它的start方法,不用之後調用close方法。
start有兩個, 其中一個能夠傳入StartMode,用來爲初始的cache設置暖場方式(warm):

我的建議使用此構造方法,ExecutorService使用本身構建的線程池,保證線程可控

添加監控節點狀態

添加監控節點狀態的改變的Listener時,建議使用此方法,手動傳入咱們構建的線程池,保證線程可控。

childrenCache.getListenable().(T listener, Executor executor);

爲何要本身傳入線程池呢?看下圖和源碼

 public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
    {
        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
    }
 public void addListener(T listener)
    {
        addListener(listener, MoreExecutors.sameThreadExecutor());
    }

由於在zk丟失連接的時候,默認不傳線程池的方法,在每次重連時都會會新new 出來一個線程池,線程一直處於活躍狀態,若是zk服務端長時間未能恢復,會致使客戶端現成打滿

設置/更新、移除

設置/更新、移除實際上是使用client (CuratorFramework)來操做, 不經過PathChildrenCache操做:

client.setData().forPath(path, bytes);
client.create().creatingParentsIfNeeded().forPath(path, bytes);
client.delete().forPath(path);

而查詢緩存使用下面的方法:

for (ChildData data : cache.getCurrentData()) {	
    System.out.println(data.getPath() + " = " + new String(data.getData()));
}

最後附上一段完整的代碼

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * ZKClient
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:21
 */
public class ChildPathZkClient {
	// ip和端口url
	private String url;
	// 須要監聽的base path
	private String basePath;

	private static CuratorFramework client = null;
	private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 5l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());

	public void init() throws Throwable {
		if (basePath == null) {
			basePath = "o2o/zk/cache";
		}
		client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 0)).build();
		client.start();
		/**
		 * 監聽子節點的變化狀況
		 */
		watchChild("/");
	}

	protected static void watchChild(String path) throws Exception {
		PathChildrenCache childrenCache = new PathChildrenCache(client, path, true, false, executor);
		ZkPathListener listener = new ZkPathListener();
		listener.setPathChildrenCache(childrenCache);
		childrenCache.getListenable().addListener(listener, executor);
		childrenCache.start();
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getBasePath() {
		return basePath;
	}

	public void setBasePath(String basePath) {
		this.basePath = basePath;
	}

	public static void main(String[] args) throws Throwable {
		CountDownLatch latch = new CountDownLatch(1);
		client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
		client.start();

		/**
		 * 監聽子節點的變化狀況
		 */
		watchChild("/");
		latch.await();
	}

}
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;

import com.jd.o2o.web.product.constants.zk.ZkSwitchEnum;
import com.jd.o2o.web.product.constants.zk.ZkValueEnum;
import com.jd.o2o.zk.cache.serializer.ZooKeeperSerializer;
import com.jd.o2o.zk.cache.serializer.impl.JdkSerializationZooKeeperSerializer;

/**
 * ZK監聽器
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:38
 */
public class ZkPathListener implements PathChildrenCacheListener {
	private final Logger logger = LogManager.getLogger(ZkPathListener .class);
	private final ZooKeeperSerializer<?> defaultSerializer = new JdkSerializationZooKeeperSerializer();
	private PathChildrenCache pathChildrenCache;

	@Override
	public void childEvent(CuratorFramework paramCuratorFramework, PathChildrenCacheEvent event) throws Exception {
		switch (event.getType()) {
		case CHILD_ADDED:
			// TODO
			System.out.println(defaultSerializer.deserialize(event.getData().getData()));
			break;
		case CHILD_UPDATED:
			// TODO
			System.out.println(defaultSerializer.deserialize(event.getData().getData()));
			break;
		case CHILD_REMOVED:
			// TODO
			System.out.println(defaultSerializer.deserialize(event.getData().getData()));
			break;
		default:
			break;
		}
	}
}



Node Cache

Node Cache用來監控一個ZNode. 當節點的數據修改或者刪除時,Node Cache能更新它的狀態包含最新的改變。

涉及到下面的三個類:

  • NodeCache

  • NodeCacheListener

  • ChildData

構建NodeCache

 public NodeCache(CuratorFramework client, String path)

想使用cache,依然要調用它的start方法,不用以後調用close方法。

添加監控節點狀態

添加監控節點狀態的改變的Listener時,建議使用此方法,手動傳入咱們構建的線程池,保證線程可控。

NodeCache.getListenable().(T listener, Executor executor);

一樣附一段完整的代碼

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * ZK連接
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:21
 */
public class ZkNodeClient {
	// ip和端口url
	private String url;
	// 須要監聽的base path
	private String basePath;
        private static NodeCache nodeCache;
	private static CuratorFramework client = null;
	private final static ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

	public void init() throws Throwable {
		if (basePath == null) {
			basePath = "o2o/zk/cache";
		}
		client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 0)).build();
		client.start();
		nodeCache = new NodeCache(client, "/");
		/**
		 * 監聽子節點的變化狀況
		 */
		watchChild();
		
	}

	protected static void watchChild() throws Exception {
		
		nodeCache.getListenable().addListener(new NodeCacheListener() {

			@Override
			public void nodeChanged() throws Exception {
				System.out.println(nodeCache.getCurrentData().getPath() + ":" + nodeCache.getCurrentData().getData());
			}
		}, EXECUTOR_SERVICE);
		nodeCache.start();
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getBasePath() {
		return basePath;
	}

	public void setBasePath(String basePath) {
		this.basePath = basePath;
	}

	public static void main(String[] args) throws Throwable {
		CountDownLatch latch = new CountDownLatch(1);
		client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
		client.start();

		/**
		 * 監聽子節點的變化狀況
		 */
		watchChild("/");
		latch.await();
	}

}



Tree Node

這種類型的便可以監控節點的狀態,還監控節點的子節點的狀態, 相似上面兩種cache的組合。 這也就是Tree的概念。 它監控整個樹中節點的狀態。(只要是所監聽的路徑下的全部葉子節點都會監聽)

涉及到下面四個類。

  • TreeCache

  • TreeCacheListener

  • TreeCacheEvent

  • ChildData

構造TreeCache

而關鍵的TreeCache的構造函數爲

public TreeCache(CuratorFramework client, String path)

添加監控節點狀態

添加監控節點狀態的改變的Listener時,建議使用此方法,手動傳入咱們構建的線程池,保證線程可控。

TreeCache.getListenable().(T listener, Executor executor);

想使用cache,依然要調用它的start方法,不用以後調用close方法。

getCurrentChildren()返回cache的狀態,類型爲Map。 而getCurrentData()返回監控的path的數據。

最後一樣附一段完整代碼

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * ZK連接
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:21
 */
public class TreeClient {
	// ip和端口url
	private String url;
	// 須要監聽的base path
	private String basePath;

	private static CuratorFramework client = null;
	private static TreeCache cache = null;
	private static ZkTreeListener listener = new ZkTreeListener();
	private static ExecutorService executorService = Executors.newSingleThreadExecutor();

	public void init() throws Throwable {
		if (basePath == null) {
			basePath = "o2o/zk/cache";
		}
		// 修改重連次數,使用最初的線程進行重連監聽,不從新新建線程 ExponentialBackoffRetry(1000, 0)
		client = CuratorFrameworkFactory.builder().namespace(basePath).connectString(url).sessionTimeoutMs(5000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
		client.start();
		/**
		 * 監聽子節點的變化狀況
		 */
		watchChild("/product");
		watchChild("/switch");

	}

	protected static void watchChild(String path) throws Exception {
		// 改用TreeCacheListener,免除循環監聽子節點的問題
		cache = new TreeCache(client, path);
		cache.getListenable().addListener(listener, executorService);
		cache.start();
	}

	public String getUrl() {
		return url;
	}

	public void setUrl(String url) {
		this.url = url;
	}

	public String getBasePath() {
		return basePath;
	}

	public void setBasePath(String basePath) {
		this.basePath = basePath;
	}

	public static void main(String[] args) throws Throwable {
		CountDownLatch latch = new CountDownLatch(1);
		client = CuratorFrameworkFactory.builder().namespace("o2o/zk/cache").connectString("192.168.200.98:2181").sessionTimeoutMs(5000).connectionTimeoutMs(3000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 0)).build();
		client.start();

		/**
		 * 監聽子節點的變化狀況
		 */
		watchChild("/product");
                watchChild("/switch");
		latch.await();
	}

}
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;

import com.jd.o2o.web.product.constants.zk.ZkSwitchEnum;
import com.jd.o2o.web.product.constants.zk.ZkValueEnum;
import com.jd.o2o.zk.cache.serializer.ZooKeeperSerializer;
import com.jd.o2o.zk.cache.serializer.impl.JdkSerializationZooKeeperSerializer;

/**
 * TreeCache ZK監聽器
 * 
 * @author jiangzhixiong
 * @email xingxuan_jzx@foxmail.com
 * @date 2015年10月12日 下午5:18:38
 */
public class ZkTreeListener implements TreeCacheListener {
	private final Logger logger = LogManager.getLogger(TreeCacheListener .class);
	private final ZooKeeperSerializer<?> defaultSerializer = new JdkSerializationZooKeeperSerializer();

	@Override
	public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
		System.out.println(event.getData().getPath());
		switch (event.getType()) {
		case NODE_ADDED:
			if (event.getData().getData() == null) {
				break;
			}
			//TODO
			break;
		case NODE_UPDATED:
			if (event.getData().getData() == null) {
				break;
			}
			//TODO
			break;
		default:
			break;
		}
	}
}
相關文章
相關標籤/搜索