zookeeper的開發接口之前主要以java和c爲主,隨着python項目愈來愈多的使用zookeeper做爲分佈式集羣實現,python的zookeeper接口也出現了不少,如今主流的純python的zookeeper接口是kazoo。所以如何使用kazoo開發基於python的分佈式程序是必須掌握的。html
1.安裝kazoojava
yum install python-pipnode
pip install kazoopython
安裝過程當中會出現一些python依賴包未安裝的狀況,安裝便可。服務器
2.運行kazoo基礎例子kazoo_basic.py分佈式
import time測試
from kazoo.client import KazooClientui
from kazoo.client import KazooStatespa
def main():調試
zk=KazooClient(hosts='127.0.0.1:2182')
zk.start()
@zk.add_listener
def my_listener(state):
if state == KazooState.LOST:
print("LOST")
elif state == KazooState.SUSPENDED:
print("SUSPENDED")
else:
print("Connected")
#Creating Nodes
# Ensure a path, create if necessary
zk.ensure_path("/my/favorite")
# Create a node with data
zk.create("/my/favorite/node", b"")
zk.create("/my/favorite/node/a", b"A")
#Reading Data
# Determine if a node exists
if zk.exists("/my/favorite"):
print("/my/favorite is existed")
@zk.ChildrenWatch("/my/favorite/node")
def watch_children(children):
print("Children are now: %s" % children)
# Above function called immediately, and from then on
@zk.DataWatch("/my/favorite/node")
def watch_node(data, stat):
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# Print the version of a node and its data
data, stat = zk.get("/my/favorite/node")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# List the children
children = zk.get_children("/my/favorite/node")
print("There are %s children with names %s" % (len(children), children))
#Updating Data
zk.set("/my/favorite", b"some data")
#Deleting Nodes
zk.delete("/my/favorite/node/a")
#Transactions
transaction = zk.transaction()
transaction.check('/my/favorite/node', version=-1)
transaction.create('/my/favorite/node/b', b"B")
results = transaction.commit()
print ("Transaction results is %s" % results)
zk.delete("/my/favorite/node/b")
zk.delete("/my", recursive=True)
time.sleep(2)
zk.stop()
if __name__ == "__main__":
try:
main()
except Exception, ex:
print "Ocurred Exception: %s" % str(ex)
quit()
運行結果:
Children are now: [u'a']
Version: 0, data:
Version: 0, data:
There are 1 children with names [u'a']
Children are now: []
Transaction results is [True, u'/my/favorite/node/b']
Children are now: [u'b']
Children are now: []
No handlers could be found for logger "kazoo.recipe.watchers"
LOST
以上程序運行了基本kazoo接口命令,包括建立刪除加watcher等操做,經過調試並對比zookeeper服務節點znode目錄結構的變化,就能夠理解具體的操做結果。
3.運行經過kazoo實現的分佈式鎖程序kazoo_lock.py
import logging, os, time
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.recipe.lock import Lock
class ZooKeeperLock():
def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):
self.hosts = hosts
self.id_str = id_str
self.zk_client = None
self.timeout = timeout
self.logger = logger
self.name = lock_name
self.lock_handle = None
self.create_lock()
def create_lock(self):
try:
self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)
self.zk_client.start(timeout=self.timeout)
except Exception, ex:
self.init_ret = False
self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)
logging.error(self.err_str)
return
try:
lock_path = os.path.join("/", "locks", self.name)
self.lock_handle = Lock(self.zk_client, lock_path)
except Exception, ex:
self.init_ret = False
self.err_str = "Create lock failed! Exception: %s" % str(ex)
logging.error(self.err_str)
return
def destroy_lock(self):
#self.release()
if self.zk_client != None:
self.zk_client.stop()
self.zk_client = None
def acquire(self, blocking=True, timeout=None):
if self.lock_handle == None:
return None
try:
return self.lock_handle.acquire(blocking=blocking, timeout=timeout)
except Exception, ex:
self.err_str = "Acquire lock failed! Exception: %s" % str(ex)
logging.error(self.err_str)
return None
def release(self):
if self.lock_handle == None:
return None
return self.lock_handle.release()
def __del__(self):
self.destroy_lock()
def main():
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sh = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
sh.setFormatter(formatter)
logger.addHandler(sh)
zookeeper_hosts = "127.0.0.1:2182"
lock_name = "test"
lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)
ret = lock.acquire()
if not ret:
logging.info("Can't get lock! Ret: %s", ret)
return
logging.info("Get lock! Do something! Sleep 10 secs!")
for i in range(1, 11):
time.sleep(1)
print str(i)
lock.release()
if __name__ == "__main__":
try:
main()
except Exception, ex:
print "Ocurred Exception: %s" % str(ex)
quit()
將該測試文件copy到多個服務器,同時運行,就能夠看到分佈式鎖的效果了。
參考連接:
http://kazoo.readthedocs.org/en/latest/basic_usage.html
http://yunjianfei.iteye.com/blog/2164888