【Zookeeper系列五】ZooKeeper 實時更新server列表

#0 系列目錄#java

#1 場景描述# 在分佈式應用中, 咱們常常同時啓動多個server, 調用方(client)選擇其中之一發起請求.

分佈式應用必須考慮高可用性和可擴展性: server的應用進程可能會崩潰, 或者server自己也可能會宕機. 當server不夠時, 也有可能增長server的數量. 總而言之, server列表並不是一成不變, 而是一直處於動態的增減中.

那麼client如何才能實時的更新server列表呢? 解決的方案不少, 本文將講述利用ZooKeeper的解決方案.

#2 思路# 啓動server時, 在zookeeper的某個znode(假設爲/sgroup)下建立一個子節點. 所建立的子節點的類型應該爲ephemeral, 這樣一來, 若是server進程崩潰, 或者server宕機, 與zookeeper鏈接的session就結束了, 那麼其所建立的子節點會被zookeeper自動刪除. 當崩潰的server恢復後, 或者新增server時, 一樣須要在/sgroup節點下建立新的子節點.

對於client, 只需註冊/sgroup子節點的監聽, 當/sgroup下的子節點增長或減小時, zookeeper會通知client, 此時client更新server列表.

#3 實現AppServer# AppServer的邏輯很是簡單, 只須在啓動時, 在zookeeper的"/sgroup"節點下新增一個子節點便可.

AppServer.java源碼:

package com.king.server;

import org.apache.zookeeper.*;

public class AppServer {

	private String groupNode = "sgroup";

	private String subNode = "sub";

	/**
	 * 鏈接zookeeper
	 *
	 * @param address server的地址
	 */
	public void connectZookeeper(String address) throws Exception {
		ZooKeeper zk = new ZooKeeper("localhost:2180,localhost:2181,localhost:2182", 5000, new Watcher() {
			public void process(WatchedEvent event) {
				// 不作處理
			}
		});
		// 在"/sgroup"下建立子節點
		// 子節點的類型設置爲EPHEMERAL_SEQUENTIAL, 代表這是一個臨時節點, 且在子節點的名稱後面加上一串數字後綴
		// 將server的地址數據關聯到新建立的子節點上
		String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),
				ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println("create: " + createdPath);
	}

	/**
	 * server的工做邏輯寫在這個方法中
	 * 此處不作任何處理, 只讓server sleep
	 */
	public void handle() throws InterruptedException {
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args) throws Exception {
		// 在參數中指定server的地址
		if (args.length == 0) {
			System.err.println("The first argument must be server address");
			System.exit(1);
		}

		AppServer as = new AppServer();
		as.connectZookeeper(args[0]);

		as.handle();
	}
}

#4 實現AppClient# AppClient的邏輯比AppServer稍微複雜一些, 須要監聽"/sgroup"下子節點的變化事件, 當事件發生時, 須要更新server列表.

註冊監聽"/sgroup"下子節點的變化事件, 可在getChildren方法中完成. 當zookeeper回調監聽器的process方法時, 判斷該事件是不是"/sgroup"下子節點的變化事件, 若是是, 則調用更新邏輯, 並再次註冊該事件的監聽.

AppClient.java源碼:

package com.king.server;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AppClient {

	private String groupNode = "sgroup";

	private ZooKeeper zk;

	private Stat stat = new Stat();

	private volatile List<String> serverList;

	/**
	 * 鏈接zookeeper
	 */
	public void connectZookeeper() throws Exception {
		zk = new ZooKeeper("localhost:2180,localhost:2181,localhost:2182", 5000, new Watcher() {
			public void process(WatchedEvent event) {
				// 若是發生了"/sgroup"節點下的子節點變化事件, 更新server列表, 並從新註冊監聽
				if (event.getType() == Event.EventType.NodeChildrenChanged
						&& ("/" + groupNode).equals(event.getPath())) {
					try {
						updateServerList();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		});

		updateServerList();
	}

	/**
	 * 更新server列表
	 */
	private void updateServerList() throws Exception {
		List<String> newServerList = new ArrayList<String>();

		// 獲取並監聽groupNode的子節點變化
		// watch參數爲true, 表示監聽子節點變化事件. 
		// 每次都須要從新註冊監聽, 由於一次註冊, 只能監聽一次事件, 若是還想繼續保持監聽, 必須從新註冊
		List<String> subList = zk.getChildren("/" + groupNode, true);
		for (String subNode : subList) {
			// 獲取每一個子節點下關聯的server地址
			byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);
			newServerList.add(new String(data, "utf-8"));
		}

		// 替換server列表
		serverList = newServerList;

		System.out.println("server list updated: " + serverList);
	}

	/**
	 * client的工做邏輯寫在這個方法中
	 * 此處不作任何處理, 只讓client sleep
	 */
	public void handle() throws InterruptedException {
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args) throws Exception {
		AppClient ac = new AppClient();
		ac.connectZookeeper();

		ac.handle();
	}
}
相關文章
相關標籤/搜索