Python Redis pipeline操做(秒殺實現)

設想這樣的一個場景,你要批量的執行一系列redis命令,例如執行100次get key,這時你要向redis請求100次+獲取響應100次。若是能一次性將100個請求提交給redis server,執行完成以後批量的獲取相應,只須要向redis請求1次,而後批量執行完命令,一次性結果,性能是否是會好不少呢?

答案是確定的,節約的時間是客戶端client和服務器redis server之間往返網絡延遲的時間。這個時間能夠用ping命令查看。html

網絡延遲高:批量執行,性能提高明顯java

網絡延遲低(本機):批量執行,性能提高不明顯python

某些客戶端(java和python)提供了一種叫作pipeline的編程模式用來解決批量提交請求的方式。redis

這裏咱們用python客戶端來舉例說明一下。編程

 

一、pipeline服務器

網絡延遲網絡

client與server機器之間網絡延遲以下,大約是30ms。函數

 

測試用例post

分別執行其中的try_pipeline和without_pipeline統計處理時間。 性能

複製代碼
# -*- coding:utf-8 -*-

import redis
import time
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='10.93.84.53', port=6379, password='bigdata123')


def try_pipeline():
    start = time.time()
    with r.pipeline(transaction=False) as p:
        p.sadd('seta', 1).sadd('seta', 2).srem('seta', 2).lpush('lista', 1).lrange('lista', 0, -1)
        p.execute()
    print time.time() - start


def without_pipeline():
    start = time.time()
    r.sadd('seta', 1)
    r.sadd('seta', 2)
    r.srem('seta', 2)
    r.lpush('lista', 1)
    r.lrange('lista', 0, -1)
    print time.time() - start


def worker():
    while True:
        try_pipeline()

with ProcessPoolExecutor(max_workers=12) as pool:
    for _ in range(10):
        pool.submit(worker)
複製代碼

結果分析

try_pipeline平均處理時間:0.04659

without_pipeline平均處理時間:0.16672

咱們的批量裏有5個操做,在處理時間維度上性能提高了4倍!

網絡延遲大約是30ms,不使用批量的狀況下,網絡上的時間損耗就有0.15s(30ms*5)以上。而pipeline批量操做只進行一次網絡往返,因此延遲只有0.03s。能夠看到節省的時間基本都是網路延遲。

 

二、pipeline與transation

pipeline不單單用來批量的提交命令,還用來實現事務transation。

這裏對redis事務的討論不會太多,只是給出一個demo。詳細的描述你能夠參見這篇博客。redis事務

細心的你可能發現了,使用transaction與否不一樣之處在與建立pipeline實例的時候,transaction是否打開,默認是打開的。

複製代碼
# -*- coding:utf-8 -*-

import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='127.0.0.1', port=6379)


# 減庫存函數, 循環直到減庫存完成
# 庫存充足, 減庫存成功, 返回True
# 庫存不足, 減庫存失敗, 返回False
def decr_stock():

    # python中redis事務是經過pipeline的封裝實現的
    with r.pipeline() as pipe:
        while True:
            try:
                # watch庫存鍵, multi後若是該key被其餘客戶端改變, 事務操做會拋出WatchError異常
                pipe.watch('stock:count')
                count = int(pipe.get('stock:count'))
                if count > 0:  # 有庫存
                    # 事務開始
                    pipe.multi()
                    pipe.decr('stock:count')
                    # 把命令推送過去
                    # execute返回命令執行結果列表, 這裏只有一個decr返回當前值
                    print pipe.execute()[0]
                    return True
                else:
                    return False
            except WatchError, ex:
                # 打印WatchError異常, 觀察被watch鎖住的狀況
                print ex
                pipe.unwatch()


def worker():
    while True:
        # 沒有庫存就退出
        if not decr_stock():
            break


# 實驗開始
# 設置庫存爲100
r.set("stock:count", 100)

# 多進程模擬多個客戶端提交
with ProcessPoolExecutor(max_workers=2) as pool:
    for _ in range(10):
        pool.submit(worker)
複製代碼
相關文章
相關標籤/搜索