python分佈式事務方案(二)基於消息最終一致性

python分佈式事務方案(二)基於消息最終一致性

上一章採用的是tcc方案,可是在進行批量操做時,好比說幾百臺主機一塊兒分配策略時,會執行很長時間,這時體驗比較差。 因爲zabbix隱藏域後臺,而這個慢主要是集中在調用zabbix接口,這裏咱們就基於消息最終一致性來進行優化 消息一致性方案是經過消息中間件保證上、下游應用數據操做的一致性。基本思路是將本地操做和發送消息放在一個事務中,保證本地操做和消息發送要麼二者都成功或者都失敗。下游應用向消息系統訂閱該消息,收到消息後執行相應操做。python

本地消息表是一種簡化版的方案,將數據庫中的表來做爲消息中間件。 本地消息表這種實現方式應該是業界使用最多的,其核心思想是將分佈式事務拆分紅本地事務進行處理,這種思路是來源於ebay。咱們能夠從下面的流程圖中看出其中的一些細節: mysql

基本思路就是:sql

消息生產方,須要額外建一個消息表,並記錄消息發送狀態。消息表和業務數據要在一個事務裏提交,也就是說他們要在一個數據庫裏面。而後消息會通過MQ發送到消息的消費方。若是消息發送失敗,會進行重試發送。數據庫

消息消費方,須要處理這個消息,並完成本身的業務邏輯。此時若是本地事務處理成功,代表已經處理成功了,若是處理失敗,那麼就會重試執行。若是是業務上面的失敗,能夠給生產方發送一個業務補償消息,通知生產方進行回滾等操做。json

生產方和消費方定時掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發送一遍。若是有靠譜的自動對帳補帳邏輯,這種方案仍是很是實用的。api

這種方案遵循BASE理論,採用的是最終一致性,筆者認爲是這幾種方案裏面比較適合實際業務場景的,即不會出現像2PC那樣複雜的實現(當調用鏈很長的時候,2PC的可用性是很是低的),也不會像TCC那樣可能出現確認或者回滾不了的狀況。框架

  • 優勢: 一種很是經典的實現,避免了分佈式事務,實現了最終一致性。在 .NET中 有現成的解決方案。async

  • 缺點: 消息表會耦合到業務系統中,若是沒有封裝好的解決方案,會有不少雜活須要處理。分佈式

下面是實現步驟:優化

一、先建立本地消息表

MESSAGE_STATUS={
    'active':'active',
    'fail':'fail',
    'success':'success'
}

class Message(models.Model):
    topic = models.CharField(max_length=50, blank=True)
    event_module = models.CharField(max_length=50, blank=True,null=True)
    event_fun= models.CharField(max_length=30, blank=True,null=True)
    params=models.TextField(null=True)
    remark=models.CharField(max_length=300, blank=True,null=True)
    status = models.CharField(max_length=20, blank=True)
    exec_count=models.SmallIntegerField(null=True)
    error_msg = models.TextField(null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    index_together = ('status','exec_count') #聯合索引

    def __unicode__(self):
        return '%s' % self.remark

    def __str__(self):
        return '%s' % self.remark

二、定義生產者api 這裏提早定義了mysql和rabbix兩種消息存儲方式

from models import Message
from serializers import MessageSerializer

def event_add(message):
    MysqlQueue().add(message)

class MessageQueue():
    def __init__(self):
        pass

    def add(self,message):
        pass


class MysqlQueue(MessageQueue):

    def add(self,message):
        message["status"]="active"
        message["exec_count"]=0
        serializer=MessageSerializer(data=message)
        serializer.is_valid(raise_exception=True)
        serializer.save()

class RabbitQueue(MessageQueue):

    def add(self,message):
        pass

三、定義消費者api,這裏使用定時任務框架celery

import json
from models import Message
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


class MessageConsumer:
    def receive(self, topic=None):
        queryset = Message.objects.filter(exec_count__lt=5).exclude(status='success')
        if topic:
            queryset = queryset.filter(topic=topic)
        messages = queryset.order_by('id').all()
        for message in messages:
            try:
                m = __import__(message.event_module, fromlist=True)
                if hasattr(m, message.event_fun):
                    target_func = getattr(m, message.event_fun)
                    logger.info(message.params)
                    target_func(json.loads(message.params))
                    message.status='success'
                    message.exec_count=message.exec_count + 1
                    message.save()
                else:
                    logger.error("can not find function " + message.event_fun)
                    message.status='fail'
                    message.exec_count=message.exec_count + 1
                    message.error_msg="can not find function" + message.event_fun
                    message.save()
            except Exception ,e:
                logger.error("exec message fail,id:" + str(message.id)+",cause by "+e.message)
                logger.exception(e)
                message.status='fail'
                message.exec_count=message.exec_count + 1
                message.error_msg=e.message
                message.save()

四、定義定時任務,這裏若是已經有一個定時任務在跑,則直接跳過

exec_flag=False
@shared_task(ignore_result=True)
def reveive_event_message():
    global exec_flag
    if exec_flag:
        logger.warning("exists a tast exec reveive_event_message")
        return
    exec_flag=True
    logger.info("reveive_event_message start")
    MessageConsumer().receive()
    logger.info("reveive_event_message end")
    exec_flag=False

五、下面定義業務調用

def add_message(event_fun,params,remark):
    event_message = dict()
    event_message["topic"] = "topci"
    event_message["event_module"] = "callback path"
    event_message["event_fun"] =event_fun
    event_message["params"] = json.dumps(params)
    event_message["remark"] =remark
    logger.debug(event_message)
    event_add(event_message)

def create(self, request, *args, **kwargs):
    '''
    policy add
    '''
    assets = request.data["data"]
    try:
        with transaction.atomic():
            #save policy
            for  ;;
               #發送消息
               add_message("async_update_zabbix_trigger",params,"update trigger ")
    except rest_framework_serializers.ValidationError, e:
        logger.exception(e)
        raise

六、定義回調方法,這裏因爲是使用python能夠直接傳方法名,就能夠進行回調 好比說建立定時器

def async_create_zabbix_trigger(params):
    client = ZabbixClientProxy()
    host_id = get_zabbix_host_by_uuid(uuid)
    zabbix_items = get_zabbix_items(host_id)
    if zabbix_items is None or len(zabbix_items) == 0:
        return
    condition = alert_models.ConditionItem.objects.get(id=params["condition_id"])
    condition.alert_duration = params["alert_duration"]
    condition.item_threshold = params["item_threshold"]
    triggers = create_zabbix_trigger(client, asset, zabbix_items, condition, uuid)
    serializer = policy_serializers.ConditionTriggerSerializer(data=triggers, many=True)
    serializer.is_valid(raise_exception=True)
    serializer.save()

這裏能夠配置一個最大重試次數,若是超過就不會進行重試,這時就會發送郵件通知管理員進行手工重試,來達到最終一致性

相關文章
相關標籤/搜索