zk系列-c++下zookeeper使用實例

ZooKeeper是一個分佈式的,開放源碼的分佈式應用程序協調服務。分佈式應用能夠使用它來實現諸如:統一命名服務、配置管理、分佈式鎖服務、集羣管理等功能。公司經常使用到的是Java服務集羣的管理。 node

1.函數介紹 分佈式

[cpp]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //create a handle to used communicate with zookeeper  
  2. zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags)  
  3. //create a node synchronously  
  4. int zoo_create(zhandle_t *zh, const char *path, const char *value,int valuelen,  
  5.    const struct ACL_vector *acl, int flags,char *path_buffer, int path_buffer_len);  
  6. //lists the children of a node synchronously.  
  7. int zoo_wget_children(zhandle_t *zh, const char *path, watcher_fn watcher, void* watcherCtx, struct String_vector *strings)  
  8. //close the zookeeper handle and free up any resources.  
  9. int zookeeper_close(zhandle_t *zh)  

2.實例 函數

用上面3個函數,就能建立一個簡單的集羣管理。 spa

數據存儲結構爲 .net


服務端向Zk註冊服務 code

[cpp]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //鏈接zk  
  2. void ZkRegistry::ConnectZK() {  
  3.   if (zhandle_) {  
  4.     zookeeper_close(zhandle_);  
  5.   }  
  6.   int count = 0;  
  7.   do {  
  8.     ++count;  
  9.     zhandle_ = zookeeper_init(zk_hosts_.c_str(),InitWatcher, timeout_, NULL, NULL, 0);  
  10.   } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);  
  11.   
  12.   if (count >= ZK_MAX_CONNECT_TIMES) {  
  13.     SLOG_WARN("ZkRegistry::Init --> connect host " << zk_hosts_ << " over max times:" << count);  
  14.     return;  
  15.   }  
  16. }  
  17.   
  18. //發佈服務,創建臨時節點  
  19. void ZkRegistry::PublishService() {  
  20.   if (zhandle_ == NULL) {  
  21.     ConnectZK();  
  22.   }  
  23.   string server_path = PING_SERVER + "/" + PingConfig::instance().get_index() + "/"  
  24.     + GetIp() + ":" + PingConfig::instance().get_port();  
  25.   char res_path[128];  
  26.   int rc = zoo_create(zhandle_, server_path.c_str(), GetIp().c_str(), GetIp().size(),  
  27.       &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, res_path, 128);  
  28.   if (rc) {  
  29.     SLOG_INFO("ZkRegistry::PublishService --> zoo_create() path=" << server_path << "," << zerror(rc));  
  30.   }  
  31. }  
  32.   
  33. //每隔10s註冊一次服務  
  34. void ZkRegistry::run() {  
  35.   while(true) {  
  36.     SLOG_INFO("publish service");  
  37.     ConnectZK();  
  38.     PublishService();  
  39.     sleep(10);  
  40.   }  
  41. }  
  42.   
  43. void ZkRegistry::InitWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcher_ctx) {  
  44.   if (state == ZOO_CONNECTED_STATE) {  
  45.     connected_ = true;  
  46.     SLOG_INFO("InitWatcher() ZOO_CONNECTED_STATE");  
  47.   } else if (state == ZOO_AUTH_FAILED_STATE) {  
  48.     SLOG_INFO("InitWatcher() ZOO_AUTH_FAILED_STATE");  
  49.   } else if (state == ZOO_EXPIRED_SESSION_STATE) {  
  50.     SLOG_INFO("InitWatcher() ZOO_EXPIRED_SESSION_STATE");  
  51.   } else if (state == ZOO_CONNECTING_STATE) {  
  52.     SLOG_INFO("InitWatcher() ZOO_CONNECTING_STATE");  
  53.   } else if (state == ZOO_ASSOCIATING_STATE) {  
  54.     SLOG_INFO("InitWatcher() ZOO_ASSOCIATING_STATE");  
  55.   }  
  56. }  

客戶端獲取服務列表 server

[cpp]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. //鏈接zk server  
  2. void ZkClient::ConnectZK() {  
  3.   cout << "ZkClient::ConnectZK" << endl;  
  4.   if (zhandle_) {  
  5.     zookeeper_close(zhandle_);  
  6.   }  
  7.   zhandle_ = NULL;  
  8.   connected_ = false;  
  9.   
  10.   int count = 0;  
  11.   do {  
  12.     ++count;  
  13.     zhandle_ = zookeeper_init(zk_hosts_.c_str(), InitWatcher, timeout_, NULL, NULL, 0);  
  14.     sleep(5 * ONE_SECONDS);  
  15.   } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);  
  16.   
  17.   if (count >= ZK_MAX_CONNECT_TIMES){  
  18.     cout << "ZkClient::Init --> connecting zookeeper host: " << zk_hosts_ << " over times: " << count << endl;  
  19.   }  
  20. }  
  21. //更新服務列表,冷備和熱備  
  22. void ZkClient::Update() {  
  23.   cout << "ZkClient::Update" << endl;  
  24.   if (zhandle_ == NULL || connected_ == false) {  
  25.     Init();  
  26.   }  
  27.   //得到服務份數  
  28.   struct String_vector str_vec;  
  29.   int ret = zoo_wget_children(zhandle_, PING_SERVER.c_str(), ServiceWatcher, NULL, &str_vec);  
  30.   if (ret) {  
  31.     cout << "Update --> read path:" << PING_SERVER << " wrong, " << zerror(ret) << endl;  
  32.     return;  
  33.   }  
  34.   
  35.   //得到各份服務ip:port  
  36.   for (int i = 0; i < str_vec.count; ++i) {  
  37.     struct String_vector node_vec;  
  38.     string path = PING_SERVER + "/" + str_vec.data[i];  
  39.     int ret = zoo_wget_children(zhandle_, path.c_str(), ServiceWatcher, NULL, &node_vec);  
  40.     cout << "Update --> path:" << path << ", ret:" << ret << ", node's size:" << node_vec.count << endl;  
  41.     if (ret || node_vec.count != 1) {  
  42.       continue;  
  43.     }  
  44.     ....  
  45.   }  
  46. }  
  47. //監控服務變化  
  48. void ZkClient::ServiceWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {  
  49.   cout << "type:" << type << endl;  
  50.   cout << "state:" << state << endl;  
  51.   cout << "path:" << path << endl;  
  52. //  cout << "watcherCtx:" << (char*)watcherCtx << endl;  
  53.   cout << "ZOO_CHILD_EVENT:" << ZOO_CHILD_EVENT << endl;  
  54.   if (ZOO_CHILD_EVENT == type) {  
  55.     cout << "ServiceWatcher ZOO_CHILD_EVENT" << endl;  
  56.     ZkClient::Instance().Update();//更新服務列表      
  57.   }  
  58. }  

服務端會每隔一段時間從新註冊本身; blog

客戶端在第一次與zk創建鏈接獲取服務列表時,註冊監聽函數。zk當節點發生變化時,通知客戶端,客戶端從新獲取服務列表,並註冊事件。 事件

相關文章
相關標籤/搜索