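At work I needed to write a scheduled task. Because it runs in a cluster, the natural thought was to use a distributed lock so that only one machine executes it. I suspect most people would reach for ZooKeeper (zk) to implement such a lock. Below is a brief introduction to a few implementations.
Preparation
There are a few helper classes, so here is their code first.
ZKClient: a simple wrapper around the zk operations.
Java code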
- package zk.lock;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import zk.util.ZKUtil;
-
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
-
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 8:50 PM
- * Wraps a ZooKeeper instance.
- */
- public class ZKClient implements Watcher {
-
- private ZooKeeper zookeeper;
-
- private CountDownLatch connectedSemaphore = new CountDownLatch(1);
-
-
- public ZKClient(String connectString, int sessionTimeout) throws Exception {
- zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
- System.out.println("connecting zk server");
- if (connectedSemaphore.await(10L, TimeUnit.SECONDS)) {
- System.out.println("connect zk server success");
- } else {
- System.out.println("connect zk server error.");
- throw new Exception("connect zk server error.");
- }
- }
-
- public void close() throws InterruptedException {
- if (zookeeper != null) {
- zookeeper.close();
- }
- }
-
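- public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
- CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
- path = ZKUtil.normalize(path);
- if (!this.exists(path)) {
- try {
- zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
- } catch (KeeperException.NodeExistsException e) {
- // Another client created the node between exists() and create(); treat it as already present.
- }
- }
- }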
-
- public boolean exists(String path) throws Exception {
- path = ZKUtil.normalize(path);
- Stat stat = zookeeper.exists(path, null);
- return stat != null;
- }
-
- public String getData(String path) throws Exception {
- path = ZKUtil.normalize(path);
- try {
- byte[] data = zookeeper.getData(path, null, null);
- return data == null ? null : new String(data); // a node created without data returns null
- } catch (KeeperException e) {
- if (e instanceof KeeperException.NoNodeException) {
- throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
- } else {
- throw new Exception(e);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new Exception(e);
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (event == null) return;
-
- // Connection state
- Watcher.Event.KeeperState keeperState = event.getState();
- // Event type
- Watcher.Event.EventType eventType = event.getType();
- // Affected path
- // String path = event.getPath();
- if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
- // Successfully connected to the ZK server
- if (Watcher.Event.EventType.None == eventType) {
- System.out.println("zookeeper connect success");
- connectedSemaphore.countDown();
- }
- }
- // Reconnection logic could be added in the branches below.
- else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
- System.out.println("zookeeper Disconnected");
- } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
- System.out.println("zookeeper AuthFailed");
- } else if (Watcher.Event.KeeperState.Expired == keeperState) {
- System.out.println("zookeeper Expired");
- }
- }
- }
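The constructor above blocks on a CountDownLatch until the watcher callback reports a successful connection, or gives up after a timeout. Here is a minimal sketch of just that pattern, with no ZooKeeper involved: the class name ConnectLatchDemo and the background thread that simulates the server are assumptions made for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of "block until the connected callback fires, or time out". */
final class ConnectLatchDemo {

    /**
     * Simulates an asynchronous connection and waits for it.
     *
     * @param connectDelayMillis how long the simulated server takes to answer
     * @param timeoutMillis      how long the caller is willing to wait
     * @return true if the callback fired before the timeout
     */
    static boolean connect(long connectDelayMillis, long timeoutMillis) {
        CountDownLatch connected = new CountDownLatch(1);
        Thread callback = new Thread(() -> {
            try {
                Thread.sleep(connectDelayMillis);
                // Plays the role of Watcher.process() seeing SyncConnected.
                connected.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        callback.setDaemon(true);
        callback.start();
        try {
            // Returns false if the latch was not released within the timeout.
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(connect(10, 2000));  // fast "server": callback wins
        System.out.println(connect(2000, 10));  // slow "server": timeout wins
    }
}
```

The useful property is that the caller never sees a half-connected client: either the latch is released in time, or the wait fails loudly.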
ZKUtil: a utility class for zk paths.
Java code
- package zk.util;
-
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 9:56 PM
- */
- public class ZKUtil {
-
- public static final String SEPARATOR = "/";
-
- /**
- * Converts path into the standard zk form: it starts with / and has no trailing /
- */
- public static String normalize(String path) {
- String temp = path;
-
- if(!path.startsWith(SEPARATOR)) {
- temp = SEPARATOR + path;
- }
- if(path.endsWith(SEPARATOR)) {
- temp = temp.substring(0, temp.length()-1);
- return normalize(temp);
- }else {
- return temp;
- }
- }
-
- /**
- * Joins two paths and converts the result into the standard zk form
- */
- public static String contact(String path1,String path2){
- if(path2.startsWith(SEPARATOR)) {
- path2 = path2.substring(1);
- }
- if(path1.endsWith(SEPARATOR)) {
- return normalize(path1 + path2);
- } else {
- return normalize(path1 + SEPARATOR + path2);
- }
- }
-
- /**
- * Converts a string to a byte array
- */
- public static byte[] toBytes(String data) {
- if(data == null || data.trim().equals("")) return null;
- return data.getBytes();
- }
- }
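To make the path rules concrete, here is a small stand-alone sketch of the same normalization and joining behaviour with a few sample inputs. PathDemo is a hypothetical class written only for this example (it uses a loop where ZKUtil recurses); it is not part of the helper classes above.

```java
/** Hypothetical stand-alone restatement of the path rules, for illustration only. */
final class PathDemo {

    /** Ensures a leading "/" and strips every trailing "/" (the root stays "/"). */
    static String normalize(String path) {
        String p = path.startsWith("/") ? path : "/" + path;
        while (p.length() > 1 && p.endsWith("/")) {
            p = p.substring(0, p.length() - 1);
        }
        return p;
    }

    /** Joins parent and child with exactly one "/" between them, then normalizes. */
    static String join(String parent, String child) {
        String c = child.startsWith("/") ? child.substring(1) : child;
        String joined = parent.endsWith("/") ? parent + c : parent + "/" + c;
        return normalize(joined);
    }

    public static void main(String[] args) {
        System.out.println(normalize("lock/"));       // prints /lock
        System.out.println(normalize("/"));           // prints /
        System.out.println(join("/lock", "task"));    // prints /lock/task
        System.out.println(join("/lock/", "/task/")); // prints /lock/task
    }
}
```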
NetworkUtil: a utility method for getting the local machine's IP.
Java code
- package zk.util;
-
- import java.net.InetAddress;
- import java.net.NetworkInterface;
- import java.util.Enumeration;
-
- /**
- * User: zhenghui
- * Date: 14-4-1
- * Time: 4:47 PM
- */
- public class NetworkUtil {
-
- static private final char COLON = ':';
-
- /**
- * Gets the current machine's IP address.
- * Reportedly unreliable when the machine has multiple network interfaces.
- */
- public static String getNetworkAddress() {
- Enumeration<NetworkInterface> netInterfaces;
- try {
- netInterfaces = NetworkInterface.getNetworkInterfaces();
- InetAddress ip;
- while (netInterfaces.hasMoreElements()) {
- NetworkInterface ni = netInterfaces
- .nextElement();
- Enumeration<InetAddress> addresses=ni.getInetAddresses();
- while(addresses.hasMoreElements()){
- ip = addresses.nextElement();
- if (!ip.isLoopbackAddress()
- && ip.getHostAddress().indexOf(COLON) == -1) {
- return ip.getHostAddress();
- }
- }
- }
- return "";
- } catch (Exception e) {
- return "";
- }
- }
- }
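The comment in the listing warns that returning the first match may pick the wrong address on a multi-NIC machine. One way to see what is going on is to collect every candidate instead of stopping at the first. The sketch below is an assumption of mine rather than part of the original helpers: AddressLister is a hypothetical name, and skipping interfaces that are down or virtual is just one possible filter.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical helper that lists every usable IPv4 address instead of only the first. */
final class AddressLister {

    static List<String> candidateIpv4Addresses() {
        List<String> result = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            if (interfaces == null) {
                return result; // no interfaces reported on this machine
            }
            for (NetworkInterface ni : Collections.list(interfaces)) {
                // Skip interfaces that are down, loopback, or virtual sub-interfaces.
                if (!ni.isUp() || ni.isLoopback() || ni.isVirtual()) {
                    continue;
                }
                for (InetAddress address : Collections.list(ni.getInetAddresses())) {
                    if (address instanceof Inet4Address && !address.isLoopbackAddress()) {
                        result.add(address.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            // Fall through and return whatever was collected so far.
        }
        return result;
    }

    public static void main(String[] args) {
        // More than one entry here means "first match" is ambiguous on this host.
        System.out.println(candidateIpv4Addresses());
    }
}
```

If the list has more than one entry, it is probably safer to configure the address (or some other unique owner id) explicitly than to rely on enumeration order.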
--------------------------- Main text begins -----------------------------------
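This implementation is very simple. The flow is as follows: look up the node for the task, for example /lock/taskname. If it exists, read its value and compare it with the local IP, and the lock is held only when they match. If it does not exist, create the node with the local IP as its value.

The corresponding implementation is below.
Java code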
- package zk.lock;
-
-
- import zk.util.NetworkUtil;
- import zk.util.ZKUtil;
-
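- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 8:37 PM
- * Distributed lock implementation.
- *
- * The idea: create a node for a given task, e.g. /lock/taskname, then read its value. If the value is the current IP, the lock is held; otherwise it is not.
- * If the node does not exist, create it and set its value to the current IP.
- */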
- public class DistributedLock01 {
-
- private ZKClient zkClient;
-
-
- public static final String LOCK_ROOT = "/lock";
- private String lockName;
-
-
- public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
- // Create the zk connection first.
- this.createConnection(connectString,sessionTimeout);
-
- this.lockName = lockName;
- }
-
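- public boolean tryLock(){
- String path = ZKUtil.contact(LOCK_ROOT,lockName);
- String localIp = NetworkUtil.getNetworkAddress();
- try {
- if(zkClient.exists(path)){
- // The lock node already exists: we hold the lock only if it records our IP.
- String ownerIp = zkClient.getData(path);
- if(localIp.equals(ownerIp)){
- return true;
- }
- } else {
- // No lock node yet: create an ephemeral one, then re-check who owns it.
- // NOTE: createPathIfAbsent stores no data, so the local IP still has to be
- // written to the node before this comparison can succeed.
- zkClient.createPathIfAbsent(path,false);
- if(zkClient.exists(path)){
- String ownerIp = zkClient.getData(path);
- if(localIp.equals(ownerIp)){
- return true;
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }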
-
-
- /**
- * Creates the zk connection
- *
- */
- protected void createConnection(String connectString, int sessionTimeout) throws Exception {
- if(zkClient != null){
- releaseConnection();
- }
- zkClient = new ZKClient(connectString,sessionTimeout);
- zkClient.createPathIfAbsent(LOCK_ROOT,true);
- }
- /**
- * Closes the ZK connection
- */
- protected void releaseConnection() throws InterruptedException {
- if (zkClient != null) {
- zkClient.close();
- }
- }
-
- }
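Two details in the listing above are worth a closer look. Checking exists() and then creating is not atomic, and the node is created without the owner's IP as data. A common way around both is to attempt the create first, with the owner id as the value, and read the node only when the create fails. The sketch below shows that decision logic against an in-memory stand-in so it can run without a server. NodeStore, InMemoryStore, and CreateFirstLock are hypothetical names. With a real ZooKeeper client, createIfAbsent would roughly correspond to a create call in CreateMode.EPHEMERAL that catches KeeperException.NodeExistsException, and read to getData.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a "create first, then compare owner" try-lock. */
final class CreateFirstLock {

    /** Stand-in for the two node operations the lock needs (an assumption, not a ZooKeeper API). */
    interface NodeStore {
        /** Atomically creates the node with the given value; returns false if it already exists. */
        boolean createIfAbsent(String path, String value);

        /** Returns the node's value, or null if the node is missing. */
        String read(String path);
    }

    /** In-memory implementation so the example runs without a ZooKeeper server. */
    static final class InMemoryStore implements NodeStore {
        private final Map<String, String> nodes = new ConcurrentHashMap<>();

        @Override
        public boolean createIfAbsent(String path, String value) {
            return nodes.putIfAbsent(path, value) == null;
        }

        @Override
        public String read(String path) {
            return nodes.get(path);
        }
    }

    /** Returns true if ownerId holds the lock at path after this call. */
    static boolean tryLock(NodeStore store, String path, String ownerId) {
        if (store.createIfAbsent(path, ownerId)) {
            return true; // we created the node, so the lock is ours
        }
        // Someone created it first: we hold the lock only if that someone was us.
        return ownerId.equals(store.read(path));
    }

    public static void main(String[] args) {
        NodeStore store = new InMemoryStore();
        System.out.println(tryLock(store, "/lock/job", "10.0.0.1")); // prints true
        System.out.println(tryLock(store, "/lock/job", "10.0.0.2")); // prints false
        System.out.println(tryLock(store, "/lock/job", "10.0.0.1")); // prints true
    }
}
```

Because the create itself decides the winner, two machines racing for the same task cannot both believe they created the node.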
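Summary
There are many articles online, and most of them take the following approach: create a root node, and have every client that calls trylock create an EPHEMERAL_SEQUENTIAL child under root, while setting a watcher for changes to root's children (to avoid the herd effect, a client can watch only the node immediately before its own). If the node a client created has the smallest sequence number, that client acquires the lock; otherwise it keeps waiting for changes to root's children.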
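The decision step of that approach can be sketched without a server: given the current children of root and the name of our own node, either we are the smallest (lock acquired) or we find the single predecessor to watch. SequentialLockStep and the "lock-" prefix are hypothetical names chosen for this example. The sketch assumes each child name ends in the 10-digit zero-padded counter that ZooKeeper appends to sequential nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of the "smallest sequence wins, otherwise watch the predecessor" step. */
final class SequentialLockStep {

    /** Extracts the numeric suffix, e.g. "lock-0000000007" gives 7 (assumes a 10-digit suffix). */
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.length() - 10));
    }

    /**
     * Returns null if ownNode has the smallest sequence number (lock acquired),
     * otherwise the name of the node immediately before it, which is the only
     * node that needs a watcher.
     */
    static String nodeToWatch(List<String> children, String ownNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(SequentialLockStep::sequenceOf));
        int index = sorted.indexOf(ownNode);
        if (index < 0) {
            throw new IllegalArgumentException("own node is not among the children: " + ownNode);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "lock-0000000003", "lock-0000000001", "lock-0000000002");
        System.out.println(nodeToWatch(children, "lock-0000000001")); // prints null
        System.out.println(nodeToWatch(children, "lock-0000000003")); // prints lock-0000000002
    }
}
```

Watching only the predecessor means that when a node disappears, exactly one waiting client wakes up, rather than every client re-reading root's children at once.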