因業務須要寫了個python的裝飾器與redis同時使用,用於處理高併發操做,防止記錄重複操做python
裝飾器部分代碼redis
import hashlib from functools import wraps from api.api_state import APIState from api.base_api import APIResponse from utils.redis import acquire_lock_with_timeout, release_lock def lock_operation(option_name: str, lock_key: str, option_type="GET", paramer_type="request"): """ 操做加鎖防止同一記錄暴擊 @param option_name: 操做名稱 @param paramer_type: 取參類型 : request @param option_type: 請求類型 :GET,POST @param lock_key: 取參關鍵字 @return: """ def wrapper(func): def redislock(request, *args, **kwargs): if paramer_type == "request": if option_type == "GET": key_paramer = request.request.query_params.get(lock_key, None) else: key_paramer = request.request.data.get(lock_key, None) else: key_paramer = kwargs.get(lock_key, None) lock_name = hashlib.md5(f'{option_name}{key_paramer}'.encode()).hexdigest() identifier = acquire_lock_with_timeout(lock_name) if not identifier: return APIResponse(status=APIState.PARAMTER_ERROR.value, msg='操做太快了,請稍後再試') try: return func(request, *args, **kwargs) finally: release_lock(lock_name, identifier) return redislock return wrapper
reids 處理部分express
import json import logging import math import time import uuid from decimal import Decimal import redis from django.utils.timezone import now from django_redis import get_redis_connection from users.models import TCustomerinfo logger = logging.getLogger(__name__) conn = get_redis_connection("default") class DecimalEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, Decimal): return float(obj) return json.JSONEncoder.default(self, obj) def get_redis_value_by_key(key): return conn.get(key) def set_redis_value(key, value, expires): conn.set(key, value, expires) def set_cache_data(key, data, TOKEN_EXPIRED_TIME): cache_data = json.dumps(data, cls=DecimalEncoder) set_redis_value(key, cache_data, TOKEN_EXPIRED_TIME) return cache_data def acquire_lock_with_timeout(lockname, acquire_timeout=3, lock_timeout=5): logger.info(f'acquire_lock: {lockname}') # 128位隨機標識符。 identifier = str(uuid.uuid4()) lockname = 'lock:' + lockname # 確保傳給EXPIRE的都是整數。 lock_timeout = int(math.ceil(lock_timeout)) end = time.time() + acquire_timeout while time.time() < end: # 獲取鎖並設置過時時間。 if conn.setnx(lockname, identifier): conn.expire(lockname, lock_timeout) return identifier # 檢查過時時間,並在有須要時對其進行更新。 elif not conn.ttl(lockname): conn.expire(lockname, lock_timeout) time.sleep(.001) logger.info(f'can not lock: {lockname}') return False def release_lock(lockname, identifier): logger.info(f'release_lock: {lockname}') pipe = conn.pipeline(True) lockname = 'lock:' + lockname while True: try: # 檢查並確認進程還持有着鎖。 pipe.watch(lockname) #取得鎖值 lockname_value = pipe.get(lockname) if lockname_value: lockname_value = lockname_value.decode() if lockname_value == identifier: # 釋放鎖。 pipe.multi() pipe.delete(lockname) pipe.execute() logger.info("釋放結束") return True pipe.unwatch() break # 有其餘客戶端修改了鎖;重試。 except redis.exceptions.WatchError: pass # 進程已經失去了鎖。 return False
使用方式:django
class OrderLogisticsDetailsView(APIView): """ get express_id: express_no: """ @lock_operation(option_name="WL", paramer_type="kwargs", option_type="GET", lock_key="express_no") def get(self, request, *args, **kwargs): express_no = kwargs.get("express_no", None) express_id = kwargs.get("express_id", None) if (not express_id is None) and (not express_no is None): Logisticsinfo_ins = TLogisticsinfo.objects.filter(ls_id=express_id).first() ret = query_logistics_information(Logisticsinfo_ins, express_no) if ret: return ret return APIResponse(status=APIState.PARAMTER_ERROR.value)