ZooKeeper 分佈式鎖

ZooKeeper   分佈式鎖

  目前分佈式鎖,比較成熟、主流的方案有基於redis及基於zookeeper的二種方案。  基於zookeeper的分佈式鎖java

  大致來說,基於redis的分佈式鎖核心指令爲SETNX,即若是目標key存在,寫入緩存失敗返回0,反之若是目標key不存在,寫入緩存成功返回1,經過區分這二個不一樣的返回值,能夠認爲SETNX成功即爲得到了鎖。redis

  redis分佈式鎖,看上去很簡單,但其實要考慮周全,並不容易,網上有一篇文章討論得很詳細:http://blog.csdn.net/ugg/article/details/41894947/,有興趣的能夠閱讀一下。apache

  其主要問題在於某些異常狀況下,鎖的釋放會有問題,好比SETNX成功,應用得到鎖,這時出於某種緣由,好比網絡中斷,或程序出異常退出,會致使鎖沒法及時釋放,只能依賴於緩存的過時時間,可是過時時間這個值設置多大,也是一個糾結的問題,設置小了,應用處理邏輯很複雜的話,可能會致使鎖提早釋放,若是設置大了,又會致使鎖不能及時釋放,因此那篇文章中針對這些細節討論了不少。json

  而基於zk的分佈式鎖,在鎖的釋放問題上處理起來要容易一些,其大致思路是利用zk的「臨時順序」節點,須要獲取鎖時,在某個約定節點下注冊一個臨時順序節點,而後將全部臨時節點按小從到大排序,若是本身註冊的臨時節點正好是最小的,表示得到了鎖。(zk能保證臨時節點序號始終遞增,因此若是後面有其它應用也註冊了臨時節點,序號確定比獲取鎖的應用更大)緩存

  當應用處理完成,或者處理過程當中出現某種緣由,致使與zk斷開,超過期間閾值(可配置)後,zk server端會自動刪除該臨時節點,即:鎖被釋放。全部參與鎖競爭的應用,只要監聽父路徑的子節點變化便可,有變化時(即:有應用斷開或註冊時),開始搶鎖,搶完了你們都在一邊等着,直到有新變化時,開始新一輪搶鎖。網絡

  關於zk的分佈式鎖,網上也有一篇文章寫得不錯,見http://blog.csdn.net/desilting/article/details/41280869併發

 

我的感受:zk作分佈式鎖機制更完善,但zk抗併發的能力弱於redis,性能上略差,建議若是併發要求高,鎖競爭激烈,可考慮用redis,若是搶鎖的頻度不高,用zk更適合。分佈式

 

最後送福利時間到:性能

  文中提到的基於zk分佈式鎖的那篇文章,邏輯上雖然沒有問題,可是有些場景下,鎖的數量限制可能要求不止1個,好比:某些應用,我但願同時啓動2個實例來處理,可是出於HA的考慮,又擔憂這二個實例會掛掉,這時能夠啓動4個(或者更多),這些實例中,只容許2個搶到鎖的實例能夠進行業務處理,其它實例處於standby狀態(即:備胎),若是這二個搶到鎖的實例掛了(好比異常退出),那麼standby的實例會獲得鎖,即:備胎轉正,開始正常業務處理,從而保證了系統的HA。this

對於這些場景,我封裝了一個抽象類,你們可在此基礎上自行修改:(主要看明白思路就行,代碼細節並不重要)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

package cn.cnblogs.yjmyzz.zookeeper;

 

import org.I0Itec.zkclient.ZkClient;

import org.apache.commons.collections4.CollectionUtils;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import java.util.Collections;

import java.util.Date;

import java.util.List;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

 

/**

 * Created by yangjunming on 5/27/16.

 * 基於Zookeeper的分佈式鎖

 */

public abstract class AbstractLock {

 

    private int lockNumber = 1//容許獲取的鎖數量(默認爲1,即最小節點=自身時,認爲得到鎖)

    private ZkClient zk = null;

    private String rootNode = "/lock"//根節點名稱

    private String selfNode;

    private final String className = this.getClass().getSimpleName(); //當前實例的className

    private String selfNodeName;//自身註冊的臨時節點名

    private boolean handling = false;

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final JsonUtil jsonUtil = new FastJsonUtil();

    private static final String SPLIT = "/";

    private String selfNodeFullName;

 

 

    /**

     * 經過Zk獲取分佈式鎖

     */

    protected void getLock(int lockNumber) {

        setLockNumber(lockNumber);

        initBean();

        initNode();

        subscribe();

        register();

        heartBeat();

        remainRunning();

    }

 

 

    protected void getLock() {

        getLock(1);

    }

 

 

    /**

     * 初始化結點

     */

    private void initNode() {

 

        String error;

        if (!rootNode.startsWith(SPLIT)) {

            error = "rootNode必須以" + SPLIT + "開頭";

            logger.error(error);

            throw new RuntimeException(error);

        }

 

        if (rootNode.endsWith(SPLIT)) {

            error = "不能以" + SPLIT + "結尾";

            logger.error(error);

            throw new RuntimeException(error);

        }

 

        int start = 1;

        int index = rootNode.indexOf(SPLIT, start);

        String path;

        while (index != -1) {

            path = rootNode.substring(0, index);

            if (!zk.exists(path)) {

                zk.createPersistent(path);

            }

            start = index + 1;

            if (start >= rootNode.length()) {

                break;

            }

            index = rootNode.indexOf(SPLIT, start);

        }

 

        if (start < rootNode.length()) {

            if (!zk.exists(rootNode)) {

                zk.createPersistent(rootNode);

            }

        }

 

        selfNode = rootNode + SPLIT + className;

 

        if (!zk.exists(selfNode)) {

            zk.createPersistent(selfNode);

        }

    }

 

    /**

     * 向zk註冊自身節點

     */

    private void register() {

        selfNodeName = zk.createEphemeralSequential(selfNode + SPLIT, StringUtils.EMPTY);

        if (!StringUtils.isEmpty(selfNodeName)) {

            selfNodeFullName = selfNodeName;

            logger.info("自身節點:" + selfNodeName + ",註冊成功!");

            selfNodeName = selfNodeName.substring(selfNode.length() + 1);

        }

        checkMin();

    }

 

    /**

     * 訂閱zk的節點變化

     */

    private void subscribe() {

        zk.subscribeChildChanges(selfNode, (parentPath, currentChilds) -> {

            checkMin();

        });

    }

 

    /**

     * 檢測是否得到鎖

     */

    private void checkMin() {

        List<String> list = zk.getChildren(selfNode);

        if (CollectionUtils.isEmpty(list)) {

            logger.error(selfNode + " 無任何子節點!");

            lockFail();

            handling = false;

            return;

        }

        //按序號從小到大排

        Collections.sort(list);

 

        //若是自身ID在前N個鎖中,則認爲獲取成功

        int max = Math.min(getLockNumber(), list.size());

        for (int i = 0; i < max; i++) {

            if (list.get(i).equals(selfNodeName)) {

                if (!handling) {

                    lockSuccess();

                    handling = true;

                    logger.info("得到鎖成功!");

                }

                return;

            }

        }

 

        int selfIndex = list.indexOf(selfNodeName);

        if (selfIndex > 0) {

            logger.info("前面還有節點" + list.get(selfIndex - 1) + ",獲取鎖失敗!");

        else {

            logger.info("獲取鎖失敗!");

        }

        lockFail();

 

        handling = false;

    }

 

    /**

     * 得到鎖成功的處理回調

     */

    protected abstract void lockSuccess();

 

    /**

     * 得到鎖失敗的處理回調

     */

    protected abstract void lockFail();

 

    /**

     * 初始化相關的Bean對象

     */

    protected abstract void initBean();

 

 

    protected void setZkClient(ZkClient zk) {

        this.zk = zk;

    }

 

    protected int getLockNumber() {

        return lockNumber;

    }

 

    protected void setLockNumber(int lockNumber) {

        this.lockNumber = lockNumber;

    }

 

    protected void setRootNode(String value) {

        this.rootNode = value;

    }

 

    /**

     * 防程序退出

     */

    private void remainRunning() {

        byte[] lock = new byte[0];

        synchronized (lock) {

            try {

                lock.wait();

            catch (InterruptedException e) {

                Thread.currentThread().interrupt();

                logger.error("remainRunning出錯:", e);

            }

        }

    }

 

    /**

     * 定時向zk發送心跳

     */

    private void heartBeat() {

        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

        service.scheduleAtFixedRate(() -> {

            HeartBeat heartBeat = new HeartBeat();

            heartBeat.setHostIp(NetworkUtil.getHostAddress());

            heartBeat.setHostName(NetworkUtil.getHostName());

            heartBeat.setLastTime(new Date());

            heartBeat.setPid(RuntimeUtil.getPid());

            zk.writeData(selfNodeFullName, jsonUtil.toJson(heartBeat));

        }, 015, TimeUnit.SECONDS);

    }

}

這個類中,提供了三個抽象方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

/**

 * 得到鎖成功的處理回調

 */

protected abstract void lockSuccess();

 

/**

 * 得到鎖失敗的處理回調

 */

protected abstract void lockFail();

 

/**

 * 初始化相關的Bean對象

 */

protected abstract void initBean();

用於處理搶鎖成功、搶鎖失敗、及開搶前的一些對象初始化處理,子類繼承後,只要實現這3個具體的方法便可,同時該抽象類默認還提供了心跳機制,用於定時向zk彙報自身的健康狀態。

相關文章
相關標籤/搜索