使用Zookeeper 實現選主從或者分佈式鎖

概述


1. ZooKeeper 實現選主從的原理

2. ZooKeeper 實現選主從的代碼

選主從的原理

在分佈式場景中經常會用到 ZooKeeper,常見的用法有利用 ZooKeeper 來選舉主從、管理節點狀態,或者使用 ZooKeeper 來實現分佈式鎖;具體原理是什麼呢?

這裏只講其中一種實現方式,即根據編號大小來實現(其他方式還有通過創建節點來實現的,等等):

所有的節點向 zk 的某個路徑下註冊,創建臨時節點(臨時節點由 ZooKeeper 主動監控,一旦連接失效,zk 會刪除該臨時節點)。每一個註冊者創建節點時會得到一個編號,每次選舉編號最小的爲主節點,其他節點就爲從節點。從節點會監控主節點是否失效(怎麼監控?zk 有事件機制,監聽事件的狀態變化,然後重新選舉)。爲避免「驚羣」現象,每一個節點只監控編號比它小的相鄰節點。

代碼實現

選主從代碼:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Created by  on 17/11/23.
 */
public class ChooseMaster implements Watcher {


    private ZooKeeper zk=null;
    private String selfPath=null;
    private String waitPath=null;
    private static final String ZK_ROOT_PATH="/zkmaster";  //選主從的跟路徑
    private static final String ZK_SUB_PATH=ZK_ROOT_PATH+"/register";
    private CountDownLatch successCountDownLatch=new CountDownLatch(1);
    private CountDownLatch threadCompleteLatch=null;

    public ChooseMaster(CountDownLatch countDownLatch){

        this.threadCompleteLatch=countDownLatch;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {  //監聽事件

        Event.KeeperState keeperState=watchedEvent.getState(); 
        Event.EventType eventType=watchedEvent.getType();
        if(Event.KeeperState.SyncConnected==keeperState){  //創建鏈接

            if(Event.EventType.None==eventType){

                System.out.println(Thread.currentThread().getName()+" connected to server");
                successCountDownLatch.countDown();
            }else if(Event.EventType.NodeDeleted==eventType && watchedEvent.getPath().equals(waitPath)){ //監測到節點刪除,且爲當前線程的等待節點

                System.out.println(Thread.currentThread().getName() + " some node was deleted,I'll check if I am the minimum node");
                try{

                    if(checkMinPath()){  //判斷本身是否是最小的編號

                        processMasterEvent();  //處理主節點作的事情
                    }
                }catch (Exception e){

                    e.printStackTrace();
                }

            }

        }else if(Event.KeeperState.Disconnected==keeperState){  //鏈接斷開

            System.out.println(Thread.currentThread().getName()+ " release connection");
        }else if(Event.KeeperState.Expired==keeperState){  //超時

            System.out.println(Thread.currentThread().getName()+ " connection expire");
        }
    }


    public void chooseMaster() throws Exception {

        selfPath=zk.create(ZK_SUB_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  //建立臨時節點
        System.out.println(Thread.currentThread().getName()+ "create path "+selfPath);
        if(checkMinPath()){  //判斷是否爲主節點

            processMasterEvent();  
        }
    }

    public boolean createPersistPath(String path,String data,boolean needWatch) throws KeeperException, InterruptedException {

        if(zk.exists(path,needWatch)==null){

            zk.create(path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            System.out.println(Thread.currentThread().getName()+" create persist path "+path);
        }
        return true;


    }

    public void createConnection(String connection,int timeout) throws IOException, InterruptedException {

        zk=new ZooKeeper(connection,timeout,this);
        successCountDownLatch.await();

    }

    private void processMasterEvent() throws KeeperException, InterruptedException {

        if(zk.exists(selfPath,false)==null){

            System.out.println(Thread.currentThread().getName()+ " selfnode is not exist "+ selfPath);
            return;
        }
        System.out.println(Thread.currentThread().getName()+ " I'm the master,now do work");
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName()+" Finish do work,leave master");
        //zk.delete(selfPath,-1);
        releaseConnection();
        threadCompleteLatch.countDown();

    }

    private void releaseConnection() {

        if(zk!=null){

            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private boolean checkMinPath() throws Exception {

//獲取根節點下的全部子節點,進行排序,取當前路徑的index,若是排在第一個,則爲主,不然檢測前一個節點是否存在,不存在則從新選舉最小的節點 List
<String> subNodes=zk.getChildren(ZK_ROOT_PATH,false); System.out.println(subNodes.toString()); Collections.sort(subNodes); System.out.println(Thread.currentThread().getName()+" tmp node index is "+selfPath.substring(ZK_ROOT_PATH.length()+1)); int index=subNodes.indexOf(selfPath.substring(ZK_ROOT_PATH.length()+1)); switch (index){ case -1: System.out.println(Thread.currentThread().getName()+" create node is not exist"); return false; case 0: System.out.println(Thread.currentThread().getName()+" I'm the master"); return true; default: waitPath=ZK_ROOT_PATH+"/"+subNodes.get(index-1); System.out.println(Thread.currentThread().getName()+" the node before me is "+waitPath); try{ zk.getData(waitPath,true,new Stat()); return false; }catch (Exception e){ if(zk.exists(waitPath,false)==null){ System.out.println(Thread.currentThread().getName()+" the node before me is not exist,now is me"); return checkMinPath(); }else{ throw e; } } } } }

測試代碼:

import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Created by  on 17/11/23.
 *
 * <p>Driver that starts {@code THREAD_NUM} worker threads, each of which opens its own
 * ZooKeeper session and takes part in the master election through {@link ChooseMaster}.
 * The main thread waits on a shared latch that each participant counts down after it has
 * acted as master.
 *
 * <p>Note: the latch is counted down only inside {@code ChooseMaster}; if a worker fails
 * before reaching that point, {@code startProcess()} keeps waiting.
 */
public class MasterChoiceTest {

    private final static String ZK_CONNECT_STRING = "127.0.0.1:2181";
    private final static String ZK_ROOT_PATH = "/zkmaster";
    private final static int SESSION_TIMEOUT = 10000;
    private static final int THREAD_NUM = 5;
    private static int threadNo = 0;   // used only to number the worker thread names
    private static ExecutorService executorService = null;
    private static CountDownLatch threadCompleteLatch = new CountDownLatch(THREAD_NUM);

    /**
     * Builds the worker pool with readable thread names and starts the election run.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        executorService = Executors.newFixedThreadPool(THREAD_NUM, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                String name = String.format("The %s thread", ++threadNo);
                Thread ret = new Thread(Thread.currentThread().getThreadGroup(), r, name, 0);
                ret.setDaemon(false);
                return ret;
            }
        });
        // newFixedThreadPool never returns null, so no guard is needed here.
        startProcess();
    }

    /**
     * Submits one election task per worker thread, then blocks until every participant has
     * counted down the completion latch.
     */
    private static void startProcess() {
        Runnable masterChoiceTest = new Runnable() {
            @Override
            public void run() {
                String threadName = Thread.currentThread().getName();
                ChooseMaster chooseMaster = new ChooseMaster(threadCompleteLatch);
                try {
                    chooseMaster.createConnection(ZK_CONNECT_STRING, SESSION_TIMEOUT);
                    System.out.println(Thread.currentThread().getName() + " connected to server");
                    // Serialize root-node creation across the workers in this JVM.
                    synchronized (MasterChoiceTest.class) {
                        chooseMaster.createPersistPath(ZK_ROOT_PATH, "thread " + threadName, true);
                    }
                    chooseMaster.chooseMaster();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently dropping it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    // Covers IOException, KeeperException and anything chooseMaster() throws.
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < THREAD_NUM; i++) {
            executorService.execute(masterChoiceTest);
        }
        // No new tasks will be submitted; already-submitted workers keep running.
        executorService.shutdown();
        try {
            threadCompleteLatch.await();
            System.out.println("All thread finished");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

 

 

輸出結果:

The 1 thread-EventThread connected to server
The 5 thread-EventThread connected to server
The 3 thread-EventThread connected to server
The 2 thread-EventThread connected to server
The 5 thread connected to server
The 3 thread connected to server
The 2 thread connected to server
The 1 thread connected to server
The 4 thread-EventThread connected to server
The 4 thread connected to server
The 5 thread create persist path /zkmaster
The 5 threadcreate path /zkmaster/register0000000000
[register0000000000]
The 5 thread tmp node index is register0000000000
The 5 thread I'm the master
The 1 threadcreate path /zkmaster/register0000000001
The 2 threadcreate path /zkmaster/register0000000002
The 5 thread I'm the master,now do work
The 3 threadcreate path /zkmaster/register0000000003
[register0000000000, register0000000002, register0000000001]
The 1 thread tmp node index is register0000000001
The 1 thread the node before me is /zkmaster/register0000000000
[register0000000000, register0000000002, register0000000001, register0000000003]
[register0000000000, register0000000002, register0000000001, register0000000003]
The 3 thread tmp node index is register0000000003
The 2 thread tmp node index is register0000000002
The 3 thread the node before me is /zkmaster/register0000000002
The 2 thread the node before me is /zkmaster/register0000000001
The 4 threadcreate path /zkmaster/register0000000004
[register0000000000, register0000000002, register0000000001, register0000000004, register0000000003]
The 4 thread tmp node index is register0000000004
The 4 thread the node before me is /zkmaster/register0000000003
The 5 thread Finish do work,leave master
The 1 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000002, register0000000001, register0000000004, register0000000003]
The 1 thread-EventThread tmp node index is register0000000001
The 1 thread-EventThread I'm the master
The 1 thread-EventThread I'm the master,now do work
The 1 thread-EventThread Finish do work,leave master
The 2 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000002, register0000000004, register0000000003]
The 2 thread-EventThread tmp node index is register0000000002
The 2 thread-EventThread I'm the master
The 2 thread-EventThread I'm the master,now do work
The 2 thread-EventThread Finish do work,leave master
The 3 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000004, register0000000003]
The 3 thread-EventThread tmp node index is register0000000003
The 3 thread-EventThread I'm the master
The 3 thread-EventThread I'm the master,now do work
The 3 thread-EventThread Finish do work,leave master
The 4 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000004]
The 4 thread-EventThread tmp node index is register0000000004
The 4 thread-EventThread I'm the master
The 4 thread-EventThread I'm the master,now do work
The 4 thread-EventThread Finish do work,leave master
All thread finished

 

 

分佈式鎖的原理也是一樣,每次由編號最小的節點獲取鎖。

相關文章
相關標籤/搜索