Zookeeper深刻理解(三)kazoo接口

zookeeper的開發接口之前主要以java和c爲主,隨着python項目愈來愈多的使用zookeeper做爲分佈式集羣實現,python的zookeeper接口也出現了不少,如今主流的純python的zookeeper接口是kazoo。所以如何使用kazoo開發基於python的分佈式程序是必須掌握的。html

 

1.安裝kazoojava

yum install python-pipnode

pip install kazoopython

安裝過程當中會出現一些python依賴包未安裝的狀況,安裝便可。服務器

 

2.運行kazoo基礎例子kazoo_basic.py分佈式

import time測試

from kazoo.client import KazooClientui

from kazoo.client import KazooStatespa

 

def main():調試

    zk=KazooClient(hosts='127.0.0.1:2182')

    zk.start()

    

    @zk.add_listener

    def my_listener(state):

        if state == KazooState.LOST:

            print("LOST")

        elif state == KazooState.SUSPENDED:

            print("SUSPENDED")

        else:

            print("Connected")

    #Creating Nodes

    # Ensure a path, create if necessary

    zk.ensure_path("/my/favorite")

    # Create a node with data

    zk.create("/my/favorite/node", b"")

    zk.create("/my/favorite/node/a", b"A")

    #Reading Data

    # Determine if a node exists

    if zk.exists("/my/favorite"):

        print("/my/favorite is existed")

    @zk.ChildrenWatch("/my/favorite/node")

    def watch_children(children):

        print("Children are now: %s" % children)

    # Above function called immediately, and from then on

    @zk.DataWatch("/my/favorite/node")

    def watch_node(data, stat):

        print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

    # Print the version of a node and its data

    data, stat = zk.get("/my/favorite/node")

    print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

    # List the children

    children = zk.get_children("/my/favorite/node")

    print("There are %s children with names %s" % (len(children), children))

    #Updating Data

    zk.set("/my/favorite", b"some data")

    #Deleting Nodes

    zk.delete("/my/favorite/node/a")

    #Transactions

    transaction = zk.transaction()

    transaction.check('/my/favorite/node', version=-1)

    transaction.create('/my/favorite/node/b', b"B")

    results = transaction.commit()

    print ("Transaction results is %s" % results)

    zk.delete("/my/favorite/node/b")

    zk.delete("/my", recursive=True)

    time.sleep(2)

    zk.stop()

 

if __name__ == "__main__":

    try:

        main()

    except Exception, ex:

        print "Ocurred Exception: %s" % str(ex)

        quit()

 

運行結果:

Children are now: [u'a']

Version: 0, data: 

Version: 0, data: 

There are 1 children with names [u'a']

Children are now: []

Transaction results is [True, u'/my/favorite/node/b']

Children are now: [u'b']

Children are now: []

No handlers could be found for logger "kazoo.recipe.watchers"

LOST

以上程序運行了基本kazoo接口命令,包括建立刪除加watcher等操做,經過調試並對比zookeeper服務節點znode目錄結構的變化,就能夠理解具體的操做結果。

 

3.運行經過kazoo實現的分佈式鎖程序kazoo_lock.py

import logging, os, time

from kazoo.client import KazooClient

from kazoo.client import KazooState

from kazoo.recipe.lock import Lock

 

class ZooKeeperLock():

    def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):

        self.hosts = hosts

        self.id_str = id_str

        self.zk_client = None

        self.timeout = timeout

        self.logger = logger

        self.name = lock_name

        self.lock_handle = None

        self.create_lock()

    def create_lock(self):

        try:

            self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)

            self.zk_client.start(timeout=self.timeout)

        except Exception, ex:

            self.init_ret = False

            self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return

        try:

            lock_path = os.path.join("/", "locks", self.name)

            self.lock_handle = Lock(self.zk_client, lock_path)

        except Exception, ex:

            self.init_ret = False

            self.err_str = "Create lock failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return

    def destroy_lock(self):

        #self.release()

        if self.zk_client != None:

            self.zk_client.stop()

            self.zk_client = None

    def acquire(self, blocking=True, timeout=None):

        if self.lock_handle == None:

            return None

        try:

            return self.lock_handle.acquire(blocking=blocking, timeout=timeout)

        except Exception, ex:

            self.err_str = "Acquire lock failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return None

    def release(self):

        if self.lock_handle == None:

            return None

        return self.lock_handle.release()

    def __del__(self):

        self.destroy_lock()

 

def main():

    logger = logging.getLogger()

    logger.setLevel(logging.INFO)

    sh = logging.StreamHandler()

    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')

    sh.setFormatter(formatter)

    logger.addHandler(sh)

    zookeeper_hosts = "127.0.0.1:2182"

    lock_name = "test"

    lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)

    ret = lock.acquire()

    if not ret:

        logging.info("Can't get lock! Ret: %s", ret)

        return

    logging.info("Get lock! Do something! Sleep 10 secs!")

    for i in range(1, 11):

        time.sleep(1)

        print str(i)

    lock.release()

 

if __name__ == "__main__":

    try:

        main()

    except Exception, ex:

        print "Ocurred Exception: %s" % str(ex)

        quit()

將該測試文件copy到多個服務器,同時運行,就能夠看到分佈式鎖的效果了。

 

參考連接:

http://kazoo.readthedocs.org/en/latest/basic_usage.html

http://yunjianfei.iteye.com/blog/2164888

相關文章
相關標籤/搜索