Zookeeper+Curator 分佈式鎖

原本想着基於zk臨時節點,實現一下分佈式鎖,結果發現有curator框架。PS:原聲API真的難用,連遞歸建立path都沒有?spring

配置curator maven的時候,md配置了好幾個小時,最後發現集中定義依賴版本號 我原本都是寫數字的,結果到了zookeeper.version ,我居然寫了 <zookeeper.version>zookeeper-3.4.7</zookeeper.version> 把英文也寫上去了 多是從maven-repository copy過來的 很鬱悶。apache

curator提供的可重入分佈式鎖看起來也沒什麼可封裝的,由於它和ReentrantLock確實很。在須要的地方,new一個,再調用對象的方法就行了。網絡

這個鎖就是 InterProcessMutex 類,其構造方法須要咱們傳入當前CuratorFramework對象,還有要鎖定的節點。對了 這個節點是臨時節點,再客戶端斷開鏈接後,鎖不會一直存在,但也不會當即就失去鎖,由於ZK須要根據缺省的時間判斷你是真的斷開了仍是某種網絡緣由。多線程

首先說明它是一把可重入鎖。注意在當前線程的for循環中,他們都是用的是同一把鎖,同把鎖纔可重入。app

    public void fun() {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml");
        CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework");
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws");
        for (int i = 0; i < 10; i++) {
            try {
                lock.acquire();
                System.out.println("yes");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

其次模擬分佈式環境,在十個線程中各獲取鎖(鎖相同的path),並執行1s的任務,能夠發現,多線程被鎖同步。框架

public void fun() throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml");
        CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework");
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws");
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        lock.acquire();
                        System.out.println("yes");
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                }
            }).start();
        }
        System.in.read();
    }

acquire支持傳遞等待超時時間,返回值是boolean類型。表明超時時間內是否成功獲取到鎖。maven

public void fun() throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml");
        CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework");
        System.out.println(curatorFramework);
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws");
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        if (lock.acquire(1000, TimeUnit.MILLISECONDS)) {
                            System.out.println("yes");
                            Thread.sleep(1000);
                        } else {
                            System.out.println("no");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                }
            }).start();
        }
        System.in.read();
    }

最後附上spring配置:分佈式

    <!-- 重連配置 -->
    <bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
        <constructor-arg index="0" value="1000"/>
        <constructor-arg index="1" value="3"/>
    </bean>

    <bean id="curatorFramework" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient"
          init-method="start">
        <constructor-arg index="0" value="server:port,server:port,server:port"/>
        <constructor-arg index="1" ref="retryPolicy"/>
    </bean>
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息