答案是確定的,節約的時間是客戶端client和服務器redis server之間往返網絡延遲的時間。這個時間能夠用ping命令查看。html
網絡延遲高:批量執行,性能提高明顯java
網絡延遲低(本機):批量執行,性能提高不明顯python
某些客戶端(java和python)提供了一種叫作pipeline的編程模式用來解決批量提交請求的方式。redis
這裏咱們用python客戶端來舉例說明一下。編程
一、pipeline服務器
網絡延遲網絡
client與server機器之間網絡延遲以下,大約是30ms。函數
測試用例post
分別執行其中的try_pipeline和without_pipeline統計處理時間。 性能
# -*- coding:utf-8 -*- import redis import time from concurrent.futures import ProcessPoolExecutor r = redis.Redis(host='10.93.84.53', port=6379, password='bigdata123') def try_pipeline(): start = time.time() with r.pipeline(transaction=False) as p: p.sadd('seta', 1).sadd('seta', 2).srem('seta', 2).lpush('lista', 1).lrange('lista', 0, -1) p.execute() print time.time() - start def without_pipeline(): start = time.time() r.sadd('seta', 1) r.sadd('seta', 2) r.srem('seta', 2) r.lpush('lista', 1) r.lrange('lista', 0, -1) print time.time() - start def worker(): while True: try_pipeline() with ProcessPoolExecutor(max_workers=12) as pool: for _ in range(10): pool.submit(worker)
結果分析
try_pipeline平均處理時間:0.04659
without_pipeline平均處理時間:0.16672
咱們的批量裏有5個操做,在處理時間維度上性能提高了4倍!
網絡延遲大約是30ms,不使用批量的狀況下,網絡上的時間損耗就有0.15s(30ms*5)以上。而pipeline批量操做只進行一次網絡往返,因此延遲只有0.03s。能夠看到節省的時間基本都是網路延遲。
二、pipeline與transation
pipeline不單單用來批量的提交命令,還用來實現事務transation。
這裏對redis事務的討論不會太多,只是給出一個demo。詳細的描述你能夠參見這篇博客。redis事務
細心的你可能發現了,使用transaction與否不一樣之處在與建立pipeline實例的時候,transaction是否打開,默認是打開的。
# -*- coding:utf-8 -*- import redis from redis import WatchError from concurrent.futures import ProcessPoolExecutor r = redis.Redis(host='127.0.0.1', port=6379) # 減庫存函數, 循環直到減庫存完成 # 庫存充足, 減庫存成功, 返回True # 庫存不足, 減庫存失敗, 返回False def decr_stock(): # python中redis事務是經過pipeline的封裝實現的 with r.pipeline() as pipe: while True: try: # watch庫存鍵, multi後若是該key被其餘客戶端改變, 事務操做會拋出WatchError異常 pipe.watch('stock:count') count = int(pipe.get('stock:count')) if count > 0: # 有庫存 # 事務開始 pipe.multi() pipe.decr('stock:count') # 把命令推送過去 # execute返回命令執行結果列表, 這裏只有一個decr返回當前值 print pipe.execute()[0] return True else: return False except WatchError, ex: # 打印WatchError異常, 觀察被watch鎖住的狀況 print ex pipe.unwatch() def worker(): while True: # 沒有庫存就退出 if not decr_stock(): break # 實驗開始 # 設置庫存爲100 r.set("stock:count", 100) # 多進程模擬多個客戶端提交 with ProcessPoolExecutor(max_workers=2) as pool: for _ in range(10): pool.submit(worker)