Kafka Study Notes (4) ---- Kafka Leader Election

1. Basic ZooKeeper Operations

  ZooKeeper nodes (znodes) fall into four types along two dimensions, persistence and ordering (a short sketch mapping them to the CreateMode constants follows the list):

  PERSISTENT: persistent and unordered (stored on disk)

  PERSISTENT_SEQUENTIAL: persistent, with a monotonically increasing sequence number

  EPHEMERAL: non-persistent and unordered; kept in memory and removed when the client session closes.

  EPHEMERAL_SEQUENTIAL: non-persistent, with an increasing sequence number; kept in memory and removed when the client session closes
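
  These four types correspond to the CreateMode constants of the ZooKeeper Java API. The following is an illustration only (not code from the original notes); the connection string mirrors the examples below, and the demo paths are assumed not to exist yet:

package com.wangx.kafka.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateModeDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
        byte[] data = "data".getBytes();
        //persistent and unordered: survives client disconnects
        zk.create("/demo-persistent", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //persistent with an increasing sequence suffix appended to the name
        zk.create("/demo-seq-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        //ephemeral: removed automatically when this session closes
        zk.create("/demo-ephemeral", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        //ephemeral with an increasing sequence suffix
        zk.create("/demo-eseq-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        zk.close();
    }
}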

  A Watch can be registered on every node to monitor changes to it. There are four event types, enabled as follows:

  Created event: enabled with a call to exists

  Deleted event: enabled with a call to exists, getData, and getChildren

  Changed event: enabled with a call to exists and getData

  Child event: enabled with a call to getChildren

  The basic behavior of a Watch is that the client is notified first and then has to fetch the data itself. A Watch is one-shot: once it fires it is immediately removed and will not report later changes; to keep watching, the client must re-register it.

The following code uses the native ZooKeeper client to create a node and watch it for changes:

  1. Add the dependency to pom.xml:

    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.13</version>
    </dependency>

  2. Client connection class

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //create the connection and watch the connection state
        ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("連接客戶端");
                System.out.println(watchedEvent.getState());
            }
        });
        //create the node: /parent is the path, "data" is the node data, Ids.OPEN_ACL_UNSAFE sets the ACL, CreateMode.PERSISTENT is the node type
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        //watch the node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
            }
        });
       System.out.println(parent);
        Thread.sleep(10000000);
    }
}

  Running it creates a persistent node.

  Checking in the ZooKeeper client confirms that the parent node was created successfully.

  Now delete the parent node and observe the watch. The console prints the state change, showing that the node-deleted event was received.

  Next, manually create the node again from the client and observe: the console prints nothing about the creation, meaning the event was not seen. This is the one-shot behavior described above: once a watch fires, it is closed. Let's rework the code:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //create the connection and watch the connection state
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("連接客戶端");
                System.out.println(watchedEvent.getState());
            }
        });
        //create the node
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        //watch the node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    //re-register the watch
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
//        System.out.println(parent);
        Thread.sleep(10000000);
    }
}

  Delete the node and then manually create it again: this time the console prints the event, showing that the watch was re-registered and the node-creation event was observed.

2. Leader Election Based on ZooKeeper

  1. Racing to register the Leader node -- unfair mode

  Coding steps:

  1. Create a parent node for the election, e.g. /chroot, as a persistent node

  2. Each client competes for leadership by creating the Leader node under /chroot, e.g. /chroot/leader. This node should be ephemeral

  3. The client that successfully creates the Leader node wins the election

  4. A client whose create fails loses the election; it registers an exists watch on /chroot/leader and is notified once that node is deleted

  5. The Leader can give up leadership by deleting the Leader node

  6. If the Leader crashes, the Leader node, being ephemeral, is removed automatically. The other clients, having registered a watch on it, are notified and take part in the next round of election, so some client is always acting as Leader.

  The implementation code is as follows:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //create the connection and watch the connection state
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("連接客戶端");
                System.out.println(watchedEvent.getState());
            }
        });
        //create the parent node
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

        //watch the node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
//        System.out.println(newNode1);
        Thread.sleep(10000000);
    }
}

  Once the node already exists, further create calls throw an exception and those creations fail, so only the client whose create succeeds becomes the leader. With a watcher, the losing clients can try to grab leadership again after the node is deleted or the leader crashes. A minimal sketch of this pattern is shown below.
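
  The following is an illustration only, not code from the original notes: it assumes the persistent parent /chroot from the steps above already exists, and the tryToLead helper name is made up.

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class UnfairElectionDemo {

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getState());
            }
        });
        tryToLead();
        Thread.sleep(10000000);
    }

    //try to grab leadership by creating the ephemeral leader node;
    //on failure, watch it and run the election again once it disappears
    private static void tryToLead() throws KeeperException, InterruptedException {
        try {
            zooKeeper.create("/chroot/leader", "data".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("I am the leader");
        } catch (KeeperException.NodeExistsException e) {
            //lost this round: watch /chroot/leader and retry when it is deleted
            Stat stat = zooKeeper.exists("/chroot/leader", new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                        try {
                            tryToLead();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                }
            });
            //the leader may already be gone by the time exists() runs; retry right away
            if (stat == null) {
                tryToLead();
            }
        }
    }
}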

  2. First come, first served, each watching its predecessor -- fair mode

  1. Create a parent node for the election, e.g. /chroot, as a persistent node

  2. Each client competes by creating a Leader node under /chroot, e.g. /chroot/leader. This node should be ephemeral_sequential

  3. Each client fetches all children of /chroot via getChildren; if the node it registered has the smallest id among all children, it wins the election

  4. Otherwise it registers a watch on the node immediately in front of its own. Once that predecessor is deleted it is notified and goes back to step 3 (it cannot simply assume it is the new Leader, because the node in front of it may merely have crashed without being the Leader)

  5. The Leader can give up leadership by deleting the node it created

  The code is as follows:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //create the connection and watch the connection state
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("連接客戶端");
                System.out.println(watchedEvent.getState());
            }
        });
        //create the parent node
        String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

        //watch the node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
//        System.out.println(newNode1);
        Thread.sleep(10000000);
    }
}

  You can see that three new nodes have appeared under /parent in ZooKeeper.

  By default a sequential node's name is the given prefix (node) followed by a ten-digit decimal sequence number that increases monotonically.

  The client whose id is the smallest among all children is elected leader. A sketch of the getChildren / smallest-id check is shown below.
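
  Again as an illustration only (not from the original notes): the checkLeadership helper name is made up, and the persistent /parent node is assumed to already exist.

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;

public class FairElectionDemo {

    public static void main(String[] args) throws Exception {
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getState());
            }
        });
        //each client creates its own ephemeral sequential node under /parent
        String myPath = zooKeeper.create("/parent/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String myName = myPath.substring("/parent/".length());
        checkLeadership(zooKeeper, myName);
        Thread.sleep(10000000);
    }

    //steps 3 and 4 of the fair mode: smallest child wins, otherwise watch the predecessor
    private static void checkLeadership(final ZooKeeper zooKeeper, final String myName)
            throws KeeperException, InterruptedException {
        List<String> children = zooKeeper.getChildren("/parent", false);
        Collections.sort(children);
        int index = children.indexOf(myName);
        if (index == 0) {
            System.out.println(myName + " is the leader");
            return;
        }
        //watch only the node directly in front of us and re-check when it goes away
        String predecessor = "/parent/" + children.get(index - 1);
        Stat stat = zooKeeper.exists(predecessor, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                    try {
                        checkLeadership(zooKeeper, myName);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //the predecessor may already be gone; re-check immediately in that case
        if (stat == null) {
            checkLeadership(zooKeeper, myName);
        }
    }
}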

3. Leader Election in Curator

  First, add the Curator dependencies to pom.xml:

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>3.2.1</version>
    </dependency>

  1. Features of Curator's LeaderLatch and its API:

  1. After being elected Leader, leadership cannot be given up arbitrarily

  2. Leadership can only be released by calling close

  3. Adding a ConnectionStateListener is strongly recommended; when the connection is SUSPENDED or LOST, treat leadership as lost

  4. await can be used to block until leadership is acquired, optionally with a timeout

  5. hasLeadership tells whether this instance is currently the Leader

  6. getLeader returns the current Leader

  7. getParticipants returns all participants in the election (a sketch using these calls follows the example below)

  A simple implementation:

package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLeaderLatch {
    public static void main(String[] args) throws Exception {
        //retry policy: exponential backoff starting at one second, up to five retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
        //obtain a CuratorFramework from the factory
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
        //create the leader latch on /parent with id "node"
        LeaderLatch leaderLatch = new LeaderLatch(curatorFramework,"/parent","node");
        //listen for leadership changes
        leaderLatch.addListener(new LeaderLatchListener() {
            //called when this instance becomes the leader
            public void isLeader() {
                System.out.println("I am a listener");
            }
            //再也不是leader時回調
            public void notLeader() {
                System.out.println("I am not a  listener");
            }
        });
        //啓動
        curatorFramework.start();
        leaderLatch.start();
        Thread.sleep(100000000);
        leaderLatch.close();
        curatorFramework.close();
    }
}
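
  The example above only exercises the listener. Under the same setup, the remaining calls from the list (await with a timeout, hasLeadership, getLeader, getParticipants) could be used roughly as in this sketch; it is illustrative only, and the id "node-1" is made up:

package com.wangx.kafka.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class CuratorLeaderLatchApiDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "node1:2181", new ExponentialBackoffRetry(1000, 5));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/parent", "node-1");
        latch.start();

        //block for up to ten seconds waiting to become leader
        boolean elected = latch.await(10, TimeUnit.SECONDS);
        System.out.println("elected within timeout: " + elected);
        //ask whether this instance currently holds leadership
        System.out.println("hasLeadership: " + latch.hasLeadership());
        //inspect the current leader and all participants of the election
        System.out.println("current leader: " + latch.getLeader());
        for (Participant p : latch.getParticipants()) {
            System.out.println("participant: " + p.getId() + ", leader=" + p.isLeader());
        }

        //close() is the only way to give up leadership with LeaderLatch
        latch.close();
        client.close();
    }
}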

  2. Features of Curator's LeaderSelector and its API:

  1. When the election is won, the takeLeadership method is called back

  2. Business logic can be implemented inside takeLeadership

  3. As soon as takeLeadership returns, leadership is considered given up

  4. autoRequeue re-enters the election automatically so leadership can be acquired repeatedly

  5. hasLeadership tells whether this instance is currently the Leader

  6. getLeader returns the current Leader

  7. getParticipants returns all participants in the election (queried in the sketch after the example below)

A simple implementation:

package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLeaderSelector {
    public static void main(String[] args) throws Exception {
        //retry policy: exponential backoff starting at one second, up to five retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
        //obtain a CuratorFramework from the factory
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
        //create the selector on /node; the business logic goes into the takeLeadership callback
        LeaderSelector leaderSelector = new LeaderSelector(curatorFramework,"/node", new LeaderSelectorListenerAdapter() {
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                Thread.sleep(1000);
                System.out.println("啓動了 takeLeadership");
            }
        });
        leaderSelector.autoRequeue();
        //the CuratorFramework client must be started before the selector
        curatorFramework.start();
        leaderSelector.start();
        Thread.sleep(100000000);
        leaderSelector.close();
        curatorFramework.close();
    }
}
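
  As with LeaderLatch, the remaining calls from the list (hasLeadership, getLeader, getParticipants) can be queried on a running selector. A minimal illustrative sketch, in which the id "selector-1" and the sleep timings are made up:

package com.wangx.kafka.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLeaderSelectorApiDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "node1:2181", new ExponentialBackoffRetry(1000, 5));
        client.start();

        LeaderSelector selector = new LeaderSelector(client, "/node", new LeaderSelectorListenerAdapter() {
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                //hold leadership for a while, then return to give it up
                Thread.sleep(5000);
            }
        });
        //give this participant a readable id so it shows up in getParticipants()
        selector.setId("selector-1");
        selector.autoRequeue();
        selector.start();

        Thread.sleep(2000);
        //query the election state while the selector is running
        System.out.println("hasLeadership: " + selector.hasLeadership());
        System.out.println("current leader: " + selector.getLeader());
        for (Participant p : selector.getParticipants()) {
            System.out.println("participant: " + p.getId() + ", leader=" + p.isLeader());
        }

        selector.close();
        client.close();
    }
}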

  Here LeaderSelectorListenerAdapter implements the LeaderSelectorListener interface; its source is as follows:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.curator.framework.recipes.leader;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    public LeaderSelectorListenerAdapter() {
    }
    //when the connection fails, an exception is thrown, which interrupts takeLeadership and keeps the business logic from acting on a broken connection
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
            throw new CancelLeadershipException();
        }
    }
}

4. Kafka's Leader Election

  1. Kafka's "every partition for itself" Leader Election

  Every Replica of each Partition competes for Leader at the same time. The upside is simplicity of implementation; the downsides are the Herd Effect (a very large number of simultaneous elections and watch notifications), a heavy load on ZooKeeper, and high latency (which can in turn cause many other problems).

  2. Kafka's Controller-based Leader Election

  The idea is to elect one Broker in the whole cluster as the Controller; the Controller then assigns the Leader and Followers for every Partition of every Topic. Kafka implements this election by creating the ephemeral /controller node in ZooKeeper and writing the current broker's information into it, e.g. {"version":1,"brokerid":1,"timestamp":"1512018424988"}

  Thanks to ZooKeeper's strong consistency, only one client can succeed in creating the node, and the broker that creates it becomes the leader (first come, first served). That leader is the cluster's controller and is responsible for cluster-wide administrative work. When the controller loses its connection to ZooKeeper, the ephemeral node is deleted; the other brokers, which watch this node, receive the deletion event and start a new leader election. The snippet below mimics this registration with the plain ZooKeeper client.
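
  Purely to illustrate the mechanism (this is not Kafka's own code), the /controller registration can be sketched with the native ZooKeeper client from section 1; the broker id 1 is made up and the timestamp is just the current time:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

public class ControllerRegistrationDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getState());
            }
        });
        //the payload mirrors the JSON Kafka stores in /controller; brokerid 1 is made up
        String data = "{\"version\":1,\"brokerid\":1,\"timestamp\":\"" + System.currentTimeMillis() + "\"}";
        //only one client can create the ephemeral /controller node; that broker becomes the controller.
        //brokers that fail with NodeExistsException would watch /controller and re-run the election
        //when it is deleted, exactly as in the unfair-mode sketch in section 2.1
        zooKeeper.create("/controller", data.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("this broker is now the controller");
        Thread.sleep(10000000);
    }
}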

  This greatly alleviates the Herd Effect and reduces the load on ZooKeeper. The Controller communicates with Leaders and Followers via RPC, which is efficient and close to real time. The cost is the extra complexity of introducing a Controller, and the Controller's own failover has to be handled.
