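隨着單體應用的拆分以及服務化的流行,如今分佈式事務已經比較常見。分佈式事務的相關理論(ACID、CAP、BASE 等)我就不展開了,下面直接說一下一種常見的解決方案——TCC。TCC 採用的其實就是補償機制,其核心思想是:針對每一個操作,都要註冊一個與其對應的確認和補償(撤銷)操作。它分爲三個階段:
1. Try 階段:對業務系統做檢測及資源預留;
2. Confirm 階段:確認執行業務操作,提交 Try 階段預留的資源;
3. Cancel 階段:在業務執行出錯、需要回滾時,執行補償(撤銷)操作,釋放預留的資源。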
優點:跟兩階段提交(2PC)比起來,實現以及流程相對簡單一些,但數據的一致性也要比 2PC 差一些。
缺點:缺點還是比較明顯的,在第 2、3 步中都有可能失敗。TCC 屬於應用層的一種補償方式,所以需要程序員在實現的時候多寫很多補償代碼;在一些場景中,一些業務流程可能用 TCC 不太好定義及處理。
下面介紹一下我們應用的一種場景:有一個運維系統需要用到 zabbix,而運維系統又拆分出了一個配置中心。下面是子系統依賴圖(圖略):
在配置告警策略時,需要調用 zabbix 接口。
這時就涉及到一個分佈式事務。由於我們這裏只涉及兩個事務(本地數據庫事務和 zabbix 接口調用),所以我就寫了一個 zabbix 代理 client,來作爲事務協調器:
class ZabbixClientProxy(object):
    '''
    Simple zabbix client proxy that acts as the coordinator for zabbix
    changes made while a local DB transaction is open.

    Every successful trigger create/update/delete and every macro update is
    recorded, so that ``rollback()`` can apply the compensating zabbix
    operation if the local transaction fails.
    '''
    # Shared zabbix client; created once, when this class is defined.
    client = models.get_zbx_client()

    def __init__(self):
        # Compensation logs, consumed and cleared by rollback().
        self.create_triggers = list()
        self.update_triggers = list()
        self.delete_triggers = list()
        self.update_macros = list()

    def trigger_create(self, name, expression, uuid):
        '''
        Create a trigger (priority 1) and record it for rollback.

        :param name: trigger name
        :param expression: trigger expression
        :param uuid: host uuid, stored on the recorded trigger
        :return: the trigger returned by the zabbix client, with ``uuid`` added
        :raises Exception: whatever the zabbix client raises (logged first)
        '''
        try:
            trigger = self.client.hosts.trigger_create(name, expression, 1)
        except Exception as e:
            logger.error("trigger_create fail,cause by %s", e)
            raise
        trigger["uuid"] = uuid
        self.create_triggers.append(trigger)
        logger.debug("trigger_create %s", name)
        return trigger

    def trigger_update(self, triggerid, name, expression, uuid):
        '''
        Update a trigger and record its previous state for rollback.

        :param triggerid: id of the trigger to update
        :param name: new trigger name
        :param expression: new trigger expression
        :param uuid: host uuid; rollback replaces ``uuid + ']'`` in the old
            expression with ``'{HOST.HOST}]'`` before restoring it
        :return: the zabbix client's update result
        :raises Exception: whatever the zabbix client raises (logged first).
            Re-raised, like trigger_create, so the caller can roll back
            instead of committing the local transaction on a failed update.
        '''
        logger.debug("trigger_update %s", name)
        try:
            old_trigger = self.client.hosts.trigger_get(triggerid)
            update_result = self.client.hosts.trigger_update(
                triggerid, name=name, expression=expression,
                priority=1, enable=True)
        except Exception as e:
            logger.error("trigger_update fail,cause by %s", e)
            raise
        old_trigger["uuid"] = uuid
        logger.debug(old_trigger)
        self.update_triggers.append(old_trigger)
        return update_result

    def trigger_delete(self, triggerid, uuid):
        '''
        Delete a trigger and record its previous state so rollback can re-create it.

        :param triggerid: id of the trigger to delete
        :param uuid: host uuid, used by rollback to rewrite the expression
        :return: the zabbix client's delete result
        :raises Exception: whatever the zabbix client raises (logged first).
            Re-raised, like trigger_create, so the caller can roll back.
        '''
        # %s formatting: triggerid may not be a str.
        logger.debug("trigger_delete %s", triggerid)
        try:
            old_trigger = self.client.hosts.trigger_get(triggerid)
            delete_result = self.client.hosts.trigger_delete(triggerid)
        except Exception as e:
            logger.error("trigger_delete fail,cause by %s", e)
            raise
        old_trigger["uuid"] = uuid
        self.delete_triggers.append(old_trigger)
        return delete_result

    def update_trigger_macro(self, uuid, item_threshold, alert_duration):
        '''
        Set and enable the DISK_USER_MAX / DISK_USER_TIMES / DISK_USER_ENABLE
        host macros for the host identified by ``uuid``.

        Existing macros are updated in place and recorded for rollback.
        Otherwise the three macros are created, which is NOT recorded, so
        rollback() cannot undo it.

        :param uuid: host uuid
        :param item_threshold: value for DISK_USER_MAX
        :param alert_duration: minutes; stored as "<n>m" in DISK_USER_TIMES
        '''
        all_hmacros = self.get_macro_by_name(uuid)
        duration = str(alert_duration) + "m"
        # NOTE(review): "macros exist" is inferred from the count (> 2),
        # not by checking the macro names.
        if all_hmacros and len(all_hmacros) > 2:
            self.update_macro(all_hmacros, "DISK_USER_MAX", item_threshold)
            self.update_macro(all_hmacros, "DISK_USER_TIMES", duration)
            self.update_macro(all_hmacros, "DISK_USER_ENABLE", 1)
        else:
            self.create_macro("DISK_USER_MAX", item_threshold, uuid)
            self.create_macro("DISK_USER_TIMES", duration, uuid)
            self.create_macro("DISK_USER_ENABLE", 1, uuid)

    def stop_trigger(self, assets):
        '''
        Disable the disk-usage alert (DISK_USER_ENABLE = 0) on each asset's host.

        Assets without a host are skipped. Hosts whose macros are missing get
        them created with defaults (80, "5m") in the disabled state.

        :param assets: iterable of assets (may be empty or None)
        '''
        for asset in assets or ():
            if asset.host is None:
                continue
            host_uuid = asset.host.uuid
            all_hmacros = self.get_macro_by_name(host_uuid)
            if all_hmacros and len(all_hmacros) > 2:
                self.update_macro(all_hmacros, "DISK_USER_ENABLE", 0)
            else:
                self.create_macro("DISK_USER_MAX", 80, host_uuid)
                self.create_macro("DISK_USER_TIMES", "5m", host_uuid)
                self.create_macro("DISK_USER_ENABLE", 0, host_uuid)

    def get_macro_by_name(self, uuid):
        '''Return the host macros listed by the zabbix client for ``uuid``.'''
        return self.client.macros.list(uuid)

    def update_macro(self, all_hmacros, macro_name, value):
        '''
        Update every macro in ``all_hmacros`` whose 'macro' is ``{$macro_name}``.

        Best effort: failures are logged, not raised. Each successfully
        updated macro dict gets a 'name' key and is recorded for rollback.
        Its 'value' still holds the pre-update value, which rollback restores.
        '''
        target = '{$' + macro_name + '}'
        for macro in all_hmacros:
            if macro['macro'] != target:
                continue
            try:
                self.client.macros.update(macro['hostmacroid'],
                                          macro=macro_name, value=value)
            except Exception as e:
                logger.error('update_macro %s fail,case by %s', macro_name, e)
                continue
            macro['name'] = macro_name
            self.update_macros.append(macro)
            logger.debug('update_macro %s to %s', macro_name, value)

    def create_macro(self, macro_name, value, uuid):
        '''
        Create a host macro on the host identified by ``uuid``.

        Best effort: failures are logged, not raised.
        NOTE(review): created macros are not recorded, so rollback() cannot
        remove them.
        '''
        try:
            hostid = self.client.macros._get_hostid(uuid)
            self.client.macros.create(macro_name, value, hostid)
        except Exception as e:
            logger.error("create_macro fail,cause by %s", e)
            return
        logger.debug("create_macro success,macro_name:%s,value:%s",
                     macro_name, value)

    def trigger_get(self, triggerid):
        '''Return the trigger with id ``triggerid``.'''
        return self.client.hosts.trigger_get(triggerid)

    def trigger_list(self, hostid):
        '''Return the triggers of host ``hostid``.'''
        return self.client.hosts.trigger_list(hostid)

    def item_list(self, uuid):
        '''Return the items of the host identified by ``uuid``.'''
        return self.client.hosts.item_list(uuid)

    def rollback(self):
        '''
        Compensate every recorded zabbix change (best effort).

        Each compensation is attempted independently. A failure is logged and
        does not stop the remaining ones. All logs are cleared afterwards, so
        a second call does nothing.
        '''
        logger.debug("start rollback")
        self._rollback_created_triggers()
        self._rollback_updated_triggers()
        self._rollback_deleted_triggers()
        self._rollback_updated_macros()
        logger.debug("end rollback")

    @staticmethod
    def _placeholder_expression(trigger):
        # Replace the host uuid in a recorded expression with the
        # {HOST.HOST} placeholder before writing it back.
        return trigger["expression"].replace(trigger['uuid'] + ']',
                                             '{HOST.HOST}]')

    def _rollback_created_triggers(self):
        # Compensate trigger_create: delete each created trigger.
        for trigger in self.create_triggers:
            try:
                self.client.hosts.trigger_delete(trigger["triggerid"])
                logger.debug('rollback_create_trigger %s', trigger.get("name"))
            except Exception as e:
                # .get/%s so logging can never raise and abort the rollback.
                logger.error('rollback_create_trigger %s fail,case by %s',
                             trigger.get("triggerid"), e)
        self.create_triggers = []

    def _rollback_updated_triggers(self):
        # Compensate trigger_update: write the previous name/expression back.
        for trigger in self.update_triggers:
            try:
                self.client.hosts.trigger_update(
                    trigger["triggerid"], name=trigger["name"],
                    expression=self._placeholder_expression(trigger),
                    priority=1, enable=True)
                logger.debug('rollback_update_trigger %s', trigger.get("name"))
            except Exception as e:
                logger.error('rollback_update_trigger %s fail,case by %s',
                             trigger.get("triggerid"), e)
        self.update_triggers = []

    def _rollback_deleted_triggers(self):
        # Compensate trigger_delete: re-create the trigger, then point local
        # ConditionTrigger rows at the re-created trigger's new id.
        for trigger in self.delete_triggers:
            try:
                new_trigger = self.client.hosts.trigger_create(
                    trigger["name"], self._placeholder_expression(trigger), 1)
                logger.debug(new_trigger)
                logger.debug('rollback_delete_trigger %s', trigger.get("name"))
                alert_models.ConditionTrigger.objects.filter(
                    zabbix_trigger_id=trigger["triggerid"]).update(
                    zabbix_trigger_id=new_trigger["triggerid"])
            except Exception as e:
                logger.error('rollback_delete_trigger %s fail,case by %s',
                             trigger.get("triggerid"), e)
        self.delete_triggers = []

    def _rollback_updated_macros(self):
        # Compensate update_macro: restore each macro's pre-update value.
        for macro in self.update_macros:
            try:
                self.client.macros.update(macro['hostmacroid'],
                                          macro=macro['name'],
                                          value=macro['value'])
            except Exception as e:
                logger.error('rollback_update_macro %s fail,case by %s',
                             macro.get('name'), e)
        # Cleared like the trigger logs, so a repeated rollback is a no-op.
        self.update_macros = []
事務成功則提交本地事務,如果失敗則調用 rollback:
def create(self, request, *args, **kwargs):
    '''
    policy add

    Saves the policy inside a local DB transaction while applying the matching
    zabbix changes through a ZabbixClientProxy. If anything inside the block
    raises, the zabbix side is compensated via ``client.rollback()`` and the
    exception is re-raised. Presumably ``transaction.atomic()`` is Django's,
    which rolls back the local transaction when the block raises.
    '''
    assets = request.data["data"]
    client = ZabbixClientProxy()
    try:
        with transaction.atomic():
            # save policy
            # Pass `client` as the argument and use it to add/update/delete
            # hosts, items and triggers (body elided in this snippet).
            pass
    except Exception as e:
        # Compensate on ANY failure (validation, zabbix call, DB write).
        # Catching only ValidationError left zabbix changes behind whenever
        # a zabbix call or the local save failed.
        logger.exception(e)
        client.rollback()
        raise
這樣做還有一個問題:在回滾過程中,如果網絡突然斷了,回滾就會失敗。這裏我們記錄了日誌,後面會通過掃描日誌來做到最終一致性;此外我們後面還做了補償,下一次修改時會自動修正回滾失敗的問題。