python 帶參裝飾器 +reids ,處理高頻操做

因業務須要寫了個python的裝飾器與redis同時使用,用於處理高併發操做,防止記錄重複操做python

裝飾器部分代碼redis

import hashlib
from functools import wraps
from api.api_state import APIState
from api.base_api import APIResponse
from utils.redis import acquire_lock_with_timeout, release_lock


def lock_operation(option_name: str, lock_key: str, option_type="GET", paramer_type="request"):
    """
    操做加鎖防止同一記錄暴擊
    @param option_name: 操做名稱
    @param paramer_type: 取參類型 : request
    @param option_type: 請求類型 :GET,POST
    @param lock_key: 取參關鍵字
    @return:
    """

    def wrapper(func):
        def redislock(request, *args, **kwargs):
            if paramer_type == "request":
                if option_type == "GET":
                    key_paramer = request.request.query_params.get(lock_key, None)
                else:
                    key_paramer = request.request.data.get(lock_key, None)
            else:
                key_paramer = kwargs.get(lock_key, None)

            lock_name = hashlib.md5(f'{option_name}{key_paramer}'.encode()).hexdigest()
            identifier = acquire_lock_with_timeout(lock_name)
            if not identifier:
                return APIResponse(status=APIState.PARAMTER_ERROR.value, msg='操做太快了,請稍後再試')
            try:
                return func(request, *args, **kwargs)
            finally:
                release_lock(lock_name, identifier)

        return redislock

    return wrapper

reids 處理部分express

import json
import logging
import math
import time
import uuid
from decimal import Decimal

import redis
from django.utils.timezone import now
from django_redis import get_redis_connection

from users.models import TCustomerinfo

logger = logging.getLogger(__name__)

conn = get_redis_connection("default")

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return json.JSONEncoder.default(self, obj)


def get_redis_value_by_key(key):
    return conn.get(key)


def set_redis_value(key, value, expires):
    conn.set(key, value, expires)


def set_cache_data(key, data, TOKEN_EXPIRED_TIME):
    cache_data = json.dumps(data, cls=DecimalEncoder)
    set_redis_value(key, cache_data, TOKEN_EXPIRED_TIME)
    return cache_data


def acquire_lock_with_timeout(lockname, acquire_timeout=3, lock_timeout=5):
    logger.info(f'acquire_lock: {lockname}')
    # 128位隨機標識符。
    identifier = str(uuid.uuid4())
    lockname = 'lock:' + lockname
    # 確保傳給EXPIRE的都是整數。
    lock_timeout = int(math.ceil(lock_timeout))

    end = time.time() + acquire_timeout
    while time.time() < end:
        # 獲取鎖並設置過時時間。
        if conn.setnx(lockname, identifier):
            conn.expire(lockname, lock_timeout)
            return identifier
        # 檢查過時時間,並在有須要時對其進行更新。
        elif not conn.ttl(lockname):
            conn.expire(lockname, lock_timeout)

        time.sleep(.001)

    logger.info(f'can not lock: {lockname}')
    return False


def release_lock(lockname, identifier):
    logger.info(f'release_lock: {lockname}')
    pipe = conn.pipeline(True)
    lockname = 'lock:' + lockname

    while True:
        try:
            # 檢查並確認進程還持有着鎖。
            pipe.watch(lockname)
            #取得鎖值
            lockname_value = pipe.get(lockname)
            if lockname_value:
                lockname_value = lockname_value.decode()

            if lockname_value == identifier:

                # 釋放鎖。
                pipe.multi()
                pipe.delete(lockname)
                pipe.execute()
                logger.info("釋放結束")
                return True

            pipe.unwatch()
            break

        # 有其餘客戶端修改了鎖;重試。
        except redis.exceptions.WatchError:
            pass

    # 進程已經失去了鎖。
    return False

使用方式:django

class OrderLogisticsDetailsView(APIView):
    """
    get
        express_id:
        express_no:
    """

    @lock_operation(option_name="WL", paramer_type="kwargs", option_type="GET", lock_key="express_no")
    def get(self, request, *args, **kwargs):
        express_no = kwargs.get("express_no", None)
        express_id = kwargs.get("express_id", None)
        if (not express_id is None) and (not express_no is None):
            Logisticsinfo_ins = TLogisticsinfo.objects.filter(ls_id=express_id).first()
            ret = query_logistics_information(Logisticsinfo_ins, express_no)

            if ret:
                return ret

        return APIResponse(status=APIState.PARAMTER_ERROR.value)

參考連接:
https://mp.weixin.qq.com/s/nPNowhyNPQah-eQBbsj29Qjson

相關文章
相關標籤/搜索