最近學習spark,我主要使用pyspark api進行編程。python
以前使用Python都是現學現用,用完就忘了也沒有理解和記憶,所以這裏把Python相關的知識也彌補和記錄下來吧編程
多線程任務隊列在實際項目中很是有用,關鍵的地方要實現隊列的多線程同步問題,也即保證隊列的多線程安全api
例如:能夠開多個消費者線程,每一個線程上綁定一個隊列,這樣就實現了多個消費者同時處理不一樣隊列上的任務安全
同時能夠有多個生產者往隊列發送消息,實現異步消息處理多線程
先複習下互斥量和條件變量的概念:異步
互斥量(mutex)從本質上說是一把鎖,在訪問共享資源前對互斥量進行加鎖,在訪問完成後釋放互斥量上的鎖。對互斥量進行加鎖之後,任何其餘試圖再次對互斥鎖加鎖的線程將會阻塞直到當前線程釋放該互斥鎖。若是釋放互斥鎖時有多個線程阻塞,全部在該互斥鎖上的阻塞線程都會變成可運行狀態,第一個變爲運行狀態的線程能夠對互斥鎖加鎖,其餘線程將會看到互斥鎖依然被鎖住,只能回去再次等待它從新變爲可用。函數
條件變量(cond)是在多線程程序中用來實現」等待–》喚醒」邏輯經常使用的方法。條件變量利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動做:一個線程等待」條件變量的條件成立」而掛起;另外一個線程使「條件成立」。爲了防止競爭,條件變量的使用老是和一個互斥鎖結合在一塊兒。線程在改變條件狀態前必須首先鎖住互斥量,函數pthread_cond_wait把本身放到等待條件的線程列表上,而後對互斥鎖解鎖(這兩個操做是原子操做)。在函數返回時,互斥量再次被鎖住學習
條件變量老是與互斥鎖一塊兒使用的測試
Python的threading中定義了兩種鎖:threading.Lock和threading.RLockspa
二者的不一樣在於後者是可重入鎖,也就是說在一個線程內重複LOCK同一個鎖不會發生死鎖,這與POSIX中的PTHREAD_MUTEX_RECURSIVE也就是可遞歸鎖的概念是相同的, 互斥鎖的API有三個函數,分別執行分配鎖,上鎖,解鎖操做。
python的threading中的條件變量默認綁定了一個RLock,也能夠在初始化條件變量的時候傳進去一個本身定義的鎖.
最後貼出我本身實現的簡單線程安全任務隊列
測試代碼