1,數據一致性python
當多個進程/線程對同一個共享資源讀寫,會由於資源的爭奪而出現混亂,致使數據不一致。mysql
以下圖:redis
在數據庫的原始數據是 d0,上圖的處理流程以下:算法
t1 時刻,有兩個數據源的數據 d1,d2 分別到達數據處理層,主進程分配線程 Merge1 處理 d1,Merge2 處理 d2,二者又同時(假設仍是 t1 )從數據庫獲取原始數據 d0
t2 時刻,Merge1 合併完 d0 和 d1 的數據,並將合併後的數據存到數據庫,數據庫的數據變成 d0 + d1
t3 時刻,Merge2 合併完 d0 和 d2 的數據,並將合併後的數據存到數據庫,數據庫的數據變成 d0 + d2
t1 到 t3,數據庫最終的數據變成了 d0 + d2,數據源 d1 的數據消失,出現數據不一致問題。
上面所列的問題,是因爲多線程同時對某一個共享數據進行讀寫致使,咱們只要找到一種方案,使得對共享數據的訪問是同步的,便可解決該問題。當有某個線程或者進程已經訪問了該數據,其餘進程或者線程就必須等待其訪問結束,纔可擁有該共享數據的訪問權(進入臨界區)。最簡單的方式,就是加個同步鎖。sql
鎖的實現方式,按照應用的實現架構,可能會有如下幾種類型:數據庫
若是處理程序是單進程多線程的,在 python下,就可使用 threading 模塊的 Lock 對象來限制對共享變量的同步訪問,實現線程安全。緩存
單機多進程的狀況,在 python 下,可使用 multiprocessing 的 Lock 對象來處理。安全
多機多進程部署的狀況,就得依賴一個第三方組件(存儲鎖對象)來實現一個分佈式的同步鎖了。服務器
2,分佈式鎖實現方式網絡
目前主流的分佈式鎖實現方式有如下幾種:
基於數據庫來實現,如 mysql
基於緩存來實現,如 redis
基於 zookeeper 來實現
下面咱們簡單介紹下這幾種鎖的實現。
2.1,基於數據庫的鎖:
基於數據庫的鎖實現也有兩種方式,一是基於數據庫表,另外一種是基於數據庫排他鎖。
基於數據庫表的增刪:
基於數據庫表增刪是最簡單的方式,首先建立一張鎖的表主要包含下列字段:方法名,時間戳等字段。
具體使用的方法,當須要鎖住某個方法時,往該表中插入一條相關的記錄。這邊須要注意,方法名是有惟一性約束的,若是有多個請求同時提交到數據庫的話,數據庫會保證只有一個操做能夠成功,那麼咱們就能夠認爲操做成功的那個線程得到了該方法的鎖,能夠執行方法體內容。執行完畢,須要delete該記錄。
對於上述方案能夠進行優化,如應用主從數據庫,數據之間雙向同步。一旦掛掉快速切換到備庫上;作一個定時任務,每隔必定時間把數據庫中的超時數據清理一遍;使用while循環,直到insert成功再返回成功,雖然並不推薦這樣作;還能夠記錄當前得到鎖的機器的主機信息和線程信息,那麼下次再獲取鎖的時候先查詢數據庫,若是當前機器的主機信息和線程信息在數據庫能夠查到的話,直接把鎖分配給他就能夠了,實現可重入鎖。
數據庫的排他鎖:
基於MySql的InnoDB引擎,可使用如下方法來實現加鎖操做。
在查詢語句後面增長for update,數據庫會在查詢過程當中給數據庫表增長排他鎖。當某條記錄被加上排他鎖以後,其餘線程沒法再在該行記錄上增長排他鎖。其餘沒有獲取到鎖的就會阻塞在上述select語句上,可能的結果有2種,在超時以前獲取到了鎖,在超時以前仍未獲取到鎖。
得到排它鎖的線程便可得到分佈式鎖,當獲取到鎖以後,能夠執行方法的業務邏輯,執行完方法以後,釋放鎖 connection.commit() 。
存在的問題主要是性能不高和sql超時的異常。
2.2,基於zookeeper實現
基於zookeeper臨時有序節點能夠實現的分佈式鎖。每一個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節點的目錄下,生成一個惟一的瞬時有序節點。 判斷是否獲取鎖的方式很簡單,只須要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個瞬時節點刪除便可。同時,其能夠避免服務宕機致使的鎖沒法釋放,而產生的死鎖問題。
提供的第三方庫有 curator ,具體使用讀者能夠自行去看一下。Curator提供的InterProcessMutex是分佈式鎖的實現。acquire方法獲取鎖,release方法釋放鎖。另外,鎖釋放、阻塞鎖、可重入鎖等問題均可以有有效解決。講下阻塞鎖的實現,客戶端能夠經過在ZK中建立順序節點,而且在節點上綁定監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端能夠檢查本身建立的節點是否是當前全部節點中序號最小的,若是是就獲取到鎖,即可以執行業務邏輯。
最後,Zookeeper實現的分佈式鎖其實存在一個缺點,那就是性能上可能並無緩存服務那麼高。由於每次在建立鎖和釋放鎖的過程當中,都要動態建立、銷燬瞬時節點來實現鎖功能。ZK中建立和刪除節點只能經過Leader服務器來執行,而後將數據同不到全部的Follower機器上。併發問題,可能存在網絡抖動,客戶端和ZK集羣的session鏈接斷了,zk集羣覺得客戶端掛了,就會刪除臨時節點,這時候其餘客戶端就能夠獲取到分佈式鎖了。
用法參考:https://yunjianfei.iteye.com/blog/2164888
2.3,基於緩存redis實現
相對於基於數據庫實現分佈式鎖的方案來講,基於緩存來實如今性能方面會表現的更好一點,存取速度快不少。並且不少緩存是能夠集羣部署的,能夠解決單點問題。
使用redis的SETNX實現分佈式鎖,多個進程執行如下Redis命令:
SETNX lock.id <current Unix time + lock timeout + 1>
SETNX是將 key 的值設爲 value,當且僅當 key 不存在。若給定的 key 已經存在,則 SETNX 不作任何動做。
返回1,說明該進程得到鎖,SETNX將鍵 lock.id 的值設置爲鎖的超時時間,當前時間 +加上鎖的有效時間。
返回0,說明其餘進程已經得到了鎖,進程不能進入臨界區。進程能夠在一個循環中不斷地嘗試 SETNX 操做,以得到鎖。
3,分佈式鎖保持數據一致的原理
每種實現方式各有千秋,綜合考量,咱們最終決定使用 redis,主要緣由是:
redis 是基於內存來操做,存取速度比數據庫快,在高併發下,加鎖以後的性能不會降低太多
redis 能夠設置鍵值的生存時間(TTL)
redis 的使用方式簡單,整體實現開銷小
同時使用 redis 實現的分佈鎖還須要具有如下幾個條件:
同一個時刻只能有一個線程佔有鎖,其餘線程必須等待直到鎖被釋放
鎖的操做必須知足原子性
不會發生死鎖,例如已得到鎖的線程在釋放鎖以前忽然異常退出,致使其餘線程會一直在循環等待鎖被釋放
鎖的添加和釋放必須由同一個線程來設置
咱們在上圖 的基礎上,在 Data process 和 Database 之間加了一層鎖,咱們在 redis 中使用添加了一個 lock_key 來做爲鎖的標識,流程圖以下:
仍是假設某臺機器(圖中的machine)在數據庫的原始數據是 d0,上圖的處理流程變成了:
t1 時刻,有兩個數據源的數據 d1,d2 同時到達數據處理層,主進程分配了線程 Merge1 處理 d1,線程 Merge2 處理 d2,二者又同時嘗試從 redis 得到鎖
t2 時刻,Merge1 成功得到了鎖,同時從數據庫中加載 machine 的原始數據 d0,Merge2 循環等待 Merge1 釋放鎖
t3 時刻,Merge1 合併完數據,並將合併好的數據 d0 + d1 存放到數據庫,最後釋放鎖
t4 時刻,Merge2 得到了鎖,同時從數據庫中加載machine的數據 d0 + d1
t5 時刻,Merge2 合併完數據,並將合併好的數據 d0 + d1 + d2 存放到數據庫,最後釋放鎖
從以上能夠看到保持數據一致的原理其實也不難,無非就是使用一個鍵值來使得多個線程對同一臺機器的數據的讀寫是同步的,可是在實現的過程當中,每每會忽視了分佈式鎖所要具有的某個條件,極端狀況下,仍是會出現數據不一致的問題。
幾個要用到的redis命令:
setnx(key, value):「set if not exits」,若該key-value不存在,則成功加入緩存而且返回1,不然返回0。
get(key):得到key對應的value值,若不存在則返回nil。
getset(key, value):先獲取key對應的value值,若不存在則返回nil,而後將舊的value更新爲新的value。
expire(key, seconds):設置key-value的有效期爲seconds秒。
4,死鎖的問題
SETNX實現分佈式鎖,可能會存在死鎖的狀況。與單機模式下的鎖相比,分佈式環境下不只須要保證進程可見,還須要考慮進程與鎖之間的網絡問題。某個線程獲取了鎖以後,斷開了與Redis 的鏈接,鎖沒有及時釋放,競爭該鎖的其餘線程都會hung,產生死鎖的狀況。
在使用 SETNX 得到鎖時,咱們將鍵 lock.id 的值設置爲鎖的有效時間,線程得到鎖後,其餘線程還會不斷的檢測鎖是否已超時,若是超時,等待的線程也將有機會得到鎖。然而,鎖超時,咱們不能簡單地使用 DEL 命令刪除鍵 lock.id 以釋放鎖。
考慮如下狀況:
A已經首先得到了鎖 lock.id,而後線A斷線。B,C都在等待競爭該鎖;
B,C讀取lock.id的值,比較當前時間和鍵 lock.id 的值來判斷是否超時,發現超時;
B執行 DEL lock.id命令,並執行 SETNX lock.id 命令,並返回1,B得到鎖;
C因爲各剛剛檢測到鎖已超時,執行 DEL lock.id命令,將B剛剛設置的鍵 lock.id 刪除,執行 SETNX lock.id命令,並返回1,即C得到鎖。
上面的步驟很明顯出現了問題,致使B,C同時獲取了鎖。在檢測到鎖超時後,線程不能直接簡單地執行 DEL 刪除鍵的操做以得到鎖。
對於上面的步驟進行改進,問題是出在刪除鍵的操做上面,那麼獲取鎖以後應該怎麼改進呢?
首先看一下redis的GETSET這個操做, GETSET key value ,將給定 key 的值設爲 value ,並返回 key 的舊值(old value)。利用這個操做指令,咱們改進一下上述的步驟。
A已經首先得到了鎖 lock.id,而後線A斷線。B,C都在等待競爭該鎖;
B,C讀取lock.id的值,比較當前時間和鍵 lock.id 的值來判斷是否超時,發現超時;
B檢測到鎖已超時,即當前的時間大於鍵 lock.id 的值,B會執行
GETSET lock.id <current Unix timestamp + lock timeout + 1> 設置時間戳,經過比較鍵 lock.id 的舊值是否小於當前時間,判斷進程是否已得到鎖;
B發現GETSET返回的值小於當前時間,則執行 DEL lock.id命令,並執行 SETNX lock.id 命令,並返回1,B得到鎖;
C執行GETSET獲得的時間大於當前時間,則繼續等待。
在線程釋放鎖,即執行 DEL lock.id 操做前,須要先判斷鎖是否已超時。若是鎖已超時,那麼鎖可能已由其餘線程得到,這時直接執行 DEL lock.id 操做會致使把其餘線程已得到的鎖釋放掉。
使用Zookeeper實現分佈式鎖的優勢:
有效的解決單點問題,不可重入問題,非阻塞問題以及鎖沒法釋放的問題。實現起來較爲簡單。
使用Zookeeper實現分佈式鎖的缺點:
性能上不如使用緩存實現分佈式鎖。 須要對ZK的原理有所瞭解。
5,redis代碼實現分佈式鎖
結合上面(分佈式鎖保持數據一致的原理)提到的使用redis分佈式鎖的三種條件,使用三種不一樣獲取redis鎖的方式,探索分佈式鎖的使用方法。
下面使用同一份測試代碼:
咱們啓用了多線程去對 redis 中的 test_key 的值進行自增操做,理想狀況,test_key 的值應該等於線程的數量,好比開了 10 個線程,test_key的值最終應該是 10。
import threading, time, redis from redis import StrictRedis def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #獲取鎖 value = redis_conn.get(key) #獲取數據 time.sleep(0.1) if value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) lock.del_lock(key) #釋放鎖 ##主程序 if __name__ == "__main__": pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8) redis = StrictRedis(connection_pool=pool) lock = RedisLock(redis) key = 'test_key' thread_count = 10 redis.delete(key) for i in range(thread_count): thread = threading.Thread(target=increase_data, args=(redis, lock, key)) thread.start()
方式一:加鎖操做非原子性
在這個版本中,當線程 A get(key) 的值爲空時,set key 的值爲 1,並返回,這表示線程 A 得到了鎖,能夠繼續執行後面的操做,不然須要一直循環去獲取鎖,直到 key 的值再次爲空,從新得到鎖,執行任務完畢後釋放鎖。
class RedisLock(object): def __init__(self, redis_conn): self.redis_conn = redis_conn def get_lock_key(self, key): lock_key = 'lock_%s' %key return lock_key def get_lock(self, key): lock_key = self.get_lock_key(key) while True: value = self.redis_conn.get(lock_key) if not value: self.redis_conn.set(lock_key, 1) return True time.sleep(0.01) def del_lock(self, key): lock_key = self.get_lock_key(key) return self.redis_conn.delete(lock_key)
執行測試腳本,獲得的結果以下:
Thread-1 1 Thread-5 2 Thread-2 2 Thread-6 3 Thread-7 3 Thread-4 3 Thread-9 4 Thread-8 5 Thread-10 5 Thread-3 5
觀察結果就發現,同時有多個線程輸出的結果是同樣的。乍一看上面加鎖的代碼邏輯彷佛沒啥問題,可是結果卻事與願違,緣由是上面的代碼 get(key) 和 set(key, value) 並非原子性的,A 線程在 get(key) 的時候發現是空值,因而從新 set(key, value),但在 set 完成的前一刻,B 線程剛好 get(key) 的時候獲得的仍是空值,而後也順利得到鎖,致使數據被兩個或多個線程同時修改,最後出現不一致。
方式二:使用 setnx 來實現
鑑於上面版本是因爲命令不是原子性操做形成兩個或多個線程同時得到鎖的問題,這個版本改爲使用 redis 的 setnx 命令來進行鎖的查詢和設置操做,setnx 即 set if not exists,顧名思義就是當key不存在的時候才設置 value,並返回 1,若是 key 已經存在,則不進行任何操做,返回 0。
def get_lock(self, key): lock_key = self.get_lock_key(key) while True: value = self.redis_conn.setnx(lock_key, 1) if value: return True time.sleep(0.01)
代碼執行結果:
('Thread-1', 0) ('Thread-9', 1) ('Thread-4', 2) ('Thread-8', 3) ('Thread-7', 4) ('Thread-10', 5) ('Thread-2', 6) ('Thread-6', 7) ('Thread-5', 8) ('Thread-3', 9)
結果是正確的,可是若是知足於此,仍是會出問題的,好比假設 A 線程得到了鎖後,因爲某種異常緣由致使線程 crash了,一直不釋放鎖呢?咱們稍微改一下測試用例的 increase 函數,模擬某個線程在釋放鎖以前由於異常退出。
def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #獲取鎖 value = redis_conn.get(key) #獲取數據 time.sleep(0.1) if not value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) if thread_name == "Thread-2": print("thread-2 crash ....") import sys sys.exit(1) lock.del_lock(key) #釋放鎖
代碼執行結果:
Thread-2 3 Thread-2 crash.. .....
線程 2 crash 以後,後續的線程一直獲取不了鎖,便一直處於等待鎖的狀態,因而乎產生了死鎖。若是數據是多線程處理的,好比每來一個數據就開一個線程去處理,那麼堆積的線程會逐漸增多,最終可能會致使系統崩潰。
使用了 redis 來實現分佈式鎖,何不利用 redis 的 ttl 機制呢,給鎖加上過時時間。
代碼修改成:
def get_lock(self, key, timeout=1): lock_key = self.get_lock_key(key) while True: value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout) if value: break else: print("waiting....") time.sleep(0.1)
執行結果:
('Thread-1', 0) ('Thread-10', 1) ('Thread-8', 2) ('Thread-3', 3) ('Thread-2', 4) thread-2 crash .... ('Thread-7', 5) waiting.... waiting.... waiting.... ('Thread-9', 6) ('Thread-6', 7) ('Thread-4', 8) ('Thread-5', 9)
結果正確,線程 2 在 crash 後,其餘線程在等待,直到鎖過時。
進行到這裏,彷佛已經能夠解決數據不一致的問題了,但在歡喜之餘,不妨多想一想會不會出現其餘問題。好比假設 A 進程的邏輯還沒處理完,可是鎖因爲過時時間到了,致使鎖自動釋放掉,這時 B 線程得到了鎖,開始處理 B 的邏輯,而後 A 進程的邏輯處理完了,就把 B 進程的鎖給刪除了。
方式三:鎖的生成和刪除必須是同一個線程
先修改代碼,設置代碼的執行時間大於ttl時間
def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #獲取鎖 value = redis_conn.get(key) #獲取數據 time.sleep(2.5) #模擬實際狀況下進行的某些耗時操做, 且執行時間大於鎖過時的時間 if value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) if thread_name == "Thread-2": print("thread-2 crash ....") import sys sys.exit(1) lock.del_lock(key) #釋放鎖 執行結果: ('Thread-1', 0) Thread-5 is waiting.. ('Thread-4', 0) ('Thread-2', 1) thread-2 crash .... ('Thread-3', 1) ('Thread-5', 1)
從以上結果能夠看出,因爲每一個線程的執行時間大於鎖的過時時間,當線程的任務還沒執行完時,鎖已經自動釋放,使得下一個線程得到了鎖,然後下一個線程的鎖被上一個執行完了的線程刪掉或者也是自動釋放(具體要看線程的執行時間和鎖的釋放時間),因而又產生了同一個數據被兩個或多個線程同時修改的問題,致使數據出現不一致。
咱們用四個線程,按照時間順序畫的流程圖以下:
能夠看到,在 2.5s 和 5s 的時刻,都產生了誤刪鎖的狀況。
既然這個現象是因爲鎖過時致使誤刪別人家的鎖引起的,那咱們就順着這個思路,強制線程只能刪除本身設置的鎖。若是是這樣,就得被每一個線程的鎖添加一個惟一標識了。看看上面的鎖機制,咱們每次添加鎖的時候,都是給 lock_key 設爲 1,不管是 key 仍是 value,都不具有惟一性,若是把 key 設爲每一個線程惟一的,那在分佈式系統中,得產生 N (等於總線程數)個 key 了 ,從直觀性和維護性上來講,這都是不可取的,因而乎只能從 value 入手了。咱們看到每一個線程均可以取到一個惟一標識,即線程 ID,若是加上進程的 PID,以及機器的 IP,就能夠構成一個線程鎖的惟一標識了,若是還擔憂不夠惟一,再打上一個時間戳了,因而乎,咱們的分佈式鎖最終版就變成了如下這樣:
class RedisLock(object): def __init__(self, redis_conn): self.redis_conn = redis_conn self.ip = socket.gethostbyname(socket.gethostname()) self.pid = os.getpid() def get_lock_key(self, key): lock_key = 'lock_%s' %key return lock_key def gen_unique_value(self): thread_name = threading.current_thread().name time_now = time.time() unique_value = "{0}-{1}-{2}-{3}".format(self.ip, self.pid, thread_name, time_now) return unique_value def get_lock(self, key, timeout=1): lock_key = self.get_lock_key(key) unique_value = self.gen_unique_value() print("unique value %s" % unique_value) while True: value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout) if value: return unique_value else: thread_name = threading.current_thread().name print("{} is waiting..".format(thread_name)) time.sleep(0.1) def del_lock(self, key, value): lock_key = self.get_lock_key(key) old_lock_value = self.redis_conn.get(lock_key) if old_lock_value == value: return self.redis_conn.delete(lock_key) 修改測試代碼: def increase_data(redis_conn, lock, key): lock_value = lock.get_lock(key) #獲取鎖 value = redis_conn.get(key) #獲取數據 time.sleep(2.5) #模擬實際狀況下進行的某些耗時操做, 且執行時間大於鎖過時的時間 if value: value = int(value) + 1 else: value = 0 redis_conn.set(key, value) thread_name = threading.current_thread().name print(thread_name, value) if thread_name == "Thread-2": print("thread-2 crash ....") import sys sys.exit(1) lock.del_lock(key, lock_value) #釋放鎖
運行結果:
unique value 192.168.1.110-45351-Thread-1-1555730398.38 unique value 192.168.1.110-45351-Thread-2-1555730398.39 unique value 192.168.1.110-45351-Thread-3-1555730398.4 unique value 192.168.1.110-45351-Thread-4-1555730398.42 unique value 192.168.1.110-45351-Thread-5-1555730398.43 ('Thread-1', 0) Thread-3 is waiting.. Thread-2 is waiting.. Thread-3 is waiting.. Thread-2 is waiting.. ('Thread-4', 0) Thread-3 is waiting.. ('Thread-5', 0) ('Thread-3', 1) ('Thread-2', 1) thread-2 crash ....
以上能夠看出,問題沒有獲得解決。由於什麼緣由呢?以上咱們設置值的惟一性只能確保線程不會誤刪其餘線程產生的鎖,進而出現連串的誤刪鎖的狀況,好比 A 刪了 B 的鎖,B 執行完刪了 C 的鎖 。使用 redis 的過時機制,只要業務的處理時間大於鎖的過時時間,就沒有一個很好的方式來避免因爲鎖過時致使其餘線程同時佔有鎖的問題,因此須要熟悉業務的執行時間,來合理地設置鎖的過時時間。
還需注意的一點是,以上的實現方式中,刪除鎖(del_lock)的操做不是原子性的,先是拿到鎖,再判斷鎖的值是否相等,相等的話最後再刪除鎖,既然不是原子性的,就有可能存在這樣一種極端狀況:在判斷的那一時刻,鎖正好過時了,被其餘線程佔有了鎖,那最後一步的刪除,就可能會形成誤刪鎖了。可使用官方推薦的 Lua 腳原本確保原子性:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
可是隻要鎖的過時時間設置的足夠合理,這個問題實際上是能夠忽略的,也能夠說出現這種極端狀況的機率是及其小的。
以上咱們使用 redis 來實現一個分佈式的同步鎖,來保證數據的一致性,其特色是:
知足互斥性,同一個時刻只能有一個線程能夠獲取鎖
利用 redis 的 ttl 來確保不會出現死鎖,但同時也會帶來因爲鎖過時引起的多線程同時佔有鎖的問題,須要咱們合理設置鎖的過時時間來避免
利用鎖的惟一性來確保不會出現誤刪鎖的狀況
以上的方案中,咱們是假設 redis 服務端是單集羣且高可用的,忽視瞭如下的問題:若是某一時刻 redis master 節點發生了故障,集羣中的某個 slave 節點變成 master 節點,這時候就可能出現原 master 節點上的鎖沒有及時同步到 slave 節點,致使其餘線程同時得到鎖。對於這個問題,能夠參考 redis 官方推出的 redlock 算法,可是比較遺憾的是,該算法也沒有很好地解決鎖過時的問題。
————————————————版權聲明:本文爲CSDN博主「達西布魯斯」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連接及本聲明。原文連接:https://blog.csdn.net/biheyu828/article/details/89005866