Process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。python
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹: 1. group參數未使用,值始終爲None 2. target表示調用對象,即子進程要執行的任務 3. args表示調用對象的位置參數元組,args=(1,2,'zze',) 4. kwargs表示調用對象的字典,kwargs={'name':'zze','age':18} 5. name爲子進程的名稱
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
1 import time 2 from multiprocessing import Process 3 4 5 def f(name): 6 print('hello', name) 7 print('我是子進程') 8 9 10 if __name__ == '__main__': 11 p = Process(target=f, args=('bob',)) 12 p.start() 13 time.sleep(1) 14 print('執行主進程的內容了') 15 16 # result 17 # hello bob 18 # 我是子進程 19 # 執行主進程的內容了
join()函數能夠阻塞主進程,讓其等待子進程代碼執行完畢後,再執行join()後面的代碼linux
1 import time 2 from multiprocessing import Process 3 4 5 def f(name): 6 print('hello', name) 7 time.sleep(2) 8 print('我是子進程') 9 10 11 if __name__ == '__main__': 12 p = Process(target=f, args=('bob',)) 13 p.start() 14 time.sleep(1) 15 p.join() 16 print('執行主進程的內容了') 17 18 # result 19 # hello bob 20 # 我是子進程 21 # 執行主進程的內容了
1 import os 2 from multiprocessing import Process 3 4 def f(x): 5 print('子進程id :',os.getpid(),'父進程id :',os.getppid()) 6 return x*x 7 8 if __name__ == '__main__': 9 print('主進程id :', os.getpid()) 10 p_lst = [] 11 for i in range(5): 12 p = Process(target=f, args=(i,)) 13 p.start() 14 15 #result: 16 # 主進程id : 9208 17 # 子進程id : 4276 父進程id : 9208 18 # 子進程id : 3744 父進程id : 9208 19 # 子進程id : 9392 父進程id : 9208 20 # 子進程id : 3664 父進程id : 9208 21 # 子進程id : 520 父進程id : 9208
1 import time 2 from multiprocessing import Process 3 4 5 def f(name): 6 print('hello', name) 7 time.sleep(1) 8 9 10 if __name__ == '__main__': 11 p_lst = [] 12 for i in range(5): 13 p = Process(target=f, args=('bob%s'%i,)) 14 p.start() 15 p_lst.append(p) 16 [p.join() for p in p_lst] 17 print('父進程在執行') 18 #result: 19 # hello bob0 20 # hello bob1 21 # hello bob2 22 # hello bob3 23 # hello bob4 24 # 父進程在執行
1 import os 2 from multiprocessing import Process 3 4 5 class MyProcess(Process): 6 def __init__(self, name): 7 super().__init__() 8 self.name = name 9 10 def run(self): 11 print(os.getpid()) 12 print(self.name) 13 14 15 if __name__ == '__main__': 16 p1 = MyProcess('p1') 17 p2 = MyProcess('p2') 18 p3 = MyProcess('p3') 19 # start會自動調用run 20 p1.start() 21 p2.start() 22 p3.start() 23 24 p1.join() 25 p2.join() 26 p3.join() 27 print('主線程111') 28 29 #result: 30 # 1740 31 # p1 32 # 8468 33 # p2 34 # 9572 35 # p3 36 # 主線程111
1 import time 2 from multiprocessing import Process 3 4 5 def func(): 6 while True: 7 print('我還活着') 8 time.sleep(0.5) 9 10 11 if __name__ == "__main__": 12 p = Process(target=func) 13 p.daemon = True # 設置子進程爲守護進程 14 p.start() 15 i = 2 16 while i > 0: 17 print('主進程執行') 18 time.sleep(1) 19 i -= 1 20 print('主進程執行完畢') 21 22 # result 23 # 主進程執行 24 # 我還活着 25 # 我還活着 26 # 主進程執行 27 # 我還活着 28 # 我還活着 29 # 主進程執行完畢
1 import time 2 from multiprocessing import Process 3 4 5 def func(): 6 while True: 7 print('我還活着') 8 time.sleep(0.5) 9 10 11 if __name__ == "__main__": 12 p = Process(target=func) 13 # p.daemon = True # 設置子進程爲守護進程 14 p.start() 15 i = 2 16 while i > 0: 17 print('主進程執行') 18 time.sleep(1) 19 i -= 1 20 print('主進程執行完畢') 21 22 # result 23 # 主進程執行 24 # 我還活着 25 # 我還活着 26 # 主進程執行 27 # 我還活着 28 # 我還活着 29 # 主進程執行完畢 30 # 我還活着 31 # 我還活着 32 # 我還活着 33 # 我還活着 34 # 我還活着 35 # 我還活着 36 # 我還活着 37 # 我還活着 38 # ...
加鎖能夠保證代碼塊在同一時間段只有指定一個進程執行git
1 from multiprocessing import Process 2 import time 3 import os 4 5 6 def func(): 7 time.sleep(1) 8 print('正在執行子進程的進程號:{},當前時間:{}'.format(os.getpid(), time.strftime("%Y-%m-%d %X"))) 9 10 11 if __name__ == '__main__': 12 for i in range(5): 13 Process(target=func).start() 14 15 # result: 16 # 正在執行子進程的進程號:6044,當前時間:2018-09-09 19:22:12 17 # 正在執行子進程的進程號:7024,當前時間:2018-09-09 19:22:12 18 # 正在執行子進程的進程號:9900,當前時間:2018-09-09 19:22:12 19 # 正在執行子進程的進程號:8888,當前時間:2018-09-09 19:22:12 20 # 正在執行子進程的進程號:10060,當前時間:2018-09-09 19:22:12
1 from multiprocessing import Lock 2 from multiprocessing import Process 3 import time 4 import os 5 6 7 def func(lock): 8 lock.acquire() 9 time.sleep(1) 10 print('正在執行子進程的進程號:{},當前時間:{}'.format(os.getpid(), time.strftime("%Y-%m-%d %X"))) 11 lock.release() 12 13 14 if __name__ == '__main__': 15 lock = Lock() 16 for i in range(5): 17 Process(target=func, args=(lock,)).start() 18 19 # result: 20 # 正在執行子進程的進程號:8752,當前時間:2018-09-09 19:25:39 21 # 正在執行子進程的進程號:10152,當前時間:2018-09-09 19:25:40 22 # 正在執行子進程的進程號:5784,當前時間:2018-09-09 19:25:41 23 # 正在執行子進程的進程號:9708,當前時間:2018-09-09 19:25:42 24 # 正在執行子進程的進程號:8696,當前時間:2018-09-09 19:25:43
信號量能夠保證代碼塊在同一時間段只有指定數量進程執行github
1 from multiprocessing import Process, Semaphore 2 import time 3 4 5 def func(num, s): 6 s.acquire() 7 print('編號:{} 正在執行,'.format(num), time.strftime("%Y-%m-%d %X")) 8 time.sleep(1) 9 s.release() 10 11 12 if __name__ == '__main__': 13 s = Semaphore(2) 14 for i in range(10): 15 p = Process(target=func, args=(i, s)) 16 p.start() 17 18 # result: 19 # 編號:0 正在執行, 2018-09-10 16:16:28 20 # 編號:1 正在執行, 2018-09-10 16:16:28 21 # 編號:2 正在執行, 2018-09-10 16:16:29 22 # 編號:3 正在執行, 2018-09-10 16:16:29 23 # 編號:4 正在執行, 2018-09-10 16:16:30 24 # 編號:5 正在執行, 2018-09-10 16:16:30 25 # 編號:7 正在執行, 2018-09-10 16:16:31 26 # 編號:6 正在執行, 2018-09-10 16:16:31 27 # 編號:8 正在執行, 2018-09-10 16:16:32 28 # 編號:9 正在執行, 2018-09-10 16:16:32
例:讓指定代碼塊在5秒後執行數組
1 from multiprocessing import Process, Event 2 import time 3 4 5 # 獲取指定秒數後的時間 6 def get_addsec_time(sec=0): 7 return time.strftime("%Y-%m-%d %X", time.localtime(time.time() + sec)) 8 9 10 def func(e): 11 print('func準備執行') 12 e.wait() # 當e.is_set()爲True時執行後面代碼 13 print('執行了,當前時間:{}'.format(time.strftime("%Y-%m-%d %X"))) 14 15 16 if __name__ == '__main__': 17 e = Event() 18 print(e.is_set()) # False 初始是阻塞狀態 19 e.set() 20 print(e.is_set()) # True 不阻塞 21 e.clear() 22 print(e.is_set()) # False 恢復阻塞 23 after_five_sec = get_addsec_time(5) # 5秒後的時間 24 Process(target=func, args=(e,)).start() 25 while True: 26 print('當前時間:{}'.format(time.strftime("%Y-%m-%d %X"))) 27 time.sleep(1) 28 if time.strftime("%Y-%m-%d %X") == after_five_sec: 29 print('5秒過去了') 30 e.set() 31 break; 32 33 # result: 34 # False 35 # True 36 # False 37 # 當前時間:2018-09-10 17:06:37 38 # func準備執行 39 # 當前時間:2018-09-10 17:06:38 40 # 當前時間:2018-09-10 17:06:39 41 # 當前時間:2018-09-10 17:06:40 42 # 當前時間:2018-09-10 17:06:41 43 # 5秒過去了 44 # 執行了,當前時間:2018-09-10 17:06:42
建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 安全
Queue([maxsize])
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
q.empty()
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。
q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。
q.join_thread()
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
1 ''' 2 multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 3 都是基於消息傳遞實現的,可是隊列接口 4 ''' 5 6 from multiprocessing import Queue 7 q=Queue(3) 8 9 #put ,get ,put_nowait,get_nowait,full,empty 10 q.put(3) 11 q.put(3) 12 q.put(3) 13 # q.put(3) # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。 14 # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。 15 try: 16 q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。 17 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。 18 print('隊列已經滿了') 19 20 # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。 21 print(q.full()) #滿了 22 23 print(q.get()) 24 print(q.get()) 25 print(q.get()) 26 # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。 27 try: 28 q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。 29 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 30 print('隊列已經空了') 31 32 print(q.empty()) #空了
1 from multiprocessing import Process, Queue 2 3 4 def func(q, e): 5 q.put('from son process') 6 7 8 if __name__ == '__main__': 9 q = Queue(5) # 初始化隊列容量爲5 10 p = Process(target=func, args=(q)) 11 p.start() 12 p.join() 13 print(q.get()) # from son process
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。網絡
JoinableQueue的實例q除了與Queue對象相同的方法以外,還具備如下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。
下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
1 from multiprocessing import JoinableQueue, Process 2 import time 3 import random 4 5 6 def producer(name, q): 7 for i in range(1, 11): 8 time.sleep(random.randint(1, 2)) 9 s = '{}生產的第{}個蘋果'.format(name, i) 10 q.put(s) 11 print(s) 12 q.join() # 生產完畢,使用此方法進行阻塞,直到隊列中全部蘋果都被吃完。 13 14 15 def consumer(name, q): 16 while True: 17 time.sleep(random.randint(2, 3)) 18 s = q.get() 19 print('{}吃了{}'.format(name, s)) 20 q.task_done() # 向q.join()發送一次信號,證實一個數據已經被取走了 21 22 23 if __name__ == '__main__': 24 q = JoinableQueue(10) 25 producer_task = Process(target=producer, args=('bob', q)) 26 producer_task.start() 27 consumer_task = Process(target=consumer, args=('tom', q)) 28 consumer_task.daemon = True # 設置爲守護進程 隨主進程代碼執行完而結束 29 consumer_task.start() 30 31 producer_task.join() # 等待至生產完且生產的蘋果都被吃完時繼續執行即主進程代碼結束 32 33 # result: 34 # bob生產的第1個蘋果 35 # tom吃了bob生產的第1個蘋果 36 # bob生產的第2個蘋果 37 # tom吃了bob生產的第2個蘋果 38 # bob生產的第3個蘋果 39 # tom吃了bob生產的第3個蘋果 40 # bob生產的第4個蘋果 41 # tom吃了bob生產的第4個蘋果 42 # bob生產的第5個蘋果 43 # tom吃了bob生產的第5個蘋果 44 # bob生產的第6個蘋果 45 # bob生產的第7個蘋果 46 # tom吃了bob生產的第6個蘋果 47 # bob生產的第8個蘋果 48 # tom吃了bob生產的第7個蘋果 49 # bob生產的第9個蘋果 50 # bob生產的第10個蘋果 51 # tom吃了bob生產的第8個蘋果 52 # tom吃了bob生產的第9個蘋果 53 # tom吃了bob生產的第10個蘋果
#建立管道的類:(管道是進程不安全的) Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道 #參數介紹: dumplex:默認管道是全雙工的,若是將duplex設成False,conn1只能用於接收,conn2只能用於發送。
#主要方法: conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象 #其餘方法: conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
1 from multiprocessing import Process, Pipe 2 3 4 def f(conn): 5 conn.send("from sub process") 6 conn.close() 7 8 9 if __name__ == '__main__': 10 parent_conn, child_conn = Pipe() 11 p = Process(target=f, args=(child_conn,)) 12 p.start() 13 print(parent_conn.recv()) # from sub process 14 p.join()
應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。多線程
1 from multiprocessing import Process, Pipe 2 3 4 def f(child_conn): 5 while True: 6 try: 7 print(child_conn.recv()) 8 except EOFError: 9 child_conn.close() 10 break 11 12 13 if __name__ == '__main__': 14 parent_conn, child_conn = Pipe() 15 p = Process(target=f, args=(child_conn,)) 16 p.start() 17 child_conn.close() 18 parent_conn.send('hello') 19 parent_conn.close() 20 p.join()
1 from multiprocessing import Manager, Process, Lock 2 3 4 def work(d, lock): 5 # with lock: 6 d['count'] -= 1 7 8 9 if __name__ == '__main__': 10 with Manager() as m: 11 lock = Lock() 12 dic = m.dict({'count': 100}) 13 p_l = [] 14 for i in range(10): 15 p = Process(target=work, args=(dic, lock)) 16 p_l.append(p) 17 p.start() 18 for p in p_l: p.join() 19 20 print(dic) # {'count': 91} 21 # Manager包裝的類型是進程不安全的
Pool([numprocess [,initializer [, initargs]]]):建立進程池
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數併發
1 from multiprocessing import Pool, Process 2 import time 3 4 5 def func(n): 6 for i in range(100): 7 n += i 8 9 10 if __name__ == '__main__': 11 pool = Pool(5) 12 start = time.time() 13 pool.map(func, range(100)) 14 print('進程池執行耗時:{}'.format(time.time() - start)) 15 p_list = [] 16 start = time.time() 17 for i in range(100): 18 p = Process(target=func, args=(i,)) 19 p.start() 20 p_list.append(p) 21 for p in p_list: p.join() 22 print('多進程執行耗時:{}'.format(time.time() - start)) 23 24 # result: 25 # 進程池執行耗時: 0.24797534942626953 26 # 多進程執行耗時: 7.359263896942139
1 import os, time 2 from multiprocessing import Pool 3 4 5 def work(n): 6 print('%s run' % os.getpid()) 7 time.sleep(3) 8 return n ** 2 9 10 11 if __name__ == '__main__': 12 p = Pool(3) # 進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 13 res_l = [] 14 for i in range(10): 15 res = p.apply(work, args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞 16 res_l.append(res) # 但無論該任務是否存在阻塞,同步調用都會在原地等着 17 print(res_l) 18 # result: 19 # 15940 run 20 # 16200 run 21 # 16320 run 22 # 15940 run 23 # 16200 run 24 # 16320 run 25 # 15940 run 26 # 16200 run 27 # 16320 run 28 # 15940 run 29 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
1 import os 2 import time 3 import random 4 from multiprocessing import Pool 5 6 7 def work(n): 8 print('%s run' % os.getpid()) 9 time.sleep(random.random()) 10 return n ** 2 11 12 13 if __name__ == '__main__': 14 p = Pool(3) # 進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 15 res_l = [] 16 for i in range(10): 17 res = p.apply_async(work, args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行 18 # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務 19 # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束 20 # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 21 res_l.append(res) 22 23 # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果 24 # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了 25 p.close() 26 p.join() 27 for res in res_l: 28 print(res.get()) # 使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get 29 30 # result: 31 # 8872 run 32 # 13716 run 33 # 11396 run 34 # 11396 run 35 # 8872 run 36 # 13716 run 37 # 11396 run 38 # 8872 run 39 # 13716 run 40 # 11396 run 41 # 0 42 # 1 43 # 4 44 # 9 45 # 16 46 # 25 47 # 36 48 # 49 49 # 64 50 # 81
1 from multiprocessing import Pool 2 3 4 def func(i): 5 return i * i 6 7 8 def callback_func(i): 9 print(i) 10 11 12 if __name__ == '__main__': 13 pool = Pool(5) 14 for i in range(1, 11): 15 pool.apply_async(func, args=(i,), callback=callback_func) 16 pool.close() 17 pool.join() 18 # # result: 19 # 1 20 # 4 21 # 9 22 # 16 23 # 25 24 # 36 25 # 49 26 # 64 27 # 81 28 # 100
1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。app
2)通訊:進程間通訊IPC,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊——須要進程同步和互斥手段的輔助,以保證數據的一致性。
3)調度和切換:線程上下文切換比進程上下文切換要快得多。
4)在多線程操做系統中,進程不是一個可執行的實體。
5)進程是資源分配的最小單位,線程是CPU調度的最小單位,每個進程中至少有一個線程。
multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性。
Thread實例對象的方法
isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。
threading模塊提供的一些方法:
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
1 import threading 2 import time 3 4 5 def func(): 6 print('start sub thread1') 7 print(threading.currentThread()) # <Thread(sub thread1, started 11832)> 8 time.sleep(10) 9 print('end sub thread1') 10 11 12 thread = threading.Thread(target=func) 13 thread.start() 14 print(thread.is_alive()) # True 15 print(thread.getName()) # Thread-1 16 thread.setName('sub thread1') 17 print(thread.getName()) # sub thread1 18 print(threading.currentThread()) # <_MainThread(MainThread, started 9708)> 19 print(threading.enumerate()) # [<_MainThread(MainThread, started 9708)>, <Thread(sub thread1, started 11832)>] 20 print(threading.activeCount()) # 2
1 from threading import Thread 2 3 4 def func(): 5 print('from sub threading') 6 7 8 p = Thread(target=func) 9 p.start() 10 p.join() 11 # result: 12 # from sub threading
1 from threading import Thread 2 3 4 class MyThread(Thread): 5 def run(self): 6 print('from sub thread,threadid:{}'.format(self.ident)) 7 8 9 my_thread = MyThread() 10 my_thread.start() 11 my_thread.join() 12 13 # result: 14 # from sub thread,threadid:9332
同一進程內的線程之間共享進程內的數據
1 from threading import Thread 2 3 4 def func(): 5 global i 6 i = 1 7 8 9 i = 10 10 thread = Thread(target=func) 11 thread.start() 12 thread.join() 13 print(i) # 1
不管是進程仍是線程,都遵循:守護進程/線程會等待主進程/線程運行完畢後被銷燬。須要強調的是:運行完畢並不是終止運行
1.對主進程來講,運行完畢指的是主進程代碼運行完畢
2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
1 from multiprocessing import Process 2 import time 3 4 5 def notDaemonFunc(): 6 print('start notDaemonFunc') 7 time.sleep(10) 8 print('end notDaemonFunc') 9 10 11 def daemonFunc(): 12 print('start daemonFunc') 13 time.sleep(5) 14 print('end daemonFunc') # 主進程代碼早已執行完畢沒機會執行 15 16 17 if __name__ == '__main__': 18 notDaemonProcess = Process(target=notDaemonFunc) 19 notDaemonProcess.start() 20 damonProcess = Process(target=daemonFunc) 21 damonProcess.daemon = True 22 damonProcess.start() 23 time.sleep(1) 24 print('執行完畢') 25 26 # 主進程代碼執行完畢時守護進程立馬結束 27 28 # result: 29 # start notDaemonFunc 30 # start daemonFunc 31 # 執行完畢 32 # end notDaemonFunc
1 from threading import Thread 2 import time 3 4 5 def notDaemonFunc(): 6 print('start notDaemonFunc') 7 time.sleep(10) 8 print('end notDaemonFunc') 9 10 11 def daemonFunc(): 12 print('start daemonFunc') 13 time.sleep(5) 14 print('end daemonFunc') 15 16 17 notDaemonThread = Thread(target=notDaemonFunc) 18 notDaemonThread.start() 19 damonThread = Thread(target=daemonFunc) 20 damonThread.daemon = True 21 damonThread.start() 22 time.sleep(1) 23 print('執行完畢') 24 25 # result: 26 # start notDaemonFunc 27 # start daemonFunc 28 # 執行完畢 29 # end daemonFunc 30 # end notDaemonFunc
1 from threading import Thread 2 import time 3 4 5 def work(): 6 global n 7 temp = n 8 time.sleep(0.1) 9 n = temp - 1 10 11 12 if __name__ == '__main__': 13 n = 100 14 l = [] 15 for i in range(100): 16 p = Thread(target=work) 17 l.append(p) 18 p.start() 19 for p in l: 20 p.join() 21 22 print(n) # 指望0 但結果可能爲99 98
1 from threading import Thread, Lock 2 import time 3 4 5 def work(lock): 6 with lock: 7 global n 8 temp = n 9 time.sleep(0.1) 10 n = temp - 1 11 12 13 if __name__ == '__main__': 14 n = 100 15 l = [] 16 lock = Lock() 17 for i in range(100): 18 p = Thread(target=work, args=(lock,)) 19 l.append(p) 20 p.start() 21 for p in l: 22 p.join() 23 24 print(n) # 0
是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程
1 import time 2 from threading import Thread, Lock 3 4 noodle_lock = Lock() 5 fork_lock = Lock() 6 7 8 def eat1(name): 9 noodle_lock.acquire() 10 print('%s 搶到了麪條' % name) 11 time.sleep(1) 12 fork_lock.acquire() 13 print('%s 搶到了筷子' % name) 14 print('%s 吃麪' % name) 15 fork_lock.release() 16 noodle_lock.release() 17 18 19 def eat2(name): 20 fork_lock.acquire() 21 print('%s 搶到了筷子' % name) 22 time.sleep(1) 23 noodle_lock.acquire() 24 print('%s 搶到了麪條' % name) 25 print('%s 吃麪' % name) 26 noodle_lock.release() 27 fork_lock.release() 28 29 30 t1 = Thread(target=eat1, args=('tom',)) 31 t2 = Thread(target=eat2, args=('jerry',)) 32 t1.start() 33 t2.start() 34 #result: 35 # tom 搶到了麪條 36 # jerry 搶到了叉子
在Python中爲了支持在同一線程中屢次請求同一資源,提供了可重入鎖RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次請求。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
1 import time 2 from threading import Thread, RLock 3 4 fork_lock = noodle_lock = RLock() 5 6 7 def eat1(name): 8 noodle_lock.acquire() 9 print('%s 搶到了麪條' % name) 10 time.sleep(1) 11 fork_lock.acquire() 12 print('%s 搶到了筷子' % name) 13 print('%s 吃麪' % name) 14 fork_lock.release() 15 noodle_lock.release() 16 17 18 def eat2(name): 19 fork_lock.acquire() 20 print('%s 搶到了筷子' % name) 21 time.sleep(1) 22 noodle_lock.acquire() 23 print('%s 搶到了麪條' % name) 24 print('%s 吃麪' % name) 25 noodle_lock.release() 26 fork_lock.release() 27 28 29 t1 = Thread(target=eat1, args=('tom',)) 30 t2 = Thread(target=eat2, args=('jerry',)) 31 t1.start() 32 t2.start() 33 34 # result: 35 # tom 搶到了麪條 36 # tom 搶到了筷子 37 # tom 吃麪 38 # jerry 搶到了筷子 39 # jerry 搶到了麪條 40 # jerry 吃麪
1 from threading import Thread, Semaphore 2 import time 3 4 5 def func(num, s): 6 s.acquire() 7 print('編號:{} 正在執行,'.format(num), time.strftime("%Y-%m-%d %X")) 8 time.sleep(1) 9 s.release() 10 11 12 s = Semaphore(2) 13 [Thread(target=func, args=(i, s)).start() for i in range(10)] 14 15 # result: 16 # 編號:0 正在執行, 2018-09-12 20:33:09 17 # 編號:1 正在執行, 2018-09-12 20:33:09 18 # 編號:2 正在執行, 2018-09-12 20:33:10 19 # 編號:3 正在執行, 2018-09-12 20:33:10 20 # 編號:4 正在執行, 2018-09-12 20:33:11 21 # 編號:5 正在執行, 2018-09-12 20:33:11 22 # 編號:7 正在執行, 2018-09-12 20:33:12 23 # 編號:6 正在執行, 2018-09-12 20:33:12 24 # 編號:9 正在執行, 2018-09-12 20:33:13 25 # 編號:8 正在執行, 2018-09-12 20:33:13
1 from threading import Thread, Event 2 import time 3 4 5 # 獲取指定秒數後的時間 6 def get_addsec_time(sec=0): 7 return time.strftime("%Y-%m-%d %X", time.localtime(time.time() + sec)) 8 9 10 def func(e): 11 print('func準備執行') 12 e.wait() # 當e.is_set()爲True時執行後面代碼 13 print('執行了,當前時間:{}'.format(time.strftime("%Y-%m-%d %X"))) 14 15 16 e = Event() 17 print(e.is_set()) # False 初始是阻塞狀態 18 e.set() 19 print(e.is_set()) # True 不阻塞 20 e.clear() 21 print(e.is_set()) # False 恢復阻塞 22 after_five_sec = get_addsec_time(5) # 5秒後的時間 23 Thread(target=func, args=(e,)).start() 24 while True: 25 print('當前時間:{}'.format(time.strftime("%Y-%m-%d %X"))) 26 time.sleep(1) 27 if time.strftime("%Y-%m-%d %X") == after_five_sec: 28 print('5秒過去了') 29 e.set() 30 break; 31 32 # result: 33 # False 34 # True 35 # False 36 # func準備執行 37 # 當前時間:2018-09-12 20:37:27 38 # 當前時間:2018-09-12 20:37:28 39 # 當前時間:2018-09-12 20:37:29 40 # 當前時間:2018-09-12 20:37:30 41 # 當前時間:2018-09-12 20:37:31 42 # 5秒過去了 43 # 執行了,當前時間:2018-09-12 20:37:32
使得線程等待,只有知足某條件時,才釋放n個線程
Python提供的Condition對象提供了對複雜線程同步問題的支持。Condition被稱爲條件變量,除了提供與Lock相似的acquire和release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,而後判斷一些條件。若是條件不知足則wait;若是條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
1 import threading 2 3 4 def run(n): 5 con.acquire() 6 print('prepare') 7 con.wait() 8 print("run the thread: %s" % n) 9 con.release() 10 11 con = threading.Condition() 12 for i in range(5): 13 t = threading.Thread(target=run, args=(i,)) 14 t.start() 15 16 while True: 17 inp = input('>>>') 18 if inp == 'q': 19 break 20 con.acquire() 21 con.notify(int(inp)) 22 con.release() 23 print('------------------------') 24 25 #result: 26 # prepare 27 # prepare 28 # prepare 29 # prepare 30 # prepare 31 # >>>3 32 # ------------------------ 33 # run the thread: 2 34 # run the thread: 1 35 # run the thread: 0 36 # >>>3 37 # ------------------------ 38 # run the thread: 4 39 # run the thread: 3 40 # >>>q
指定n秒後執行某個函數
1 from threading import Timer 2 import time 3 4 5 def func(): 6 print('in func,current time:{}'.format(time.strftime('%X'))) 7 8 9 print('in main,current time:{}'.format(time.strftime('%X'))) 10 # 5秒後執行 11 t = Timer(5, func) 12 t.start() 13 14 # result: 15 # in main,current time:20:53:52 16 # in func,current time:20:53:57
在上述threading模塊知識點中並無出現一個和multiprocessing模塊中Queen對應的隊列,這是由於python自己給咱們提供的queen就是線程安全的,而同個進程的線程之間資源是能夠共享的,因此咱們能夠直接使用queen
queue.
Queue
(maxsize=0) 先進先出1 import queue 2 3 q=queue.Queue() 4 q.put('first') 5 q.put('second') 6 q.put('third') 7 8 print(q.get()) 9 print(q.get()) 10 print(q.get()) 11 12 ''' 13 result: 14 first 15 second 16 third 17 '''
queue.
LifoQueue
(maxsize=0) 後進先出
1 import queue 2 3 q = queue.LifoQueue() 4 q.put('first') 5 q.put('second') 6 q.put('third') 7 8 print(q.get()) 9 print(q.get()) 10 print(q.get()) 11 ''' 12 result: 13 third 14 second 15 first 16 '''
queue.
PriorityQueue
(maxsize=0) #優先級
1 import queue 2 3 q = queue.PriorityQueue() 4 # put進入一個元組,第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 5 q.put((20, 'a')) 6 q.put((10, 'b')) 7 q.put((30, 'c')) 8 9 print(q.get()) 10 print(q.get()) 11 print(q.get()) 12 ''' 13 數字越小優先級越高,優先級高的優先出隊 14 result: 15 (10, 'b') 16 (20, 'a') 17 (30, 'c') 18 '''
concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor:進程池,提供異步調用 executor = ProcessPoolExecutor(max_workers=n):初始化進程池 max_workers指定池內最大進程數 executor.submit(fn, *args, **kwargs):異步提交任務 executor.map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做 executor.shutdown(wait=True) :至關於multiprocessing模塊中的pool.close()+pool.join()操做,wait=True時,等待池內全部任務執行完畢回收完資源後才繼續.wait=False時,當即返回,並不會等待池內的任務執行完畢,但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢,submit和map必須在shutdown以前 executor.submit().result(timeout=None):取得結果 executor.submit().result(timeout=None):取得結果 executor.submit().add_done_callback(fn):給任務添加回調函數
1 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor 2 3 import os, time 4 5 6 def func(n): 7 print('{} is runing ,current time:{}'.format(os.getpid(), time.strftime('%X'))) 8 time.sleep(1) 9 return 'pid:{} finished'.format(os.getpid()) 10 11 12 if __name__ == '__main__': 13 executor = ProcessPoolExecutor(max_workers=2) 14 result_list = [] 15 for i in range(1, 6): 16 result = executor.submit(func, i) 17 result_list.append(result) 18 executor.shutdown(True) 19 print('---------------get result-----------------') 20 for result in result_list: 21 print(result.result()) 22 23 ''' 24 result: 25 3444 is runing ,current time:21:32:39 26 2404 is runing ,current time:21:32:39 27 3444 is runing ,current time:21:32:40 28 2404 is runing ,current time:21:32:40 29 3444 is runing ,current time:21:32:41 30 ---------------get result----------------- 31 pid:3444 finished 32 pid:2404 finished 33 pid:3444 finished 34 pid:2404 finished 35 pid:3444 finished 36 '''
1 from concurrent.futures import ThreadPoolExecutor 2 import threading 3 import time 4 5 6 def task(n): 7 print('threadId:{} is runing,current time:{}'.format(threading.currentThread().ident, time.strftime('%X'))) 8 time.sleep(1) 9 return n ** 2 10 11 12 if __name__ == '__main__': 13 executor = ThreadPoolExecutor(max_workers=2) 14 15 # for i in range(11): 16 # future=executor.submit(task,i) 17 18 executor.map(task, range(1, 5)) # map取代了for+submit 19 20 ''' 21 result: 22 threadId:5324 is runing,current time:21:53:24 23 threadId:3444 is runing,current time:21:53:24 24 threadId:5324 is runing,current time:21:53:25 25 threadId:3444 is runing,current time:21:53:25 26 '''
1 from concurrent.futures import ThreadPoolExecutor 2 import threading 3 import time 4 5 6 def callback_func(result): 7 print(result.result()) 8 9 10 def func(i): 11 return i * i 12 13 14 executor = ThreadPoolExecutor(5) 15 [executor.submit(func, i).add_done_callback(callback_func) for i in range(1, 5)] 16 17 ''' 18 result: 19 1 20 4 21 9 22 16 23 '''
單線程裏執行多個任務代碼一般會既有計算操做又有阻塞操做,咱們徹底能夠在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。如此,才能提升效率,這就用到了Gevent模塊。
協程是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
須要強調的是:
1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)
2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關) 對比操做系統控制線程的切換,用戶在單線程內控制協程的切換
1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程
2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
1. 必須在只有一個單線程裏實現併發
2. 修改共享數據不需加鎖
3. 用戶程序裏本身保存多個控制流的上下文棧
4. 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))
安裝: pip3 install greenlet
1 from greenlet import greenlet 2 3 4 def func1(): 5 print('func1 start') 6 g2.switch() 7 print('func1 end') 8 g2.switch() 9 10 11 def func2(): 12 print('func2 start') 13 g1.switch() 14 print('func2 end') 15 16 17 g1 = greenlet(func1) 18 g2 = greenlet(func2) 19 g1.switch() 20 21 ''' 22 result: 23 func1 start 24 func2 start 25 func1 end 26 func2 end 27 '''
1 #順序執行 2 import time 3 def f1(): 4 res=1 5 for i in range(100000000): 6 res+=i 7 8 def f2(): 9 res=1 10 for i in range(100000000): 11 res*=i 12 13 start=time.time() 14 f1() 15 f2() 16 stop=time.time() 17 print('run time is %s' %(stop-start)) #10.985628366470337 18 19 #切換 20 from greenlet import greenlet 21 import time 22 def f1(): 23 res=1 24 for i in range(100000000): 25 res+=i 26 g2.switch() 27 28 def f2(): 29 res=1 30 for i in range(100000000): 31 res*=i 32 g1.switch() 33 34 start=time.time() 35 g1=greenlet(f1) 36 g2=greenlet(f2) 37 g1.switch() 38 stop=time.time() 39 print('run time is %s' %(stop-start)) # 52.763017892837524
單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度
安裝: pip3 install gevent
1 import gevent 2 import threading 3 import os 4 import time 5 6 7 def func1(): 8 print('pid:{} threadid:{} from func1 | start'.format(os.getpid(), threading.get_ident())) 9 gevent.sleep(1) 10 print('pid:{} threadid:{} from func1 | end'.format(os.getpid(), threading.get_ident())) 11 12 13 def func2(): 14 print('pid:{} threadid:{} from func2 | start'.format(os.getpid(), threading.get_ident())) 15 gevent.sleep(1) 16 print('pid:{} threadid:{} from func2 | end'.format(os.getpid(), threading.get_ident())) 17 18 19 start = time.time() 20 func1() 21 func2() 22 print('非協程耗時:{}'.format(time.time() - start)) 23 24 start = time.time() 25 g1 = gevent.spawn(func1) 26 g2 = gevent.spawn(func2) 27 g1.join() 28 g2.join() 29 print('協程耗時:{}'.format(time.time() - start)) 30 31 ''' 32 result: 33 pid:12092 threadid:2828 from func1 | start 34 pid:12092 threadid:2828 from func1 | end 35 pid:12092 threadid:2828 from func2 | start 36 pid:12092 threadid:2828 from func2 | end 37 非協程耗時:2.008000135421753 38 pid:12092 threadid:2828 from func1 | start 39 pid:12092 threadid:2828 from func2 | start 40 pid:12092 threadid:2828 from func1 | end 41 pid:12092 threadid:2828 from func2 | end 42 協程耗時:1.0 43 '''
上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
from gevent import monkey;monkey.patch_all() # 必須放到被打補丁者的前面
1 from gevent import monkey;monkey.patch_all() 2 import gevent 3 import threading 4 import os 5 import time 6 7 8 def func1(): 9 print('pid:{} threadid:{} from func1 | start'.format(os.getpid(), threading.get_ident())) 10 time.sleep(1) 11 print('pid:{} threadid:{} from func1 | end'.format(os.getpid(), threading.get_ident())) 12 13 14 def func2(): 15 print('pid:{} threadid:{} from func2 | start'.format(os.getpid(), threading.get_ident())) 16 time.sleep(1) 17 print('pid:{} threadid:{} from func2 | end'.format(os.getpid(), threading.get_ident())) 18 19 20 start = time.time() 21 func1() 22 func2() 23 print('非協程耗時:{}'.format(time.time() - start)) 24 25 start = time.time() 26 g1 = gevent.spawn(func1) 27 g2 = gevent.spawn(func2) 28 g1.join() 29 g2.join() 30 print('協程耗時:{}'.format(time.time() - start)) 31 32 ''' 33 result: 34 pid:7200 threadid:43458064 from func1 | start 35 pid:7200 threadid:43458064 from func1 | end 36 pid:7200 threadid:43458064 from func2 | start 37 pid:7200 threadid:43458064 from func2 | end 38 非協程耗時:2.004999876022339 39 pid:7200 threadid:55386728 from func1 | start 40 pid:7200 threadid:55387544 from func2 | start 41 pid:7200 threadid:55386728 from func1 | end 42 pid:7200 threadid:55387544 from func2 | end 43 協程耗時:1.000999927520752 44 '''
1 from gevent import monkey;monkey.patch_all() 2 import gevent 3 import requests 4 import time 5 6 7 def get_page(url): 8 print('GET: %s' % url) 9 response = requests.get(url) 10 if response.status_code == 200: 11 print('%d bytes received from %s' % (len(response.text), url)) 12 13 14 start_time = time.time() 15 gevent.joinall([ 16 gevent.spawn(get_page, 'https://www.python.org/'), 17 gevent.spawn(get_page, 'https://www.yahoo.com/'), 18 gevent.spawn(get_page, 'https://github.com/'), 19 ]) 20 stop_time = time.time() 21 print('run time is %s' % (stop_time - start_time)) 22 ''' 23 result: 24 GET: https://www.python.org/ 25 GET: https://www.yahoo.com/ 26 GET: https://github.com/ 27 64127 bytes received from https://github.com/ 28 48854 bytes received from https://www.python.org/ 29 502701 bytes received from https://www.yahoo.com/ 30 run time is 1.9760000705718994 31 '''
1 from gevent import monkey;monkey.patch_all() 2 from socket import * 3 import gevent 4 5 6 # 若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket 7 # from gevent import socket 8 # s=socket.socket() 9 10 def server(server_ip, port): 11 s = socket(AF_INET, SOCK_STREAM) 12 s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 13 s.bind((server_ip, port)) 14 s.listen(5) 15 while True: 16 conn, addr = s.accept() 17 gevent.spawn(talk, conn, addr) 18 19 20 def talk(conn, addr): 21 try: 22 while True: 23 res = conn.recv(1024) 24 print('client %s:%s msg: %s' % (addr[0], addr[1], res)) 25 conn.send(res.upper()) 26 except Exception as e: 27 print(e) 28 finally: 29 conn.close() 30 31 32 if __name__ == '__main__': 33 server('127.0.0.1', 8080)
1 from threading import Thread 2 from socket import * 3 import threading 4 5 6 def client(server_ip, port): 7 c = socket(AF_INET, SOCK_STREAM) # 套接字對象必定要加到函數內,即局部名稱空間內,放在函數外則被全部線程共享,則你們公用一個套接字對象,那麼客戶端端口永遠同樣了 8 c.connect((server_ip, port)) 9 10 count = 0 11 while True: 12 c.send(('%s say hello %s' % (threading.current_thread().getName(), count)).encode('utf-8')) 13 msg = c.recv(1024) 14 print(msg.decode('utf-8')) 15 count += 1 16 17 18 if __name__ == '__main__': 19 for i in range(500): 20 t = Thread(target=client, args=('127.0.0.1', 8080)) 21 t.start()