基於python操縱zookeeper介紹html
by:授客 QQ:1033553122 node
Win7 64位python
Python 3.3.4linux
kazoo-2.6.1-py2.py3-none-any.whl(windows)apache
kazoo-2.6.1.tar.gz (linux)windows
https://pypi.org/project/kazoo/#filesapi
zookeeper-3.4.13.tar.gzsession
下載地址:多線程
http://zookeeper.apache.org/releases.html#download函數
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
kazooStudy.py
#!/usr/bin/env python 3.4.0
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
import threading
import time
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.retry import KazooRetry
def restart_zk_client():
'''重啓zookeeper會話'''
global zk_client
global zk_conn_stat
try:
zk_client.restart()
except Exception as e:
print('重啓zookeeper客戶端異常:%s' % e)
zk_conn_stat = 0 # zookeeper鏈接狀態 1-LOST 2-SUSPENDED 3-CONNECTED/RECONNECTED
def zk_conn_listener(state):
'''zookeeper鏈接狀態監聽器'''
global zk_conn_stat
if state == KazooState.LOST:
print('zookeeper connection lost')
zk_conn_stat = 1
# Register somewhere that the session was lost
thread = threading.Thread(target=restart_zk_client)
thread.start()
elif state == KazooState.SUSPENDED:
print('zookeeper connection dicconnected')
zk_conn_stat = 2
# Handle being disconnected from Zookeeper
else:
zk_conn_stat = 3
print('zookeeper connection cconnected/reconnected')
# Handle being connected/reconnected to Zookeeper
# 監視器
# 當節點有變化、節點被刪除時,將以多線程的方式調用以參數形式傳遞給get()、exists()的監視函數,監視函數將會接收到一個WatchedEvent實例
def event_listener(event):
print(event)
if __name__ == '__main__':
try:
# 創建鏈接
zk_client = KazooClient(hosts='127.0.0.1:2181')
zk_client.add_listener(zk_conn_listener) # 添加監聽器,監聽鏈接狀態
zk_client.start() # 初始化到zk的鏈接,能夠設置超時時間 zk_client.start(timeout=15) 默認15秒
print('zk_client state:', zk_client.state) # 查看連接狀態
# 建立節點
# ensure_path() 遞歸建立path中不存在的節點,可是不能爲節點設置數據,僅ACL.
zk_client.ensure_path('/node1')
# 建立永久節點
# create建立節點的同時,可爲節點設置數據,要求path路徑必須存在
if not zk_client.exists('/node1/subNode1'):
zk_client.create('/node1/subNode1', b'sub node1')
# 建立臨時節點
# 注意:會話丟失、重啓會話會致使zookeeper刪除重啓會話前建立的臨時節點
if not zk_client.exists('/node1/subNode2'):
zk_client.create('/node1/subNode2', b'sub node2', ephemeral=True)
# 建立有序臨時節點
zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)
# 讀取數據
# 判斷節點是否存在
if zk_client.exists('/node1'): # 若是返回值爲None則表示不存在給定節點
print('存在節點node1,節點路徑/node1')
# 獲取節點相關數據
data, stat = zk_client.get('/node1')
if stat:
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# 獲取給定節點的子節點
children = zk_client.get_children('/node1')
print('node1子節點 有 %s 子節點,節點名稱爲: %s' % (len(children), children))
print('/ 子節點', zk_client.get_children('/'))
# 更新節點
# 更新節點數據
zk_client.set("/node1/subNode2", b"some new data")
# 刪除節點 recursive參數可選,遞歸刪除節點數據
zk_client.delete("/node1", recursive=True)
# 重試命令
try:
result = zk_client.retry(zk_client.get, "/node1/subNode3")
print(result)
# 自定義重試
# max_tries 出錯最大重試次數, ignore_expire False-重試的時候忽略會話過時,不然不忽略
kr = KazooRetry(max_tries=3, ignore_expire=False)
result = kr(zk_client.get, "/node1/subNode3")
except Exception as e:
print('/node1/subNode3 不存在,因此會運行出錯')
# 釋放客戶端佔用資源,移除鏈接
zk_client.stop()
# zk_client.stop() 會致使zk_client鏈接狀態變成 LOST,進而觸發線程調用函數 restart_zk_client,
# 該函數未執行完成的狀況下,若是立刻執行相似get,create等函數,會致使運行出錯
#
while zk_conn_stat != 3:
continue
else:
i = 0
while i < 3000:
if i % 200 == 0:
time.sleep(2)
print('建立新節點')
zk_client.ensure_path('/node1')
zk_client.ensure_path('/node1/subNode2')
zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)
zk_client.set('/node1/subNode2', b'new data')
i += 1
# 關閉客戶端前必須先調用stop,不然會報錯
zk_client.stop()
# 關閉客戶端
zk_client.close()
except Exception as e:
print('運行出錯:%s' % e)
monitor.py
#!/usr/bin/env python
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
import time
from kazoo.client import KazooClient
from kazoo.client import KazooState
zk = KazooClient(hosts='10.118.52.26:2181')
zk.start()
@zk.add_listener
def my_listener(state):
if state == KazooState.LOST:
print('LOST')
# Register somewhere that the session was lost
elif state == KazooState.SUSPENDED:
print('SUSPENDED')
# Handle being disconnected from Zookeeper
else:
pass
print('CONNECTED')
# Handle being connected/reconnected to Zookeeper
# 監視器
# 當節點有變化、節點被刪除時,將以多線程的方式調用以參數形式傳遞給get()、exists()的監視函數,監視函數將會接收到一個WatchedEvent實例
def event_listener(event):
print(event)
children = zk.get_children('/node1',watch=event_listener)
print('node1 has %s children with names %s' % (len(children), children))
# 更高級監視api
# 監視子節點的編號
@zk.ChildrenWatch('/node1')
def watch_children(children):
print("Children are now: %s" % children)
# 監視節點數據變動
@zk.DataWatch("/node1/subNode2") #
def watch_node(data, state):
"""監視節點數據是否變化"""
if state:
print("Version:", state.version, "data:", data)
# 空轉
i = 0
while i< 100:
# children = zk.get_children('/node1',watch=event_listener)
# print('node1 has %s children with names %s' % (len(children), children))
time.sleep(1)
zk.stop()
zk.close()
LOST
CONNECTED
SUSPENDED
KazooClient客戶端實例剛建立時,處於LOST狀態,同zookeeper創建鏈接後,轉爲CONNECTED 。若是鏈接出問題、切換到不一樣的zookeeper集羣幾點,轉爲SUSPENDED狀態,當你知道暫時不能執行命令,若是zookeeper節點再也不是集羣的一部分,鏈接將丟失,也會致使 SUSPENDED狀態
客戶端再次同zookeeper創建鏈接,若是會話不存在,客戶端鏈接狀態將轉爲LOST,若是會話沒有過時,可用則轉爲CONNECTED