Python進程、線程、協程詳解

進程與線程的歷史

咱們都知道計算機是由硬件和軟件組成的。硬件中的CPU是計算機的核心,它承擔計算機的全部任務。 操做系統是運行在硬件之上的軟件,是計算機的管理者,它負責資源的管理和分配、任務的調度。 程序是運行在系統上的具備某種功能的軟件,好比說瀏覽器,音樂播放器等。 每次執行程序的時候,都會完成必定的功能,好比說瀏覽器幫咱們打開網頁,爲了保證其獨立性,就須要一個專門的管理和控制執行程序的數據結構——進程控制塊。 進程就是一個程序在一個數據集上的一次動態執行過程。 進程通常由程序、數據集、進程控制塊三部分組成。咱們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程當中所須要使用的資源;進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。python


在早期的操做系統裏,計算機只有一個核心,進程執行程序的最小單位,任務調度採用時間片輪轉的搶佔式方式進行進程調度。每一個進程都有各自的一塊獨立的內存,保證進程彼此間的內存地址空間的隔離。 隨着計算機技術的發展,進程出現了不少弊端,一是進程的建立、撤銷和切換的開銷比較大,二是因爲對稱多處理機(對稱多處理機(SymmetricalMulti-Processing)又叫SMP,是指在一個計算機上聚集了一組處理器(多CPU),各CPU之間共享內存子系統以及總線結構)的出現,能夠知足多個運行單位,而多進程並行開銷過大。 這個時候就引入了線程的概念。 線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合 和堆棧共同組成。線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能。 線程沒有本身的系統資源,只擁有在運行時必不可少的資源。但線程能夠與同屬與同一進程的其餘線程共享進程所擁有的其餘資源。git


進程與線程之間的關係

線程是屬於進程的,線程運行在進程空間內,同一進程所產生的線程共享同一內存空間,當進程退出時該進程所產生的線程都會被強制退出並清除。線程可與屬於同一進程的其它線程共享進程所擁有的所有資源,可是其自己基本上不擁有系統資源,只擁有一點在運行中必不可少的信息(如程序計數器、一組寄存器和棧)。程序員


python 線程github

Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。算法


一、threading模塊

threading 模塊創建在 _thread 模塊之上。thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊經過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import  threading
import  time
   
def  worker(num):
     """
     thread worker function
     :return:
     """
     time.sleep( 1 )
     print ( "The num is  %d"  %  num)
     return
   
for  in  range ( 20 ):
     =  threading.Thread(target = worker,args = (i,),name = 「t. % d」  %  i)
     t.start()

上述代碼建立了20個「前臺」線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。瀏覽器


Thread方法說明安全


t.start() : 激活線程,bash


t.getName() : 獲取線程的名稱網絡


t.setName() : 設置線程的名稱 


t.name : 獲取或設置線程的名稱


t.is_alive() : 判斷線程是否爲激活狀態


t.isAlive() :判斷線程是否爲激活狀態


t.setDaemon() 設置爲後臺線程或前臺線程(默認:False);經過一個布爾值設置線程是否爲守護線程,必須在執行start()方法以後纔可使用。若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止;若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止


t.isDaemon() : 判斷是否爲守護線程


t.ident :獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法以後該屬性纔有效,不然它只返回None。


t.join() :逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義


t.run() :線程被cpu調度後自動執行線程對象的run方法


二、線程鎖threading.RLock和threading.Lock


因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,CPU接着執行其餘線程。爲了保證數據的準確性,引入了鎖的概念。因此,可能出現以下問題:


例:假設列表A的全部元素都爲0,當一個線程從前向後打印列表的全部元素,另一個線程則從後向前修改列表的元素爲1,那麼輸出的時候,列表的元素就會一部分爲0,一部分爲1,這就致使了數據的不一致。鎖的出現解決了這個問題。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import  threading
import  time
   
globals_num  =  0
   
lock  =  threading.RLock()
   
def  Func():
     lock.acquire()   # 得到鎖
     global  globals_num
     globals_num  + =  1
     time.sleep( 1 )
     print (globals_num)
     lock.release()   # 釋放鎖
   
for  in  range ( 10 ):
     =  threading.Thread(target = Func)
     t.start()


三、threading.RLock和threading.Lock 的區別

RLock容許在同一線程中被屢次acquire。而Lock卻不容許這種狀況。 若是使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的瑣。

1
2
3
4
5
6
7
8
9
10
11
12
import  threading
lock  =  threading.Lock()     #Lock對象
lock.acquire()
lock.acquire()   #產生了死瑣。
lock.release()
lock.release() 
import  threading
rLock  =  threading.RLock()   #RLock對象
rLock.acquire()
rLock.acquire()     #在同一線程內,程序不會堵塞。
rLock.release()
rLock.release()


四、threading.Event


python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。


事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。


clear:將「Flag」設置爲False

set:將「Flag」設置爲True

Event.isSet() :判斷標識位是否爲Ture。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import  threading
   
def  do(event):
     print ( 'start' )
     event.wait()
     print ( 'execute' )
   
event_obj  =  threading.Event()
for  in  range ( 10 ):
     =  threading.Thread(target = do, args = (event_obj,))
     t.start()
   
event_obj.clear()
inp  =  input ( 'input:' )
if  inp  = =  'true' :
     event_obj. set ()

當線程執行的時候,若是flag爲False,則線程會阻塞,當flag爲True的時候,線程不會阻塞。它提供了本地和遠程的併發性。


五、threading.Condition


一個condition變量老是與某些類型的鎖相聯繫,這個可使用默認的狀況或建立一個,當幾個condition變量必須共享和同一個鎖的時候,是頗有用的。鎖是conditon對象的一部分:沒有必要分別跟蹤。


condition變量服從上下文管理協議:with語句塊封閉以前能夠獲取與鎖的聯繫。 acquire() 和 release() 會調用與鎖相關聯的相應的方法。


其餘和鎖關聯的方法必須被調用,wait()方法會釋放鎖,當另一個線程使用 notify() or notify_all()喚醒它以前會一直阻塞。一旦被喚醒,wait()會從新得到鎖並返回,


Condition類實現了一個conditon變量。 這個conditiaon變量容許一個或多個線程等待,直到他們被另外一個線程通知。 若是lock參數,被給定一個非空的值,,那麼他必須是一個lock或者Rlock對象,它用來作底層鎖。不然,會建立一個新的Rlock對象,用來作底層鎖。


wait(timeout=None) : 等待通知,或者等到設定的超時時間。當調用這wait()方法時,若是調用它的線程沒有獲得鎖,那麼會拋出一個RuntimeError 異常。 wati()釋放鎖之後,在被調用相同條件的另外一個進程用notify() or notify_all() 叫醒以前 會一直阻塞。wait() 還能夠指定一個超時時間。

若是有等待的線程,notify()方法會喚醒一個在等待conditon變量的線程。notify_all() 則會喚醒全部在等待conditon變量的線程。


注意: notify()和notify_all()不會釋放鎖,也就是說,線程被喚醒後不會馬上返回他們的wait() 調用。除非線程調用notify()和notify_all()以後放棄了鎖的全部權。


在典型的設計風格里,利用condition變量用鎖去通許訪問一些共享狀態,線程在獲取到它想獲得的狀態前,會反覆調用wait()。修改狀態的線程在他們狀態改變時調用 notify() or notify_all(),用這種方式,線程會盡量的獲取到想要的一個等待者狀態。 例子: 生產者-消費者模型,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import  threading
import  time
def  consumer(cond):
     with cond:
         print ( "consumer before wait" )
         cond.wait()
         print ( "consumer after wait" )
   
def  producer(cond):
     with cond:
         print ( "producer before notifyAll" )
         cond.notifyAll()
         print ( "producer after notifyAll" )
   
condition  =  threading.Condition()
c1  =  threading.Thread(name = "c1" , target = consumer, args = (condition,))
c2  =  threading.Thread(name = "c2" , target = consumer, args = (condition,))
   
=  threading.Thread(name = "p" , target = producer, args = (condition,))
   
c1.start()
time.sleep( 2 )
c2.start()
time.sleep( 2 )
p.start()


六、queue模塊


Queue 就是對隊列,它是線程安全的


舉例來講,咱們去麥當勞吃飯。飯店裏面有廚師職位,前臺負責把廚房作好的飯賣給顧客,顧客則去前臺領取作好的飯。這裏的前臺就至關於咱們的隊列。造成管道樣,廚師作好飯經過前臺傳送給顧客,所謂單向隊列


這個模型也叫生產者-消費者模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import  queue
  
=  queue.Queue(maxsize = 0 )   # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0 時,表示隊列長度無限制。
  
q.join()     # 等到隊列爲kong的時候,在執行別的操做
q.qsize()    # 返回隊列的大小 (不可靠)
q.empty()    # 當隊列爲空的時候,返回True 不然返回False (不可靠)
q.full()     # 當隊列滿的時候,返回True,不然返回False (不可靠)
q.put(item, block = True , timeout = None #  將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置,
                         爲 False 時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,事後,
                          若是隊列沒法給出放入item的位置,則引起 queue.Full 異常
q.get(block = True , timeout = None #   移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞,
                      若此時隊列爲空,則引起 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。
q.put_nowait(item)  #   等效於 put(item,block=False)
q.get_nowait()  #    等效於 get(item,block=False)

代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
import  Queue
import  threading
message  =  Queue.Queue( 10 )
  
  
def  producer(i):
     while  True :
         message.put(i)
  
  
def  consumer(i):
     while  True :
         msg  =  message.get()
  
  
for  in  range ( 12 ):
     =  threading.Thread(target = producer, args = (i,))
     t.start()
  
for  in  range ( 10 ):
     =  threading.Thread(target = consumer, args = (i,))
     t.start()

那就本身作個線程池吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 簡單往隊列中傳輸線程數
import  threading
import  time
import  queue
class  Threadingpool():
     def  __init__( self ,max_num  =  10 ):
         self .queue  =  queue.Queue(max_num)
         for  in  range (max_num):
             self .queue.put(threading.Thread)
     def  getthreading( self ):
         return  self .queue.get()
     def  addthreading( self ):
         self .queue.put(threading.Thread)
def  func(p,i):
     time.sleep( 1 )
     print (i)
     p.addthreading()
if  __name__  = =  "__main__" :
     =  Threadingpool()
     for  in  range ( 20 ):
         thread  =  p.getthreading()
         =  thread(target  =  func, args  =  (p,i))
         t.start()


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#往隊列中無限添加任務
import  queue
import  threading
import  contextlib
import  time
StopEvent  =  object ()
class  ThreadPool( object ):
     def  __init__( self , max_num):
         self .q  =  queue.Queue()
         self .max_num  =  max_num
         self .terminal  =  False
         self .generate_list  =  []
         self .free_list  =  []
     def  run( self , func, args, callback = None ):
         """
         線程池執行一個任務
         :param func: 任務函數
         :param args: 任務函數所需參數
         :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
         :return: 若是線程池已經終止,則返回True不然None
         """
         if  len ( self .free_list)  = =  0  and  len ( self .generate_list) <  self .max_num:
             self .generate_thread()
         =  (func, args, callback,)
         self .q.put(w)
     def  generate_thread( self ):
         """
         建立一個線程
         """
         =  threading.Thread(target = self .call)
         t.start()
     def  call( self ):
         """
         循環去獲取任務函數並執行任務函數
         """
         current_thread  =  threading.currentThread
         self .generate_list.append(current_thread)
         event  =  self .q.get()   # 獲取線程
         while  event ! =  StopEvent:    # 判斷獲取的線程數不等於全局變量
             func, arguments, callback  =  event    # 拆分元祖,得到執行函數,參數,回調函數
             try :
                 result  =  func( * arguments)    # 執行函數
                 status  =  True
             except  Exception as e:     # 函數執行失敗
                 status  =  False
                 result  =  e
             if  callback  is  not  None :
                 try :
                     callback(status, result)
                 except  Exception as e:
                     pass
             # self.free_list.append(current_thread)
             # event = self.q.get()
             # self.free_list.remove(current_thread)
             with  self .work_state():
                 event  =  self .q.get()
         else :
             self .generate_list.remove(current_thread)
     def  close( self ):
         """
         關閉線程,給傳輸全局非元祖的變量來進行關閉
         :return:
         """
         for  in  range ( len ( self .generate_list)):
             self .q.put(StopEvent)
     def  terminate( self ):
         """
         忽然關閉線程
         :return:
         """
         self .terminal  =  True
         while  self .generate_list:
             self .q.put(StopEvent)
         self .q.empty()
     @contextlib.contextmanager
     def  work_state( self ):
         self .free_list.append(threading.currentThread)
         try :
             yield
         finally :
             self .free_list.remove(threading.currentThread)
def  work(i):
     print (i)
     return  + 1  # 返回給回調函數
def  callback(ret):
     print (ret)
pool  =  ThreadPool( 10 )
for  item  in  range ( 50 ):
     pool.run(func = work, args = (item,),callback = callback)
pool.terminate()
# pool.close()


python 進程

multiprocessing是python的多進程管理包,和threading.Thread相似。


一、multiprocessing模塊


直接從側面用subprocesses替換線程使用GIL的方式,因爲這一點,multiprocessing模塊可讓程序員在給定的機器上充分的利用CPU。在multiprocessing中,經過建立Process對象生成進程,而後調用它的start()方法,

1
2
3
4
5
6
7
8
9
10
from  multiprocessing  import  Process
  
def  func(name):
     print ( 'hello' , name)
  
  
if  __name__  = =  "__main__" :
     =  Process(target = func,args = ( 'zhangyanlin' ,))
     p.start()
     p.join()   # 等待進程執行完畢

在使用併發設計的時候最好儘量的避免共享數據,尤爲是在使用多進程的時候。 若是你真有須要 要共享數據, multiprocessing提供了兩種方式。


(1)multiprocessing,Array,Value


數據能夠用Value或Array存儲在一個共享內存地圖裏,以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from  multiprocessing  import  Array,Value,Process
  
def  func(a,b):
     a.value  =  3.333333333333333
     for  in  range ( len (b)):
         b[i]  =  - b[i]
  
  
if  __name__  = =  "__main__" :
     num  =  Value( 'd' , 0.0 )
     arr  =  Array( 'i' , range ( 11 ))
  
  
     =  Process(target = func,args = (num,arr))
     d =  Process(target = func,args = (num,arr))
     c.start()
     d.start()
     c.join()
     d.join()
  
     print (num.value)
     for  in  arr:
         print (i)

輸出:

1
2
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

建立num和arr時,「d」和「i」參數由Array模塊使用的typecodes建立:「d」表示一個雙精度的浮點數,「i」表示一個有符號的整數,這些共享對象將被線程安全的處理。


Array(‘i’, range(10))中的‘i’參數:


‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte

‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint

‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double


(2)multiprocessing,Manager


由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from  multiprocessing  import  Process,Manager
def  f(d,l):
     d[ "name" =  "zhangyanlin"
     d[ "age" =  18
     d[ "Job" =  "pythoner"
     l.reverse()
  
if  __name__  = =  "__main__" :
     with Manager() as man:
         =  man. dict ()
         =  man. list ( range ( 10 ))
  
         =  Process(target = f,args = (d,l))
         p.start()
         p.join()
  
         print (d)
         print (l)



輸出:

1
2
{0.25: None, 1:  '1' '2' : 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process manager比 shared memory 更靈活,由於它能夠支持任意的對象類型。另外,一個單獨的manager能夠經過進程在網絡上不一樣的計算機之間共享,不過他比shared memory要慢。


二、進程池(Using a pool of workers)


Pool類描述了一個工做進程池,他有幾種不一樣的方法讓任務卸載工做進程。


進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。


咱們能夠用Pool類建立一個進程池, 展開提交的任務給進程池。 例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#apply
from   multiprocessing  import  Pool
import  time
  
def  f1(i):
     time.sleep( 0.5 )
     print (i)
     return  +  100
  
if  __name__  = =  "__main__" :
     pool  =  Pool( 5 )
     for  in  range ( 1 , 31 ):
         pool. apply (func = f1,args = (i,))
  
#apply_async
def  f1(i):
     time.sleep( 0.5 )
     print (i)
     return  +  100
def  f2(arg):
     print (arg)
  
if  __name__  = =  "__main__" :
     pool  =  Pool( 5 )
     for  in  range ( 1 , 31 ):
         pool.apply_async(func = f1,args = (i,),callback = f2)
     pool.close()
     pool.join()

一個進程池對象能夠控制工做進程池的哪些工做能夠被提交,它支持超時和回調的異步結果,有一個相似map的實現。


processes :使用的工做進程的數量,若是processes是None那麼使用 os.cpu_count()返回的數量。

initializer: 若是initializer是None,那麼每個工做進程在開始的時候會調用initializer(*initargs)。

maxtasksperchild:工做進程退出以前能夠完成的任務數,完成後用一個心的工做進程來替代原進程,來讓閒置的資源被釋放。maxtasksperchild默認是None,意味着只要Pool存在工做進程就會一直存活。

context: 用在制定工做進程啓動時的上下文,通常使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來建立一個池,兩種方法都適當的設置了context

注意:Pool對象的方法只能夠被建立pool的進程所調用。


New in version 3.2: maxtasksperchild

New in version 3.4: context


進程池的方法

apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。


apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。


close() : 阻止更多的任務提交到pool,待任務完成後,工做進程會退出。


terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。


join() : wait工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait),不然進程會成爲殭屍進程。


map(func, iterable[, chunksize])¶


map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶


imap(func, iterable[, chunksize])¶


imap_unordered(func, iterable[, chunksize])


starmap(func, iterable[, chunksize])¶


starmap_async(func, iterable[, chunksize[, callback[, error_back]]])


python 協程


線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。


協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。


協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;


event loop是協程執行的控制點, 若是你但願執行協程, 就須要用到它們。


event loop提供了以下的特性:


註冊、執行、取消延時調用(異步函數)

建立用於通訊的client和server協議(工具)

建立和別的程序通訊的子進程和協議(工具)

把函數調用送入線程池中

協程示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
import  asyncio
   
async  def  cor1():
     print ( "COR1 start" )
     await cor2()
     print ( "COR1 end"
相關文章
相關標籤/搜索