假若有兩個程序A和B,程序A在執行到一半的過程當中,須要讀取大量的數據輸入(I/O操做),而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。 是否是在程序A讀取數據的過程當中,讓程序B去執行,當程序A讀取完數據以後,讓 程序B暫停,而後讓程序A繼續執行? 固然沒問題,但這裏有一個關鍵詞:切換。html
既然是切換,那麼這就涉及到了狀態的保存,狀態的恢復,加上程序A與程序B所須要的系統資 源(內存,硬盤,鍵盤等等)是不同的。天然而然的就須要有一個東西去記錄程序A和程序B 分別須要什麼資源,怎樣去識別程序A和程序B等等,因此就有了一個叫進程的抽象 python
進程定義: 進程就是一個程序在一個數據集上的一次動態執行過程。 數據庫
進程通常由 程序、數據集、進程控制塊 三部分組成。 api
線程的出現是爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹同樣事的缺陷,使到進程內併發成爲可能。
假設,一個文本程序,須要接受鍵盤輸入,將內容顯示在屏幕上,還須要保存信息到硬盤中。若只有一個進程,勢必形成同一時間只能幹同樣事的尷尬(當保存時,就不能經過鍵盤輸入內容)。
如有多個進程,每一個進程負責一個任務,進程A負責接收鍵盤輸入的任務,進程B負責將內容顯示在屏幕上的任務,進程C負責保存內容到硬盤中的任務。這裏進程A,B,C間的協做涉及到了進程通訊問題,並且有共同都須要擁有的東西-------文本內容,不停的切換形成性能上的損失。
如有一種機制,可使任務A,B,C共享資源,這樣上下文切換所須要保存和恢復的內容就少了,同時又能夠減小通訊所帶來的性能損耗,那就行了。是的,這種機制就是線程。緩存
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能。線程沒有本身的系統資源。
- Threads share the address space of the process that created it; processes have their own address space.
- Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
- Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
- New threads are easily created; new processes require duplication of the parent process.
- Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
- Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
- 一個程序至少有一個進程,一個進程至少有一個線程.(進程能夠理解成線程的容器)
- 進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率。
- 線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
- 進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位.
- 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.
- 線程本身基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)可是它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源.
- 一個線程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行.
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)安全
上面的核心意思就是,不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行。數據結構
首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。
然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。多線程
這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf
threading 模塊創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。併發
直接調用:app
import threading import time def sayhi(num): #定義每一個線程要運行的函數 print("running on number:%s" %num) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(1,),name="name_t1") #生成一個線程實例 t2 = threading.Thread(target=sayhi,args=(2,),name="name_t2") #生成另外一個線程實例 t1.start() #啓動線程 t2.start() #啓動另外一個線程 print(t1.getName()) #獲取線程名 等同於 print(t1.name) print(t2.getName()) print(threading.current_thread().name) >>> running on number:1 running on number:2 name_t1 name_t2 MainThread
因爲任何進程默認就會啓動一個線程,咱們把該線程稱爲主線程,主線程又能夠啓動新的線程,Python的 threading 模塊有個 current_thread()
函數,它永遠返回當前線程的實例。主線程實例的名字叫 MainThread ,子線程的名字在建立時指定,咱們用LoopThread
命名子線程。名字僅僅在打印時用來顯示,徹底沒有其餘意義,若是不起名字Python就自動給線程命名爲Thread-1
,Thread-2
……
args裏面是參數
繼承式調用:
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) # 等同於 super().__init__() self.num = num def run(self):#定義每一個線程要運行的函數 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start() print("ending......")
Join & Daemon
#_*_coding:utf-8_*_ __author__ = 'Alex Li' import time import threading def run(n): print('[%s]------running----\n' % n) time.sleep(2) print('--done--') def main(): for i in range(5): t = threading.Thread(target=run,args=[i,]) t.start() t.join(1) print('starting thread', t.getName()) m = threading.Thread(target=main,args=[]) # m.setDaemon(True) #將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啓動的其它子線程會同時退出,無論是否執行完任務 m.start() m.join(timeout=2) print("---main thread done----") >>> [0]------running---- starting thread Thread-2 [1]------running---- ---main thread done---- --done-- starting thread Thread-3 [2]------running---- --done-- starting thread Thread-4 [3]------running---- --done-- starting thread Thread-5 [4]------running---- --done-- starting thread Thread-6 --done-- ***Repl Closed*** # 加了守護程序以後 >>> [0]------running---- starting thread Thread-2 [1]------running---- ---main thread done---- --done-- ***Repl Closed***
join():
在子線程完成運行以前,這個子線程的父線程將一直被阻塞。
setDaemon(True):
將線程聲明爲守護線程,必須在start() 方法調用以前設置, 若是不設置爲守護線程程序會被無限掛起。這個方法基本和join是相反的。
當咱們 在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成
想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是 只要主線程
完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法啦。
# run(): 線程被cpu調度後自動執行線程對象的run方法 # start():啓動線程活動。 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?
import time import threading def addNum(): global num #在每一個線程中都獲取這個全局變量 #num-=1 temp=num #print('--get num:',num ) time.sleep(0.1) num =temp-1 #對此公共變量進行-1操做 num = 100 #設定一個共享變量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部線程執行完畢 t.join() print('final num:', num )
觀察:time.sleep(0.1) /0.001/0.0000001 結果分別是多少?
假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。
多個線程都在同時操做同一個共享資源,因此形成了資源破壞,怎麼辦呢?(join會形成串行,失去所線程的意義)
咱們能夠經過 同步鎖 來解決這種問題。
鎖的概念是這樣的,當一個線程lock.acquire()後,能夠切換到另外一個線程,可是當另外一個線程執行到lock.acquire()後,發現已經鎖住了,因此要等到
另外一個進程解鎖之後,才能進行加鎖。因此lock = threaing.Lock() 是個全局鎖。這樣lock能夠在多個線程中判斷
import time import threading def addNum(): global num #在每一個線程中都獲取這個全局變量 print('--get num:',num ) time.sleep(1) lock.acquire() #修改數據前加鎖 num -=1 #對此公共變量進行-1操做 lock.release() #修改後釋放 num = 100 #設定一個共享變量 thread_list = [] lock = threading.Lock() #生成全局鎖 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部線程執行完畢 t.join() print('final num:', num )
Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 注意啦,這裏的lock是用戶級的lock,跟那個GIL不要緊 ,具體經過下圖來看一下。
那你又問了, 既然用戶程序已經本身有鎖了,那爲何C python還須要GIL呢?加入GIL主要的緣由是爲了下降程序的開發的複雜度,好比如今的你寫python不須要關心內存回收的問題,由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。
在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都正在使用,全部這兩個線程在無外力做用下將一直等待下去。下面是一個死鎖的例子:
import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待線程結束,後面再講。
解決辦法:使用遞歸鎖,將
lockA=threading.Lock() lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()
爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
import time import threading class Account: def __init__(self, _id, balance): self.id = _id self.balance = balance self.lock = threading.RLock() def withdraw(self, amount): with self.lock: self.balance -= amount def deposit(self, amount): with self.lock: self.balance += amount def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的場景 with self.lock: interest=0.05 count=amount+amount*interest self.withdraw(count) def transfer(_from, to, amount): #鎖不能夠加在這裏 由於其餘的其它線程執行的其它方法在不加鎖的狀況下數據一樣是不安全的 _from.withdraw(amount) to.deposit(amount) alex = Account('alex',1000) yuan = Account('yuan',1000) t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) t1.start() t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) t2.start() t1.join() t2.join() print('>>>',alex.balance) print('>>>',yuan.balance)
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法wait、clear、set.
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False
set:將「Flag」設置爲True
用 threading.Event 實現線程間通訊。
import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚你們都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>能夠下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()
若是一個線程須要不停重複的使用event對象,最好使用condition對象實現一個週期定時器,每當定時器超時的時候,其餘線程均可以檢測到:
當小夥伴a在往火鍋裏面添加魚丸,這個就是生產者行爲;另一個小夥伴b在吃掉魚丸就是消費者行爲。當火鍋裏面魚丸達到必定數量加滿後b才能吃,這就是一種條件判斷了。
Condition(條件變量)一般與一個鎖關聯。須要在多個Contidion中共享一個鎖時,能夠傳遞一個Lock/RLock實例給構造方法,不然它將本身生成一個RLock實例。
能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。
Condition():
實現場景:當a同窗王火鍋裏面添加魚丸加滿後(最多5個,加滿後通知b去吃掉),通知b同窗去吃掉魚丸(吃到0的時候通知a同窗繼續添加)
# coding=utf-8 import threading import time con = threading.Condition() num = 0 # 生產者 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): # 鎖定線程 global num con.acquire() while True: print("開始添加!!!") num += 1 print("火鍋裏面魚丸個數:%s" % str(num)) time.sleep(1) if num >= 5: print("火鍋裏面裏面魚丸數量已經到達5個,沒法添加了!") # 喚醒等待的線程 con.notify() # 喚醒小夥伴開吃啦 # 等待通知 con.wait() #當while退出則開鎖。此程序中運行不到這句話 con.release() # 消費者 class Consumers(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): con.acquire() global num while True: print("開始吃啦!!!") num -= 1 print("火鍋裏面剩餘魚丸數量:%s" %str(num)) time.sleep(2) if num <= 0: print("鍋底沒貨了,趕忙加魚丸吧!") con.notify() # 喚醒其它線程 # 等待通知 con.wait() # 當while退出則開鎖。此程序中運行不到這句話 con.release() p = Producer() c = Consumers() p.start() c.start()
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) # getname() 線程的名稱 time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
定時器,指定n秒後執行某操做
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
import threading,time li=[1,2,3,4,5] def pri(): while li: a=li[-1] print(a) time.sleep(1) try: li.remove(a) except Exception as e: print('----',a,e) t1=threading.Thread(target=pri,args=()) t1.start() t2=threading.Thread(target=pri,args=()) t2.start()
多線程進行操做時,li[-1]有可能都表明5,而不是遞減的。
思考:如何經過對列來完成上述功能?
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
建立一個「隊列」對象 import queue q = queue.Queue(maxsize = 10) queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 將一個值放入隊列中 q.put(10) 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 將一個值從隊列中取出 q.get() 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True, get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 Python queue模塊有三種隊列及構造函數: 一、Python queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 二、LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize) 三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 此包中的經常使用方法(q = queue.Queue()): q.qsize() 返回隊列的大小 q.empty() 若是隊列爲空,返回True,反之False q.full() 若是隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 至關q.get(False) 非阻塞 q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 至關q.put(item, False) q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列爲空,再執行別的操做
other mode:
import queue #先進後出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #優先級 # q=queue.PriorityQueue() # q.put([5,100]) # q.put([7,200]) # q.put([3,"hello"]) # q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data)
生產者消費者模型:
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() #q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() #q.task_done() #q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',)) p1.start() c1.start() # c2.start() # c3.start()
import threading,queue class Mythread1(threading.Thread): def __init__(self): super().__init__() def run(self): n = 100 while n>0: n -= 1 print("put進n") q.put(n) class Mythread2(threading.Thread): def __init__(self): super().__init__() def run(self): n = 100 while n>0: n -= 1 print("get出n") q.get(n) if __name__ == '__main__': q = queue.Queue() t1 = Mythread1() t2 = Mythread2() t1.start() t2.start() >>>代碼會出現以下結果 put進n put進n put進n put進nget出n get出n get出nput進n get出n put進n put進n put進n put進n put進n put進n get出nput進n get出n get出n get出n put進nget出n get出n get出n
暫時不知道爲何。(估計是print的時候自動換行出問題)可是有一個解決方法。加鎖。
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "Q1mi" # Date: 2018/7/12 import threading, queue class Mythread1(threading.Thread): def __init__(self): super().__init__() def run(self): n = 1000 while n > 0: n -= 1 lock.acquire() print("put進n", end="\n") lock.release() q.put(n) class Mythread2(threading.Thread): def __init__(self): super().__init__() def run(self): n = 1000 while n > 0: n -= 1 lock.acquire() print("get出n", end="\n") lock.release() q.get(n) if __name__ == '__main__': lock = threading.Lock() q = queue.Queue() t1 = Mythread1() t2 = Mythread2() t1.start() t2.start()
多進程模塊 multiprocessing
M
is a package that supports spawning processes using an API similar to the threading module. The ultiprocessing
package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.multiprocessing
因爲GIL的存在,python中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。
multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
from multiprocessing import Process import time def f(name): time.sleep(1) print('hello', name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin',)) p_list.append(p) p.start() for i in p_list: p.join() print('end')
from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print ('hello', self.name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
To show the individual process IDs involved, here is an expanded example:
from multiprocessing import Process import os import time def info(title): print("title:",title) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main process line') time.sleep(1) print("------------------") p = Process(target=info, args=('yuan',)) p.start() p.join()
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前尚未實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
實例方法:
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程準備就緒,等待CPU調度
run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():無論任務是否完成,當即中止工做進程
屬性
daemon:和線程的setDeamon功能同樣
name:進程名字。
pid:進程號。
import time from multiprocessing import Process def foo(i): time.sleep(1) print (p.is_alive(),i,p.pid) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(10): p = Process(target=foo, args=(i,)) #p.daemon=True p_list.append(p) for p in p_list: p.start() # for p in p_list: # p.join() print('main process end')
from multiprocessing import Process, Queue import queue def f(q,n): #q.put([123, 456, 'hello']) q.put(n*n+1) print("son process",id(q)) if __name__ == '__main__': q = Queue() #try: q=queue.Queue() print("main process",id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p.start() print(q.get()) print(q.get()) print(q.get())
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name":"yuan"}, 'hello']) response=conn.recv() print("response",response) conn.close() print("q_ID2:",id(child_conn)) if __name__ == '__main__': parent_conn, child_conn = Pipe() print("q_ID1:",id(child_conn)) p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" parent_conn.send("兒子你好!") p.join()
The two connection objects returned by Pipe()
represent the two ends of the pipe. Each connection object has send()
and recv()
methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
#!/usr/bin/env python # -*- coding: utf-8 -*- import multiprocessing # 管道消費者. def consumer(lock,pipe): output_p, input_p = pipe input_p.close() # 關閉管道輸入口 while True: lock.acquire() item = output_p.recv() lock.release() if item == None: break # 處理部分 lock.acquire() print(item) lock.release() # 管道生產者 def producer(sequence, input_p): for item in sequence: # Put the item on the queue input_p.send(item) if __name__ == '__main__': # 進程數、建立管道,鎖等 p_num = 2 process = [] (output_p, input_p) = multiprocessing.Pipe() lock = multiprocessing.Lock() # 定義消費進程 for i in range(p_num): t =multiprocessing.Process(target=consumer,args=(lock,(output_p, input_p),)) t.daemon=True process.append(t) # 啓動消費進程 for i in range(p_num): process[i].start() # 關閉輸出管道,以往管道填充數據 output_p.close() sequence = range(100) + [None]*p_num producer(sequence, input_p) # 數據填充完畢,打開輸入管道 input_p.close() # 等待結束 for i in range(p_num): process[i].join()
Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example:
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.append(1) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
Without using the lock output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
python中,進程池內部會維護一個進程序列。當須要時,程序會去進程池中獲取一個進程。若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。
同步是指一個進程在執行某個請求的時候,必需要到收到對方返回的信息才繼續執行下去。
異步是指進程在執行某個請求時,無論其餘的進程的狀態,這個進程就執行後續操做;
當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。例如:打電話就是同步通訊,發信息就是異步通訊。
代碼以下:
from multiprocessing import Pool
import time
def func(args):
time.sleep(1) #程序休眠1s
print("%s------>%s"%(args,time.ctime())) #打印參數及時間
if __name__=="__main__":
p1=Pool(2) #設定開啓2個進程池
for i in range(10):
p1.apply_async(func=func,args=(i,)) #設定異步執行任務
p1.close() #關閉進程池
time.sleep(2) #程序休眠2s
p1.terminate() #關閉進程池
p1.join() #阻塞進程池
print("ending") #打印結束語句
>>>
0------>Thu Jul 20 20:18:43 2017
1------>Thu Jul 20 20:18:43 2017
ending
能夠看到,在程序執行過程當中,關閉進程池,則程序會當即中止,不會再繼續執行後續語句。
代碼以下:
from multiprocessing import Pool import time def func(args): time.sleep(1) #休眠1s print("%s------>%s"%(args,time.ctime())) #打印傳遞的參數及時間 if __name__=="__main__": p1=Pool(2) #定義2個進程池 for i in range(10): #定義循環10次 p1.apply_async(func=func,args=(i,)) #異步執行任務 p1.close() #等待全部的任務都完成才關閉進程池 p1.join() print("ending")
執行結果以下:
0------>Thu Jul 20 20:19:12 2017
1------>Thu Jul 20 20:19:12 2017
2------>Thu Jul 20 20:19:13 2017
3------>Thu Jul 20 20:19:13 2017
4------>Thu Jul 20 20:19:14 2017
5------>Thu Jul 20 20:19:14 2017
6------>Thu Jul 20 20:19:15 2017
7------>Thu Jul 20 20:19:15 2017
8------>Thu Jul 20 20:19:16 2017
9------>Thu Jul 20 20:19:16 2017
ending
參考文檔:
1. https://www.cnblogs.com/huanxiyun/articles/5826902.html
2. http://www.javashuo.com/article/p-xkqrdjgb-c.html
上下文管理器的任務是:代碼塊執行前準備,代碼塊執行後收拾
如何打開一個文件,並寫入"hello world"
filename="my.txt" mode="w" f=open(filename,mode) f.write("hello world") f.close()
當發生異常時(如磁盤寫滿),就沒有機會執行第5行。固然,咱們能夠採用try-finally語句塊進行包裝:
writer=open(filename,mode) try: writer.write("hello world") finally: writer.close()
當咱們進行復雜的操做時,try-finally語句就會變得醜陋,採用with語句重寫:
with open(filename,mode) as writer: writer.write("hello world")
as指代了從open()函數返回的內容,並把它賦給了新值。with完成了try-finally的任務。
with語句的做用相似於try-finally,提供一種上下文機制。要應用with語句的類,其內部必須提供兩個內置函數__enter__和__exit__。前者在主體代碼執行前執行,後者在主體代碼執行後執行。as後面的變量,是在__enter__函數中返回的。
class echo(): def output(self): print "hello world" def __enter__(self): print "enter" return self #能夠返回任何但願返回的東西 def __exit__(self,exception_type,value,trackback): print "exit" if exception_type==ValueError: return True else: return Flase >>>with echo as e: e.output() 輸出: enter hello world exit
完備的__exit__函數以下:
def __exit__(self,exc_type,exc_value,exc_tb)
其中,exc_type:異常類型;exc_value:異常值;exc_tb:異常追蹤信息
當__exit__返回True時,異常不傳播
contextlib模塊的做用是提供更易用的上下文管理器,它是經過Generator實現的。contextlib中的contextmanager做爲裝飾器來提供一種針對函數級別的上下文管理機制,經常使用框架以下:
from contextlib import contextmanager @contextmanager def make_context(): print 'enter' try: yield "ok" except RuntimeError,err: print 'error',err finally: print 'exit' >>>with make_context() as value: print value 輸出爲: enter ok exit
其中,yield寫入try-finally中是爲了保證異常安全(能處理異常)as後的變量的值是由yield返回。yield前面的語句可看做代碼塊執行前操做,yield以後的操做能夠看做在__exit__函數中的操做。
以線程鎖爲例:
@contextlib.contextmanager def loudLock(): print 'Locking' lock.acquire() yield print 'Releasing' lock.release() with loudLock(): print 'Lock is locked: %s' % lock.locked() print 'Doing something that needs locking' #Output: #Locking #Lock is locked: True #Doing something that needs locking #Releasing
對於:
with open(filename,mode) as reader:
with open(filename1,mode1) as writer:
writer.write(reader.read())
能夠經過contextlib.nested進行簡化:
with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer):
writer.write(reader.read())
在python 2.7及之後,被一種新的語法取代:
with open(filename,mode) as reader,open(filename1,mode1) as writer:
writer.write(reader.read())
file類直接支持上下文管理器API,但有些表示打開句柄的對象並不支持,如urllib.urlopen()返回的對象。還有些遺留類,使用close()方法而不支持上下文管理器API。爲了確保關閉句柄,須要使用closing()爲它建立一個上下文管理器(調用類的close方法)。
import contextlib class myclass(): def __init__(self): print '__init__' def close(self): print 'close()' with contextlib.closing(myclass()): print 'ok'
>>>
__init__
ok
close()
協程,又稱微線程,纖程。英文名Coroutine。
優勢1: 協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。
優勢2: 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。
import time import queue def consumer(name): print("--->ready to eat baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while 1: time.sleep(1) print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) con.send(n) con2.send(n+1) n +=2 if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
import gevent import requests,time start=time.time() def f(url): print('GET: %s' % url) resp =requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://www.baidu.com/'), gevent.spawn(f, 'https://www.sina.com.cn/'), ]) # f('https://www.python.org/') # # f('https://www.yahoo.com/') # # f('https://baidu.com/') # # f('https://www.sina.com.cn/') print("cost time:",time.time()-start)
更多內容參考: