ZooKeeper是一個開放源碼的分佈式應用程序協調服務。分佈式應用能夠使用它來實現諸如:統一命名服務、配置管理、分佈式鎖服務、集羣管理等功能。公司經常使用到的是Java服務集羣的管理。
1.函數介紹
- // Create a handle used to communicate with ZooKeeper.
- zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags);
- // Create a node synchronously.
- int zoo_create(zhandle_t *zh, const char *path, const char *value, int valuelen,
-                const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len);
- // List the children of a node synchronously; `watcher` is the callback to register on that node.
- int zoo_wget_children(zhandle_t *zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings);
- // Close the ZooKeeper handle and free up any resources.
- int zookeeper_close(zhandle_t *zh);
2.實例
用上面4個函數,就能建立一個簡單的集羣管理。
數據存儲結構爲:{PING_SERVER}/{服務編號}/{ip}:{port},其中最末一級的 ip:port 是服務端創建的臨時節點。
服務端向ZK註冊服務
- // Connect to ZooKeeper.
- //
- // Closes any existing session, then calls zookeeper_init() up to
- // ZK_MAX_CONNECT_TIMES times until InitWatcher reports the connected state
- // through connected_. If no attempt succeeds a warning is logged and the
- // handle of the last attempt (possibly NULL) is left in zhandle_.
- void ZkRegistry::ConnectZK() {
-     if (zhandle_) {
-         zookeeper_close(zhandle_);
-         zhandle_ = NULL;
-     }
-     // Forget the previous session's result; a stale "true" would end the
-     // retry loop before the new session is actually established.
-     connected_ = false;
-
-     // connected_ is only set from InitWatcher, which runs asynchronously
-     // after zookeeper_init() returns, so every attempt is given some time.
-     // NOTE(review): connected_ is written by the watcher callback and polled
-     // here -- confirm its declaration is safe for cross-thread access.
-     const int kWaitSecondsPerAttempt = 5;
-     int count = 0;
-     do {
-         ++count;
-         // Release the handle of a failed attempt instead of overwriting it.
-         if (zhandle_) {
-             zookeeper_close(zhandle_);
-             zhandle_ = NULL;
-         }
-         zhandle_ = zookeeper_init(zk_hosts_.c_str(), InitWatcher, timeout_, NULL, NULL, 0);
-         for (int waited = 0; zhandle_ && !connected_ && waited < kWaitSecondsPerAttempt; ++waited) {
-             sleep(1);
-         }
-     } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);
-
-     // Report failure based on the actual result, not on the attempt count:
-     // the last permitted attempt may well have succeeded.
-     if (!connected_) {
-         SLOG_WARN("ZkRegistry::Init --> connect host " << zk_hosts_ << " over max times:" << count);
-         return;
-     }
- }
-
- // Publish this service by creating an ephemeral node.
- //
- // The node path is PING_SERVER/<index>/<ip>:<port> and its value is the
- // local IP. Connects first when there is no handle yet; a failed
- // zoo_create() is logged together with zerror(rc).
- void ZkRegistry::PublishService() {
-     if (zhandle_ == NULL) {
-         ConnectZK();
-     }
-     // ConnectZK() can leave zhandle_ NULL; do not hand that to zoo_create().
-     if (zhandle_ == NULL) {
-         SLOG_WARN("ZkRegistry::PublishService --> no zookeeper handle, skip publishing");
-         return;
-     }
-
-     // Resolve the IP once and reuse it for both the path and the node value.
-     const string ip = GetIp();
-     const string server_path = PING_SERVER + "/" + PingConfig::instance().get_index() + "/"
-         + ip + ":" + PingConfig::instance().get_port();
-
-     char res_path[128];  // receives the path of the node actually created
-     const int rc = zoo_create(zhandle_, server_path.c_str(), ip.c_str(), static_cast<int>(ip.size()),
-         &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, res_path, static_cast<int>(sizeof(res_path)));
-     if (rc != ZOK) {
-         SLOG_INFO("ZkRegistry::PublishService --> zoo_create() path=" << server_path << "," << zerror(rc));
-     }
- }
-
- // Registration loop: every 10 seconds log, reconnect, and publish the
- // service again. Never returns.
- void ZkRegistry::run() {
-     for (;;) {
-         SLOG_INFO("publish service");
-         ConnectZK();
-         PublishService();
-         sleep(10);
-     }
- }
-
- // Watcher handed to zookeeper_init(): records in connected_ whether the
- // reported state is ZOO_CONNECTED_STATE and logs the state it was given.
- // Only `state` is inspected; the other arguments are unused.
- void ZkRegistry::InitWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcher_ctx) {
-     (void)zh;
-     (void)type;
-     (void)path;
-     (void)watcher_ctx;
-
-     // Track the state in both directions: any state other than "connected"
-     // (expired, connecting, auth failed, ...) must clear the flag, otherwise
-     // it would stay true forever after the first successful connection.
-     connected_ = (state == ZOO_CONNECTED_STATE);
-
-     if (state == ZOO_CONNECTED_STATE) {
-         SLOG_INFO("InitWatcher() ZOO_CONNECTED_STATE");
-     } else if (state == ZOO_AUTH_FAILED_STATE) {
-         SLOG_INFO("InitWatcher() ZOO_AUTH_FAILED_STATE");
-     } else if (state == ZOO_EXPIRED_SESSION_STATE) {
-         SLOG_INFO("InitWatcher() ZOO_EXPIRED_SESSION_STATE");
-     } else if (state == ZOO_CONNECTING_STATE) {
-         SLOG_INFO("InitWatcher() ZOO_CONNECTING_STATE");
-     } else if (state == ZOO_ASSOCIATING_STATE) {
-         SLOG_INFO("InitWatcher() ZOO_ASSOCIATING_STATE");
-     }
- }
客戶端獲取服務列表
- // Connect to the ZooKeeper server.
- //
- // Closes any existing handle, then calls zookeeper_init() up to
- // ZK_MAX_CONNECT_TIMES times, sleeping 5 * ONE_SECONDS after each call so
- // the watcher has a chance to set connected_. If no attempt succeeds a
- // message is printed and the last handle (possibly NULL) stays in zhandle_.
- void ZkClient::ConnectZK() {
-     cout << "ZkClient::ConnectZK" << endl;
-     if (zhandle_) {
-         zookeeper_close(zhandle_);
-     }
-     zhandle_ = NULL;
-     connected_ = false;
-
-     int count = 0;
-     do {
-         ++count;
-         // Close the handle of a failed attempt before creating a new one;
-         // overwriting the pointer would leak it and leave its watcher able
-         // to flip connected_ for a handle this object no longer holds.
-         if (zhandle_) {
-             zookeeper_close(zhandle_);
-             zhandle_ = NULL;
-         }
-         zhandle_ = zookeeper_init(zk_hosts_.c_str(), InitWatcher, timeout_, NULL, NULL, 0);
-         sleep(5 * ONE_SECONDS);
-     } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);
-
-     // Judge by the connection flag: the final attempt may have succeeded
-     // even though the attempt counter reached its limit.
-     if (!connected_) {
-         cout << "ZkClient::Init --> connecting zookeeper host: " << zk_hosts_ << " over times: " << count << endl;
-     }
- }
- // Refresh the service list (cold and hot standby).
- //
- // Lists the children of PING_SERVER, then the children of each of those
- // nodes, passing ServiceWatcher as the watcher on every call. A child node
- // is processed only when it has exactly one child of its own. Calls Init()
- // first when there is no handle or the session is not connected.
- void ZkClient::Update() {
-     cout << "ZkClient::Update" << endl;
-     if (zhandle_ == NULL || connected_ == false) {
-         Init();
-     }
-     // Fetch the service groups under PING_SERVER.
-     // Zero-initialised so the vector is well defined even if the call fails.
-     struct String_vector str_vec = {0, NULL};
-     int ret = zoo_wget_children(zhandle_, PING_SERVER.c_str(), ServiceWatcher, NULL, &str_vec);
-     if (ret) {
-         cout << "Update --> read path:" << PING_SERVER << " wrong, " << zerror(ret) << endl;
-         return;
-     }
-
-     // Fetch the ip:port node of every service group.
-     for (int i = 0; i < str_vec.count; ++i) {
-         // Zero-initialised: count is printed below even when the call fails.
-         struct String_vector node_vec = {0, NULL};
-         string path = PING_SERVER + "/" + str_vec.data[i];
-         int node_ret = zoo_wget_children(zhandle_, path.c_str(), ServiceWatcher, NULL, &node_vec);
-         cout << "Update --> path:" << path << ", ret:" << node_ret << ", node's size:" << node_vec.count << endl;
-         if (node_ret || node_vec.count != 1) {
-             deallocate_String_vector(&node_vec);
-             continue;
-         }
-         // The remainder of the per-node handling is elided in this listing.
-         ....
-         // NOTE(review): must stay reachable from every path through the
-         // elided code above, otherwise node_vec still leaks -- confirm.
-         deallocate_String_vector(&node_vec);
-     }
-     // The child list is allocated by the client library; release it.
-     deallocate_String_vector(&str_vec);
- }
- // Watcher for service changes: prints the event it was given and, for a
- // child event, asks the ZkClient instance to refresh the service list.
- void ZkClient::ServiceWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
-     cout << "type:" << type << endl
-          << "state:" << state << endl
-          << "path:" << path << endl
-          << "ZOO_CHILD_EVENT:" << ZOO_CHILD_EVENT << endl;
-
-     // Only child events trigger a refresh.
-     if (type != ZOO_CHILD_EVENT) {
-         return;
-     }
-     cout << "ServiceWatcher ZOO_CHILD_EVENT" << endl;
-     ZkClient::Instance().Update();  // refresh the service list
- }
服務端會每隔一段時間重新註冊自己;
客戶端在第一次與zk建立連接、獲取服務列表時,註冊監聽函數。當節點發生變化時,zk通知客戶端,客戶端重新獲取服務列表,並再次註冊監聽事件。