Python 基於python操縱zookeeper介紹

基於python操縱zookeeper介紹html

 

by:授客  QQ:1033553122 node

測試環境

Win7 64位python

 

Python 3.3.4linux

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)apache

kazoo-2.6.1.tar.gz (linux)windows

https://pypi.org/project/kazoo/#filesapi

 

zookeeper-3.4.13.tar.gzsession

下載地址:多線程

http://zookeeper.apache.org/releases.html#download函數

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

 

 

 

代碼實踐

kazooStudy.py

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

import threading

import time

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

from kazoo.retry import KazooRetry

 

 

def restart_zk_client():

    '''重啓zookeeper會話'''

    global zk_client

    global zk_conn_stat

    try:

        zk_client.restart()

    except Exception as e:

        print('重啓zookeeper客戶端異常:%s' % e)

 

zk_conn_stat = 0 # zookeeper鏈接狀態 1-LOST   2-SUSPENDED 3-CONNECTED/RECONNECTED

def zk_conn_listener(state):

    '''zookeeper鏈接狀態監聽器'''

 

    global  zk_conn_stat

    if state == KazooState.LOST:

        print('zookeeper connection lost')

        zk_conn_stat = 1

        # Register somewhere that the session was lost

 

        thread = threading.Thread(target=restart_zk_client)

        thread.start()

 

    elif state == KazooState.SUSPENDED:

        print('zookeeper connection dicconnected')

        zk_conn_stat = 2

        # Handle being disconnected from Zookeeper

    else:

        zk_conn_stat = 3

        print('zookeeper connection cconnected/reconnected')

        # Handle being connected/reconnected to Zookeeper

 

# 監視器

# 當節點有變化、節點被刪除時,將以多線程的方式調用以參數形式傳遞給get()、exists()的監視函數,監視函數將會接收到一個WatchedEvent實例

def event_listener(event):

    print(event)

 

 

 

if __name__ == '__main__':

    try:

        # 創建鏈接

        zk_client = KazooClient(hosts='127.0.0.1:2181')

        zk_client.add_listener(zk_conn_listener) # 添加監聽器,監聽鏈接狀態

        zk_client.start() # 初始化到zk的鏈接,能夠設置超時時間 zk_client.start(timeout=15) 默認15秒

 

        print('zk_client state:', zk_client.state) # 查看連接狀態

 

        # 建立節點

        # ensure_path() 遞歸建立path中不存在的節點,可是不能爲節點設置數據,僅ACL.

        zk_client.ensure_path('/node1')

 

        # 建立永久節點

        # create建立節點的同時,可爲節點設置數據,要求path路徑必須存在

        if not zk_client.exists('/node1/subNode1'):

            zk_client.create('/node1/subNode1', b'sub node1')

 

        # 建立臨時節點

        # 注意:會話丟失、重啓會話會致使zookeeper刪除重啓會話前建立的臨時節點

        if not zk_client.exists('/node1/subNode2'):

            zk_client.create('/node1/subNode2', b'sub node2', ephemeral=True)

 

        # 建立有序臨時節點

        zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)

        # 讀取數據

        # 判斷節點是否存在

        if zk_client.exists('/node1'): # 若是返回值爲None則表示不存在給定節點

            print('存在節點node1,節點路徑/node1')

 

            # 獲取節點相關數據

            data, stat = zk_client.get('/node1')

            if stat:

                print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

 

            # 獲取給定節點的子節點

            children = zk_client.get_children('/node1')

            print('node1子節點 有 %s 子節點,節點名稱爲: %s' % (len(children), children))

 

            print('/ 子節點', zk_client.get_children('/'))

 

        # 更新節點

        # 更新節點數據

        zk_client.set("/node1/subNode2", b"some new data")

 

        # 刪除節點 recursive參數可選,遞歸刪除節點數據

        zk_client.delete("/node1", recursive=True)

 

 

        # 重試命令

        try:

            result = zk_client.retry(zk_client.get, "/node1/subNode3")

            print(result)

 

            # 自定義重試

            # max_tries 出錯最大重試次數, ignore_expire False-重試的時候忽略會話過時,不然不忽略

            kr = KazooRetry(max_tries=3, ignore_expire=False)

            result = kr(zk_client.get, "/node1/subNode3")

        except Exception as e:

            print('/node1/subNode3 不存在,因此會運行出錯')

 

 

        # 釋放客戶端佔用資源,移除鏈接

        zk_client.stop()

 

        #  zk_client.stop() 會致使zk_client鏈接狀態變成 LOST,進而觸發線程調用函數 restart_zk_client,

        # 該函數未執行完成的狀況下,若是立刻執行相似get,create等函數,會致使運行出錯

        #

 

        while zk_conn_stat != 3:

            continue

        else:

            i = 0

            while i < 3000:

                if i % 200 == 0:

                    time.sleep(2)

                    print('建立新節點')

                    zk_client.ensure_path('/node1')

                    zk_client.ensure_path('/node1/subNode2')

                    zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)

                    zk_client.set('/node1/subNode2', b'new data')

                i += 1

 

 

 

        # 關閉客戶端前必須先調用stop,不然會報錯

        zk_client.stop()

 

        # 關閉客戶端

        zk_client.close()

    except Exception as e:

        print('運行出錯:%s' % e)

 

 

monitor.py

#!/usr/bin/env python

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import time

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

 

zk = KazooClient(hosts='10.118.52.26:2181')

zk.start()

 

@zk.add_listener

def my_listener(state):

    if state == KazooState.LOST:

        print('LOST')

        # Register somewhere that the session was lost

    elif state == KazooState.SUSPENDED:

        print('SUSPENDED')

        # Handle being disconnected from Zookeeper

    else:

        pass

        print('CONNECTED')

        # Handle being connected/reconnected to Zookeeper

 

# 監視器

# 當節點有變化、節點被刪除時,將以多線程的方式調用以參數形式傳遞給get()、exists()的監視函數,監視函數將會接收到一個WatchedEvent實例

def event_listener(event):

    print(event)

 

children = zk.get_children('/node1',watch=event_listener)

print('node1 has %s children with names %s' % (len(children), children))

 

# 更高級監視api

# 監視子節點的編號

@zk.ChildrenWatch('/node1')

def watch_children(children):

    print("Children are now: %s" % children)

 

 

# 監視節點數據變動

@zk.DataWatch("/node1/subNode2") #

def watch_node(data, state):

    """監視節點數據是否變化"""

    if state:

        print("Version:", state.version, "data:", data)

 

# 空轉

i = 0

while i< 100:

    # children = zk.get_children('/node1',watch=event_listener)

    # print('node1 has %s children with names %s' % (len(children), children))

    time.sleep(1)

 

zk.stop()

zk.close()

 

 

 

關於kazooClient鏈接狀態說明

LOST

CONNECTED

SUSPENDED

 

KazooClient客戶端實例剛建立時,處於LOST狀態,同zookeeper創建鏈接後,轉爲CONNECTED 。若是鏈接出問題、切換到不一樣的zookeeper集羣幾點,轉爲SUSPENDED狀態,當你知道暫時不能執行命令,若是zookeeper節點再也不是集羣的一部分,鏈接將丟失,也會致使 SUSPENDED狀態

 

客戶端再次同zookeeper創建鏈接,若是會話不存在,客戶端鏈接狀態將轉爲LOST,若是會話沒有過時,可用則轉爲CONNECTED

 

 

運行效果

 

參考連接:

https://kazoo.readthedocs.io/en/latest/basic_usage.html

相關文章
相關標籤/搜索