上一章採用的是tcc方案,可是在進行批量操做時,好比說幾百臺主機一塊兒分配策略時,會執行很長時間,這時體驗比較差。 因爲zabbix隱藏域後臺,而這個慢主要是集中在調用zabbix接口,這裏咱們就基於消息最終一致性來進行優化 消息一致性方案是經過消息中間件保證上、下游應用數據操做的一致性。基本思路是將本地操做和發送消息放在一個事務中,保證本地操做和消息發送要麼二者都成功或者都失敗。下游應用向消息系統訂閱該消息,收到消息後執行相應操做。python
本地消息表是一種簡化版的方案,將數據庫中的表來做爲消息中間件。 本地消息表這種實現方式應該是業界使用最多的,其核心思想是將分佈式事務拆分紅本地事務進行處理,這種思路是來源於ebay。咱們能夠從下面的流程圖中看出其中的一些細節: mysql
基本思路就是:sql
消息生產方,須要額外建一個消息表,並記錄消息發送狀態。消息表和業務數據要在一個事務裏提交,也就是說他們要在一個數據庫裏面。而後消息會通過MQ發送到消息的消費方。若是消息發送失敗,會進行重試發送。數據庫
消息消費方,須要處理這個消息,並完成本身的業務邏輯。此時若是本地事務處理成功,代表已經處理成功了,若是處理失敗,那麼就會重試執行。若是是業務上面的失敗,能夠給生產方發送一個業務補償消息,通知生產方進行回滾等操做。json
生產方和消費方定時掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發送一遍。若是有靠譜的自動對帳補帳邏輯,這種方案仍是很是實用的。api
這種方案遵循BASE理論,採用的是最終一致性,筆者認爲是這幾種方案裏面比較適合實際業務場景的,即不會出現像2PC那樣複雜的實現(當調用鏈很長的時候,2PC的可用性是很是低的),也不會像TCC那樣可能出現確認或者回滾不了的狀況。框架
優勢: 一種很是經典的實現,避免了分佈式事務,實現了最終一致性。在 .NET中 有現成的解決方案。async
缺點: 消息表會耦合到業務系統中,若是沒有封裝好的解決方案,會有不少雜活須要處理。分佈式
下面是實現步驟:優化
一、先建立本地消息表
MESSAGE_STATUS={ 'active':'active', 'fail':'fail', 'success':'success' } class Message(models.Model): topic = models.CharField(max_length=50, blank=True) event_module = models.CharField(max_length=50, blank=True,null=True) event_fun= models.CharField(max_length=30, blank=True,null=True) params=models.TextField(null=True) remark=models.CharField(max_length=300, blank=True,null=True) status = models.CharField(max_length=20, blank=True) exec_count=models.SmallIntegerField(null=True) error_msg = models.TextField(null=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) index_together = ('status','exec_count') #聯合索引 def __unicode__(self): return '%s' % self.remark def __str__(self): return '%s' % self.remark
二、定義生產者api 這裏提早定義了mysql和rabbix兩種消息存儲方式
from models import Message from serializers import MessageSerializer def event_add(message): MysqlQueue().add(message) class MessageQueue(): def __init__(self): pass def add(self,message): pass class MysqlQueue(MessageQueue): def add(self,message): message["status"]="active" message["exec_count"]=0 serializer=MessageSerializer(data=message) serializer.is_valid(raise_exception=True) serializer.save() class RabbitQueue(MessageQueue): def add(self,message): pass
三、定義消費者api,這裏使用定時任務框架celery
import json from models import Message from celery.utils.log import get_task_logger logger = get_task_logger(__name__) class MessageConsumer: def receive(self, topic=None): queryset = Message.objects.filter(exec_count__lt=5).exclude(status='success') if topic: queryset = queryset.filter(topic=topic) messages = queryset.order_by('id').all() for message in messages: try: m = __import__(message.event_module, fromlist=True) if hasattr(m, message.event_fun): target_func = getattr(m, message.event_fun) logger.info(message.params) target_func(json.loads(message.params)) message.status='success' message.exec_count=message.exec_count + 1 message.save() else: logger.error("can not find function " + message.event_fun) message.status='fail' message.exec_count=message.exec_count + 1 message.error_msg="can not find function" + message.event_fun message.save() except Exception ,e: logger.error("exec message fail,id:" + str(message.id)+",cause by "+e.message) logger.exception(e) message.status='fail' message.exec_count=message.exec_count + 1 message.error_msg=e.message message.save()
四、定義定時任務,這裏若是已經有一個定時任務在跑,則直接跳過
exec_flag=False @shared_task(ignore_result=True) def reveive_event_message(): global exec_flag if exec_flag: logger.warning("exists a tast exec reveive_event_message") return exec_flag=True logger.info("reveive_event_message start") MessageConsumer().receive() logger.info("reveive_event_message end") exec_flag=False
五、下面定義業務調用
def add_message(event_fun,params,remark): event_message = dict() event_message["topic"] = "topci" event_message["event_module"] = "callback path" event_message["event_fun"] =event_fun event_message["params"] = json.dumps(params) event_message["remark"] =remark logger.debug(event_message) event_add(event_message) def create(self, request, *args, **kwargs): ''' policy add ''' assets = request.data["data"] try: with transaction.atomic(): #save policy for ;; #發送消息 add_message("async_update_zabbix_trigger",params,"update trigger ") except rest_framework_serializers.ValidationError, e: logger.exception(e) raise
六、定義回調方法,這裏因爲是使用python能夠直接傳方法名,就能夠進行回調 好比說建立定時器
def async_create_zabbix_trigger(params): client = ZabbixClientProxy() host_id = get_zabbix_host_by_uuid(uuid) zabbix_items = get_zabbix_items(host_id) if zabbix_items is None or len(zabbix_items) == 0: return condition = alert_models.ConditionItem.objects.get(id=params["condition_id"]) condition.alert_duration = params["alert_duration"] condition.item_threshold = params["item_threshold"] triggers = create_zabbix_trigger(client, asset, zabbix_items, condition, uuid) serializer = policy_serializers.ConditionTriggerSerializer(data=triggers, many=True) serializer.is_valid(raise_exception=True) serializer.save()
這裏能夠配置一個最大重試次數,若是超過就不會進行重試,這時就會發送郵件通知管理員進行手工重試,來達到最終一致性