目錄python
線程是操做系統可以進行運算調度的最小單位,它被包含在進程之中,是進程中的實際運做單位,一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程中併發執行不一樣的任務。git
官方解釋:github
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.編程
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.windows
If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.api
Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.安全
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.服務器
Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.多線程
Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).併發
一個進程就是一個應用程序,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎,在早期面向進程設計的計算機結構中, 進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器,進程是線程的容器,程序是指令、數據以及其組織形式的描述,一個進程包含多個線程。
官方解釋:
An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
在python中,一次只能有一個線程在執行,若是想要利用多核多處理器資源,儘可能使用多進程去處理。
官方解釋:
Cpython實現細節:在Cpython中,因爲Global Interpreter Lock,只有一個線程能夠同時執行python代碼(某些面向性能的庫可能會避免這個限制)若是你但願應用程序更好地利用多核機器的計算資源,建議你使用多處理,可是,若是要同時運行多個I/O綁定任務,則線程仍然是一個合適的模型。
總結:因此若是底層python解釋器的話,咱們python的多線程只適合IO密集型任務,不適合CPU計算密集型任務。
模塊以及相關方法
threading模塊:
threading.Thread(target=?,args=(arg1,arg2...)):
建立threading線程對象,target參數將要運行的函數的變量名填寫到這裏,args參數是要運行函數須要的參數。
start():讓線程開始執行任務
join():默認線程開起來以後就和主線程沒有關係了,主線程結束和join不要緊,調用join方法,會讓主線程再調用join方法的地方等待,知道子線程完成操做,主線程才繼續往下執行代碼。
setDaemon(True):默認爲False,默認主線程與其餘線程沒有關係,可是有時候,咱們想要主線程關閉的時候,把其餘線程也關閉了(不關心其餘線程是否是執行完任務了),咱們將其餘線程設置爲主線程的守護進程。注意:該方法須要在線程開始以前執行threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法:
run(): 用以表示線程活動的方法。
start():啓動線程活動。
join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。
如何使用threading模塊建立多線程
直接調用
# author:Dman # date:2019/3/26 import threading import time def foo(n): print('foo___start___%s' % n) time.sleep(1) print('end foo____') def bar(): print('bar___start') time.sleep(2) #time.sleep的時候並不會佔用cpu print('end bar____') t1 = threading.Thread(target=foo,args=(2,)) t2 = threading.Thread(target=bar) t1.start() t2.start() print('_________main______')
使用繼承的方式調用
# author:Dman # date:2019/3/27 """ 使用自定義類的方式去建立thread 步驟: 一、繼承threading.Thread類 二、必須重寫父方法,run()方法。 三、能夠重寫父類方法 """ import threading import time class Mythred(threading.Thread): def __init__(self,num): # super(Mythred,self).__init__() threading.Thread.__init__(self) self.num = num def run(self): print('threading %s is running at %s ' % (threading.current_thread(), time.ctime())) time.sleep(3) if __name__ == '__main__': t1 = Mythred(1) t2 = Mythred(2) t1.start() t2.start()
join方法的疑問
# author:Dman # date:2019/3/27 """ threading 中方法,join方法的理解與使用。 join方法 """ import threading from time import ctime,sleep import time def music(func): for i in range(2): print ("Begin listening to %s. %s" %(func,ctime())) sleep(4) print("end listening %s"%ctime()) def move(func): for i in range(2): print ("Begin watching at the %s! %s" %(func,ctime())) sleep(5) print('end watching %s'%ctime()) if __name__ == '__main__': threads = [] t1 = threading.Thread(target=music, args=('七里香',)) threads.append(t1) t2 = threading.Thread(target=move, args=('阿甘正傳',)) threads.append(t2) for t in threads: # t.setDaemon(True) t.start() # t.join() #位置1 # t1.join() #位置2 # t2.join()######## #位置3 print ("all over %s" %ctime())
總結:
一、位置1:會阻塞全部的子線程,父進程會在全部程序執行完成以後就執行
二、位置2:只會阻塞線程 t1,在 t1子線程執行完畢以後,主線程就會繼續執行print函數。
三、位置3:只會阻塞線程 t2,在 t2子線程執行完畢以後,主線程就會繼續執行print函數。
多線程數據共享和同步
若是多個線程共同對某個數據進行修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要對多個線程進行同步,使用thread類的Lock和RLock能夠是實現簡單的線程同步。
同步鎖:又稱互斥鎖 ------threading.Lock
做用:解決多個線程訪問共享變量的時候出現的數據的問題。
# author:Dman # date:2019/3/27 """ 沒有同步鎖案例 """ """ 下面程序執行順序: 一、咱們打開了100個線程去執行addnum函數,其中addnum是對一個全局變量num進行-1的操做,咱們的理想的狀態時num左後等於0 實際運行結果是: 100個線程開始搶GIL,搶到的將被CPU執行 一、執行global num 二、temp = num 賦值操做 三、執行time.sleep(0.1) ---(解釋:這個語句至關於發生IO阻塞,掛起,GIL釋放,下一步num=temp-1還未被執行,所以全局變量num的值仍爲 100) 四、其餘99個線程開始搶GIL鎖,重複上面的步驟 五、其餘98個線程開始搶GIL鎖,重複上面的步驟 ... (備註:若是阻塞的事件夠長,因爲cpu的執行速度很快,也就是切換的快,在發生阻塞的0.1秒鐘,若是100個 線程都切換一遍那麼,每一個線程就都拿到num=100這個變量,後面再執行-1操做,那麼當全部線程結束,獲得的結果都是99.) """ import time import threading def addNum(): global num #在每一個線程中都獲取這個全局變量 # num-=1 #這個操做速度很快 temp=num # print('--get num:',num) # 每一個線程執行到這一步就只能拿到變量num = 100, time.sleep(0.1) num =temp-1 #對此公共變量進行-1操做 if __name__ == '__main__': num = 100 #設定一個共享變量 thread_list = [] for i in range(5): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部線程執行完畢 t.join() print('final num:', num ) #------運行結果------- final num: 99
# author:Dman # date:2019/3/28 """ 線程鎖使用實例 一、使用threading.Lock()方法獲得鎖對象 二、在方法中調用acquire()和release()方法去包圍咱們的代碼, 那麼同一時刻就只有一個線程能夠訪問acquire和release包圍的代碼塊。 """ import time import threading def addNum(): global num # 在每一個線程中都獲取這個全局變量 # num-=1 #這個操做速度很快 lock.acquire() # 獲取鎖 temp = num # print('--get num:',num) # 每一個線程執行到這一步就只能拿到變量num = 100, time.sleep(0.1) num = temp - 1 # 對此公共變量進行-1操做 lock.release() # 釋放鎖 if __name__ == '__main__': lock = threading.Lock() num = 100 # 設定一個共享變量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: # 等待全部線程執行完畢 t.join() print('final num:', num)
思考:同步鎖和GIL的區別
總結:
一、線程鎖又稱互斥鎖、同步鎖,爲了防止多個代碼同時訪問共享變量的時候,出現問題。
二、在threading模塊中,經過Lock類來建立鎖對象,經過aquire方法和release方法去包圍須要保護的代碼
線程死鎖和遞歸鎖
死鎖
死鎖現象,見代碼以下:
# author:Dman # date:2019/3/30 """ 線程死鎖: 在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都正在使用,全部這兩個線程在無外力做用下將一直等待下去。 """ import threading import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() # 要求獲取LockB print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() # 要求獲取LockA print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待線程結束,後面再講。
遞歸鎖-----threading.RLock
做用:爲了解決鎖嵌套的問題,解決死鎖問題。
# author:Dman # date:2019/3/30 """ 遞歸鎖(RLock)也叫可重入鎖:解決死鎖問題,看 線程鎖_test3.py 特色:能夠屢次acquire。 內部使用計數器來維護。acquire的時候計數器加1,release的時候計數器減1 結果:鎖的是內部代碼塊,同一時刻保證只有一個線程執行該代碼塊。 使用場景:當咱們修改多個變量是有關聯的,咱們只能對本身的方法去鎖定,可是不能保證別人的方法是鎖定的,因此當咱們內部鎖定了以後,其餘函數也可能鎖定,這樣就出現了多把鎖的狀況。 """ import threading import threading,time class myThread(threading.Thread): def doA(self): # lockA.acquire() lock.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) # lockB.acquire() # 要求獲取LockB lock.acquire() print(self.name,"gotlockB",time.ctime()) # lockB.release() # lockA.release() lock.release() lock.release() def doB(self): lock.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lock.acquire() # 要求獲取LockA print(self.name,"gotlockA",time.ctime()) lock.release() lock.release() def run(self): self.doA() self.doB() if __name__=="__main__": # lockA=threading.Lock() # lockB=threading.Lock() lock = threading.RLock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待線程結束,後面再講。
案例使用案例----銀行取錢:
# author:Dman # date:2019/3/30 """ 遞歸鎖場景---案例 """ import threading class Account: def __init__(self,name,money): self.name = name self.balance = money self.lock = threading.RLock() def withdraw(self,amount): with self.lock: self.balance -= amount def deposit(self,amount): with self.lock: # with上下文管理,幫咱們acquire 和release self.balance += amount def transfer(from_user, to_user,amount): # 鎖不能夠加在這裏,由於其餘的線程執行其餘方法在不加鎖的狀況下數據一樣是不安全的 from_user.withdraw(amount) to_user.deposit(amount) if __name__ == '__main__': alex = Account('alex',100) dman = Account('xiaohu',20000) t1 = threading.Thread(target=transfer, args=(alex, dman, 100)) t1.start() t2 = threading.Thread(target=transfer, args=(dman, dman, 200)) t2.start() t1.join() t2.join() print('>>>', alex.balance) print('>>>', dman.balance)
總結:
一、建立遞歸鎖的方法:使用threading.RLock類去建立遞歸鎖對象。同互斥鎖同樣,使用aquire和release方法去包圍代碼塊
二、遞歸鎖是爲了解決鎖嵌套的時候的問題。
條件變量同步---threading.Condition
做用:爲了實現多個線程之間的交互,它自己也提供了RLock或Lock的方法,還提供了wait()、notify()、notifyAll()方法
wait():條件不知足時調用,線程會釋放鎖並進入等待阻塞;
notify():條件創造後調用,通知等待池激活一個線程;
notifyAll():條件創造後調用,通知等待池激活全部線程。
# author:Dman # date:2019/3/30 """ 條件變量------實現線程的限制 應用場景:有一類線程須要知足條件以後才能繼續執行。,爲了在知足必定條件後,喚醒某個線程,防止該線程一直不被執行 """ import threading,time from random import randint class Producer(threading.Thread): def run(self): global L while True: val=randint(0,100) print('生產者',self.name,":Append"+str(val),L) if lock_con.acquire(): L.append(val) lock_con.notify() # lock_con.release() time.sleep(3) class Consumer(threading.Thread): def run(self): global L while True: lock_con.acquire() # print('ok1') if len(L)==0: lock_con.wait() print('消費者',self.name,":Delete"+str(L[0]),L) del L[0] lock_con.release() time.sleep(0.25) if __name__=="__main__": L=[] lock_con=threading.Condition()#獲取一個Condition對象 threads=[] for i in range(5): threads.append(Producer()) threads.append(Consumer()) for t in threads: t.start() for t in threads: t.join()
總結:
一、使用threading.Condition()獲取一個Condition對象,裏面默認使用RLock,也能夠本身手動傳參數。
同步條件---threading.Event
做用:Event和Condition差很少,只是少了鎖的功能,所以Event用於不訪問共享變量的條件環境
event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。
# author:Dman # date:2019/3/30 """ event沒有鎖功能,可是實現了線程之間的交互。內部有標誌位 實現了函數: isSet():返回event 的狀態值 wait():若是event的狀態值位False將阻塞線程 set(): 設置event的狀態值位True clear():設置event的狀態值爲False 交叉執行。 """ import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚你們都要加班到22:00。") event.isSet() or event.set() time.sleep(5) print("BOSS:<22:00>能夠下班了。") event.isSet() or event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(0.25) event.clear() event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() #獲取event對象 threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() #---------運行結果--------------- BOSS:今晚你們都要加班到22:00。 Worker:哎……命苦啊! Worker:哎……命苦啊!Worker:哎……命苦啊! Worker:哎……命苦啊! Worker:哎……命苦啊! BOSS:<22:00>能夠下班了。 Worker:OhYeah! Worker:OhYeah! Worker:OhYeah!Worker:OhYeah! Worker:OhYeah!
信號量
做用:用來控制線程併發數的,使用BoundedSemaphore或Semaphore類來管理一個內置的計數器,每當調用acquire方法時-1,調用release方法時+1.
計數器不能小於0,當計數器爲0時,acquire方法將阻塞線程至同步鎖定狀態,知道其餘線程調用release方法。(相似停車場的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將調用release時檢查計數器是否超過了計數器的初始值,若是超過了將拋出一個異常。
# author:Dman # date:2019/3/30 """ 一、信號量 二、信號量和遞歸鎖的區別: 三、應用場景: 四、信號量的建立: """ import threading,time class MyThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release() if __name__ =='__main__': semaphore = threading.BoundedSemaphore(5) thrs = [] for i in range(13): thrs.append(MyThread()) for i in thrs: i.start() # print('___main function close _____')
多線程數據共享利器--queue隊列模塊
做用:多個線程間進行安全的信息交互的時候
queue隊列類的方法
建立一個「隊列」對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。Python Queue模塊有三種隊列及構造函數:
一、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize)
二、LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize)
三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize)此包中的經常使用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 若是隊列爲空,返回True,反之False
q.full() 若是隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 至關q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 至關q.put(item, False)
q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做
案例一
# author:Dman # date:2019/3/30 import queue """ 隊列 queue:是線程安全的 相比較列表:爲何隊列是線程安全的 """ import threading,queue,time,random class Production(threading.Thread): def run(self): while True: r = random.randint(0,100) q.put(r) print('生產出來%s號包子' % r) time.sleep(1) class Proces(threading.Thread): def run(self): while True: re = q.get() print('吃掉%s號包子'% re) if __name__ == '__main__': q = queue.Queue(10) threads = [Production(),Production(),Proces()] for t in threads: t.start()
案例二:
# author:Dman # date:2019/4/3 #實現一個線程不斷生成一個隨機數到一個隊列中(考慮使用Queue這個模塊) # 實現一個線程從上面的隊列裏面不斷的取出奇數 # 實現另一個線程從上面的隊列裏面不斷取出偶數 import random,threading,time from queue import Queue #Producer thread class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): for i in range(10): #隨機產生10個數字 ,能夠修改成任意大小 randomnum=random.randint(1,99) print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)) self.data.put(randomnum) #將數據依次存入隊列 time.sleep(1) print ("%s: %s finished!" %(time.ctime(), self.getName())) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: try: val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超時5秒 if val_even%2==0: print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)) time.sleep(2) else: self.data.put(val_even) time.sleep(2) except: #等待輸入,超過5秒 就報異常 print ("%s: %s finished!" %(time.ctime(),self.getName())) break class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: try: val_odd = self.data.get(1,5) if val_odd%2!=0: print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) except: print ("%s: %s finished!" % (time.ctime(), self.getName())) break #Main thread def main(): queue = Queue() producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() print ('All threads terminate!') if __name__ == '__main__': main()
案例3:相比較,list不是線程安全的
import threading,time li=[1,2,3,4,5] def pri(): while li: a=li[-1] print(a) time.sleep(1) try: li.remove(a) except: print('----',a) t1=threading.Thread(target=pri,args=()) t1.start() t2=threading.Thread(target=pri,args=()) t2.start()
多進程概念
因爲GIL的存在,默認的Cpython解釋器中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU資源,在python中大部分須要使用多進程,Python提供了multiprocessing模塊,這個模塊支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。
該模塊的使用和threading模塊相似,api大體相同,可是須要注意幾點:
一、在unix平臺上,某個進程終結以後,該進程須要被父進程調用wait,不然進程成爲殭屍進程,因此有必要對每一個process對象調用join方法(實際上等於wait),對於多線程來講 ,因爲只有一個進程,因此不存在此必要性。
二、multiprocessing模塊提供了Pipe和Queue,效率上更高,贏優先考慮Pipe和Queue,避免使用Lock、Event等同步方式(由於他們佔據的不是進程的資源。)
三、多進程應該避免共享資源,在多線程中,咱們能夠比較容易的共享資源,好比使用全局變量或者傳遞參數,在多進程的狀況下,因爲每一個進程有本身獨立的內存空間,以上方法不合適。此時咱們能夠經過共享內存和Manager的方法來共享資源,但這樣作提升了程序的複雜度。
四、另外、在windows系統下,須要注意的是想要啓動一個子進程,必須加上
if __name__ == '__main__':
建立多進程------multiprocessing.Process
直接調用
from multiprocessing import Process import time def f(name): time.sleep(1) print('hello', name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin',)) p_list.append(p) p.start() for i in p_list: p.join() print('end')
類的方式調用
from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print ('hello', self.name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
多進程之間的通訊,有三種方式
multiprocessing.Queue
from multiprocessing import Process, Queue def f(q,n): q.put([42, n, 'hello']) if __name__ == '__main__': q = Queue() p_list=[] for i in range(3): p = Process(target=f, args=(q,i)) p_list.append(p) p.start() print(q.get()) print(q.get()) print(q.get()) for i in p_list: i.join()
multiprocessing.Pipe
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
一、Pipe()函數返回一個由管道鏈接的鏈接對象,默認狀況下是雙工(雙向)。
二、Pipe()返回的兩個鏈接對象表明管道的兩端。 每一個鏈接對象都有send() 和recv()方法(以及其餘方法)。 請注意,若是兩個進程(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的數據可能會損壞。 固然,同時使用管道的不一樣端的進程不存在損壞的風險。
multiprocessing.Manager
from multiprocessing import Process, Manager def f(d, l,n): d[n] = '1' d['2'] = 2 d[0.25] = None l.append(n) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
一、Manager()返回的管理器對象控制一個服務器進程,該進程保存Python對象並容許其餘進程使用代理操做它們。
二、Manager()返回的管理器將支持類型列表,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value和Array。
進程間同步----multiprocessing.Lock
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
進程間同步,只使用父進程的鎖,(另外儘可能避免這種狀況)
進程池----multiprocessing.Pool
from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print('-->exec done:',arg) pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #pool.apply(func=Foo, args=(i,)) print('end') pool.close() pool.join()
進程池內部維護一個進程序列,當使用時,就去進程池中獲取一個進程,若是進程池中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用的進程爲止。
進程池中的兩個方法:
一、apply
二、map
三、apply_async 是異步的,也就是在啓動進程以後會繼續後續的代碼,不用等待進程函數返回
四、map_async 是異步的,
五、join語句要放在close語句後面
協程是什麼?
協程,又稱微線程,英文名爲Coroutine,協程是用戶態的輕量級的線程。協程擁有本身的寄存器上下文和棧。協程調用切換時,將寄存器上下文和棧保存到其餘地方,在切換回來的時候,能夠恢復先前保存的寄存器上下文和棧,所以:
協程能保留上一次調用的狀態,每次過程重入的時候,就至關於進入上一次調用的狀態,換種說話,進入上一次離開時所處的邏輯流的位置
總結:
一、協程必須在只有一個單線程裏實現併發
二、修改共享數據不須要加鎖
三、用戶程序裏本身保存多個控制流的上下文和棧
四、一個協程遇到IO操做自動切換到其餘線程
協程的好處?
一、無需線程上下文切換的開銷
二、無需院子操做鎖定以及同步的開銷(原子操做是不須要同步,所謂原子操做是指不會被線程調度機制打斷的操做,也就是說該操做必須執行完畢,才能進行線程切換;原子操做能夠是一個步驟,也能夠是多個操做步驟)
三、方便切換控制流,簡化編程模型
四、高併發+高擴展+低成本:一個CPU支持上萬的協程都不是問題,因此很適合高併發的問題。
協程的缺點
一、沒法利用多核資源:協程的本質是一個單線程,它不能同時將單個CPU的多個核用上,協程須要和進程配合才能利用多核CPU;咱們平常所編寫的大部分應用沒有這個必要,除非是CPU密集性應用
二、進行阻塞操做入IO會阻塞掉整個程序
yield實現協程案例
# author:Dman # date:2019/4/1 import time import queue def consumer(name): print('---開始生產包子') while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) def producer(): next(con1) next(con2) n = 0 while n<5: n += 1 con1.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n) if __name__ == '__main__': con1 = consumer('c1') con2 = consumer('c2') p = producer() #------------------運行結果--------------- ---開始生產包子 ---開始生產包子 [c1] is eating baozi 1 [c2] is eating baozi 1 [producer] is making baozi 1 [c1] is eating baozi 2 [c2] is eating baozi 2 [producer] is making baozi 2 [c1] is eating baozi 3 [c2] is eating baozi 3 [producer] is making baozi 3 [c1] is eating baozi 4 [c2] is eating baozi 4 [producer] is making baozi 4 [c1] is eating baozi 5 [c2] is eating baozi 5 [producer] is making baozi 5
greenlet模塊支持的協程
相比較yield,能夠在任意函數之間隨意切換,而不須要把這個函數先聲明成爲generaor。(可是它沒法自動遇到IO阻塞去切換,必須手動去切換)
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() #調用switch去切換執行函數 #----------執行結果---------------- 12 56 34 78
gevent模塊支持的協程
理解
使用gevent,能夠得到極高的併發性能,但gevent只能在Unix/Linux下運行,在Windows下不保證正常安裝和運行。(它能夠在遇到IO阻塞的時候自動切換)
因爲gevent是基於IO切換的協程,因此最神奇的是,咱們編寫的Web App代碼,不須要引入gevent的包,也不須要改任何代碼,僅僅在部署的時候,用一個支持gevent的WSGI服務器,馬上就得到了數倍的性能提高。具體部署方式能夠參考後續「實戰」-「部署Web App」一節。
簡單案例
# author:Dman # date:2019/4/1 """ gevent 封裝了greenlet,這個不須要本身去切換,遇到io阻塞,模塊會本身去切換任務。 咱們只須要把gevent對象加到裏面 """ import gevent def func1(): print('\033[31;1m李闖在跟海濤搞...\033[0m') gevent.sleep(2) #模擬IO阻塞,自動開始切換 print('\033[31;1m李闖又回去跟繼續跟海濤搞...\033[0m') def func2(): print('\033[32;1m李闖切換到了跟海龍搞...\033[0m') gevent.sleep(1) print('\033[32;1m李闖搞完了海濤,回來繼續跟海龍搞...\033[0m') gevent.joinall([ gevent.spawn(func1), #將函數加到裏面。 gevent.spawn(func2), # gevent.spawn(func3), ]) #-----------執行結果------------- 李闖在跟海濤搞... 李闖切換到了跟海龍搞... 李闖搞完了海濤,回來繼續跟海龍搞... 李闖又回去跟繼續跟海濤搞...
同步IO和異步IO的區別
# author:Dman # date:2019/4/1 import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1, 10): task(i) def asynchronous(): #異步io函數 threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:'.center(20,'-')) synchronous() print('Asynchronous:'.center(20,'-')) asynchronous()
簡單的異步爬蟲,遇到IO阻塞會自動切換任務
from gevent import monkey import time monkey.patch_all() # 在最開頭的地方gevent.monkey.patch_all();把標準庫中的thread/socket等給替換掉, # 這樣咱們在後面使用socket的時候能夠跟日常同樣使用,無需修改任何代碼,可是它變成非阻塞的了. # import gevent from urllib.request import urlopen def f(url): print('GET: %s' % url) resp = urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) list = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/'] start = time.time() # for url in l: # f(url) gevent.joinall([ gevent.spawn(f, list[0]), gevent.spawn(f, list[1]), gevent.spawn(f, list[2]), ]) print(time.time()-start) #-----------輸出結果--------------- GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ 48560 bytes received from https://www.python.org/. 82655 bytes received from https://github.com/. 536556 bytes received from https://www.yahoo.com/. 3.361192226409912