進程與線程

操做系統

1.爲何要有操做系統
操做系統,位於底層硬件與應用軟件之間的一層
工做方式:向下管理硬件,向上提供接口

操做系統進程切換:
1.出現IO操做
2.固定時間

進程

1.定義python

進程就是一個程序在一個數據集上的一次動態執行過程。進程通常由程序、數據集、進程控制塊三部分組成。咱們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程當中所須要使用的資源;進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。git

 

進程由三部分組成:github

一、程序:咱們編寫的程序用來描述進程要完成哪些功能以及如何完成redis

二、數據集:數據集則是程序在執行過程當中所須要使用的資源服務器

三、進程控制塊:進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感網絡

     知進程存在的惟一標誌。多線程

線程

線程的出現是爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹同樣事的缺陷,使到進程內併發成爲可能。線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能。線程沒有本身的系統資源。併發

Threading用於提供線程相關的操做。線程是應用程序中工做的最小單元,它被包含在進程之中,是進程中的實際運做單位。一app

條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。dom

 

 

 

進程與線程的關係

進程是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。或者說進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位。
線程則是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。

進程和線程的關係:

(1)一個線程只能屬於一個進程,而一個進程能夠有多個線程,但至少有一個線程。
(2)資源分配給進程,同一進程的全部線程共享該進程的全部資源。
(3)CPU分給線程,即真正在CPU上運行的是線程。

 進程: 資源管理單位(容器)

 線程: 最小執行單位

 

 並行與併發

 

並行處理(Parallel Processing)是計算機系統中能同時執行兩個或更多個處理的一種計算方法。並行處理可同時工做於同一程序的不一樣方面。並行處理的主要目的是節省大型和複雜問題的解決時間。併發處理(concurrency Processing):指一個時間段中有幾個程序都處於已啓動運行到運行完畢之間,且這幾個程序都是在同一個處理機(CPU)上運行,但任一個時刻點上只有一個程序在處理機(CPU)上運行

併發的關鍵是你有處理多個任務的能力,不必定要同時。並行的關鍵是你有同時處理多個任務的能力。因此說,並行是併發的子集

 

 同步與異步

 在計算機領域,同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到收到返回信息才繼續執行下去;異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。舉個例子,打電話時就是同步通訊,發短息時就是異步通訊。

 

1. 實現線程併發

 示例1:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3
 4 
 5 import threading  #線程
 6 import time
 7 
 8 def Hi(num): #有一個參數
 9     print("hello %s" %num)
10     time.sleep(3)  
11 
12 if __name__ == '__main__':
13 
14     t1=threading.Thread(target=Hi,args=(10,))  #建立了一個線程對象t1,10作爲一個參數,傳給num
15     t1.start()
16 
17     t2=threading.Thread(target=Hi,args=(9,))   #建立了一個線程對象t2,9作爲一個參數,傳給num
18     t2.start()
19 
20     print("ending.........")  #主線程輸出ending

執行結果:

1 hello 10    #子線程
2 hello 9     #子線程
3 ending.........   #主線程
4 #上面三個同時出來,再停頓三秒才結束
5 Process finished with exit code 0  #停頓3秒才結束

示例2:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t1.start()
22     t2=threading.Thread(target=game)
23     t2.start()

 

執行結果:

1 #總共花了5秒時間
2 
3 begin to listen Sat Jan 14 12:34:43 2017
4 begin to play game Sat Jan 14 12:34:43 2017  #一、先打印2個
5 
6 stop to listen Sat Jan 14 12:34:46 2017      #二、等待3秒再打印一個
7 
8 stop to play game Sat Jan 14 12:34:48 2017   #三、再等待2秒,打印一個

 

 2.使用 join方法

示例1:

1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()  #運行實例的方法
24     t2.start()
25 
26     t1.join()   #子線程對象調用join()方法
27     t2.join()
28 
29     print("ending")  #在主線程中

 

 執行結果:

1 begin to listen Sat Jan 14 12:58:34 2017
2 begin to play game Sat Jan 14 12:58:34 2017  #先打印2個
3 
4 stop to listen Sat Jan 14 12:58:37 2017      #等待3秒,再打印一個
5 
6 stop to play game Sat Jan 14 12:58:39 2017   #等待2秒,再打印兩個
7 ending  

示例2:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()  #運行實例的方法
24     t2.start()
25 
26     t1.join()   #t1線程不結束,誰都不往下走
27 
28     print("ending")  

執行結果:

1 begin to listen Sat Jan 14 13:06:07 2017
2 begin to play game Sat Jan 14 13:06:07 2017  #先打印這兩行
3 
4 stop to listen Sat Jan 14 13:06:10 2017      #再等待3秒打印這兩行
5 ending
6 
7 stop to play game Sat Jan 14 13:06:12 2017   #再等待2秒打印這行

示例3:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()  #運行實例的方法
24     t2.start()
25 
26     t2.join()
27 
28     print("ending")  #在主線程中

執行結果:

1 begin to listen Sat Jan 14 13:12:34 2017     #先打印這兩行
2 begin to play game Sat Jan 14 13:12:34 2017
3 
4 stop to listen Sat Jan 14 13:12:37 2017      #等待3秒,打印這一行
5 
6 stop to play game Sat Jan 14 13:12:39 2017   #等待2秒,打印這兩行
7 ending

示例4: 並無實現併發(實現多線程的意義)

 

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()
24 
25     t1.join()
26     t2.start()
27     
28     t2.join()
29 
30     print("ending")  #在主線程中

執行結果:

1 begin to listen Sat Jan 14 13:26:18 2017    #先打印條1行
2 
3 stop to listen Sat Jan 14 13:26:21 2017     #等待3秒再打印2行
4 begin to play game Sat Jan 14 13:26:21 2017
5 
6 stop to play game Sat Jan 14 13:26:26 2017  #等待5秒打印2行
7 ending

  

 線程調用方法:

1.直接調用

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 
 4 
 5 import threading
 6 import time
 7 
 8 
 9 def sayhi(num):  # 定義每一個線程要運行的函數
10 
11     print("running on number:%s" % num)
12 
13     time.sleep(3)
14 
15 
16 if __name__ == '__main__':
17     t1 = threading.Thread(target=sayhi, args=(1,))  # 生成一個線程實例
18     t2 = threading.Thread(target=sayhi, args=(2,))  # 生成另外一個線程實例
19 
20     t1.start()  # 啓動線程
21     t2.start()  # 啓動另外一個線程
22 
23     print(t1.getName())  # 獲取線程名
24     print(t2.getName())

執行結果:

1 running on number:1
2 running on number:2
3 Thread-1
4 Thread-2

  

 2.繼承式調用

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 
 4 
 5 import threading
 6 import time
 7 
 8 #本身定製一個MyThread的類
 9 class MyThread(threading.Thread):  
10     def __init__(self, num):
11         threading.Thread.__init__(self)
12         self.num = num
13 
14     def run(self):  # 定義每一個線程要運行的函數
15 
16         print("running on number:%s" % self.num)
17 
18         time.sleep(3)
19 
20 
21 if __name__ == '__main__':
22     t1 = MyThread(1)  #繼承這個類,把1這個參數,傳給num ,t1就是個線程對象
23     t2 = MyThread(2)
24     t1.start()
25     t2.start()
26 
27     print("ending......")

執行結果:

1 running on number:1
2 running on number:2
3 ending......

  

 用Daemon方法示例(設置t爲守護線程,就是字線程,跟着主線程一塊兒推出)

   daemon: 程序直到不存在非守護線程時退出。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     for t in threads:
31         t.setDaemon(True) #設置t爲守護線程; 注意:必定在start()以前設置,不然會報錯
32 
33         t.start()
34 
35     print ("all over %s" %ctime())

 

 執行結果:

1 Begin listening to 水手. Sat Jan 14 13:51:30 2017    #三個同時打印出來
2 Begin recording the python線程! Sat Jan 14 13:51:30 2017
3 all over Sat Jan 14 13:51:30 2017

  

示例3: 設置t1爲守護線程,沒有意義,達不到效果,由於t2還會繼續執行

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     t1.setDaemon(True)  #設置t1爲守護線程; 注意:必定在start以前設置,不然會報錯
31     for t in threads:
32 
33         t.start()
34 
35     print ("all over %s" %ctime())

 

執行結果:

1 Begin listening to 水手. Sat Jan 14 14:02:07 2017
2 Begin recording the python線程! Sat Jan 14 14:02:07 2017
3 all over Sat Jan 14 14:02:07 2017          #設置t1爲守護線程,因此會先把這三條先打印出來
4 
5 end listening Sat Jan 14 14:02:10 2017     #再等待3秒打印t2,
6 
7 end recording Sat Jan 14 14:02:12 2017     #再等待3秒打印這條出來

 

示例4:設置t2爲守護線程,子線程纔會跟着主線程一塊兒退出

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     t2.setDaemon(True)  # 設置t2爲守護線程; 注意:必定在start以前設置,不然會報錯
31     for t in threads:
32 
33         t.start()
34 
35     print ("all over %s" %ctime())

執行結果:

1 Begin listening to 水手. Sat Jan 14 14:17:09 2017
2 Begin recording the python線程! Sat Jan 14 14:17:09 2017
3 all over Sat Jan 14 14:17:09 2017       #先打印這三條
4 
5 end listening Sat Jan 14 14:17:12 2017  #等待3秒,再打印這條;t1結束後,主線程也結束了。

 

其餘方法:

Thread實例對象的方法
# isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     t2.setDaemon(True)  # 設置t爲守護進程; 注意:必定在start以前設置,不然會報錯
31     for t in threads:
32         t.start()
33         print(t.getName())    #返回線程名稱:Thread-1
34 
35     print ("all over %s" %ctime())

 

 

GIL(全局解釋器鎖)

不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行。

 

同步鎖

 

1.不加鎖(拿到的值是不固定的)

 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每一個線程中都獲取這個全局變量
 6     #num-=1
 7 
 8     temp=num
 9     time.sleep(0.1)
10     num =temp-1  # 對此公共變量進行-1操做
11 
12 num = 100  #設定一個共享變量
13 
14 thread_list = []
15 
16 for i in range(100):
17     t = threading.Thread(target=addNum)
18     t.start()
19     thread_list.append(t)
20 
21 for t in thread_list: #等待全部線程執行完畢
22     t.join()
23 
24 print('Result: ', num)

2.加鎖(互斥鎖,就是把多線程變成串行,結果不會變)

 1 import time
 2 import threading
 3 
 4 def subNum():
 5     global num
 6     lock.acquire()
 7     temp=num
 8     time.sleep(0.01)
 9     num =temp-1  #對此公共變量進行-1操做
10     lock.release()
11 
12 num = 100  #設定一個共享變量
13 thread_list = []
14 lock=threading.Lock()
15 for i in range(100):
16     t = threading.Thread(target=subNum)
17     t.start()
18     thread_list.append(t)
19 
20 for t in thread_list: #等待全部線程執行完畢
21     t.join()
22 print('Result: ',num)

 

死鎖和遞歸鎖

 

 所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。

死鎖:

 1 import threading
 2 import time
 3 
 4 mutexA = threading.Lock()
 5 mutexB = threading.Lock()
 6 
 7 class MyThread(threading.Thread):
 8 
 9     def __init__(self):
10         threading.Thread.__init__(self)
11 
12     def run(self):
13         self.fun1()
14         self.fun2()
15 
16     def fun1(self):
17 
18         mutexA.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放
19 
20         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
21 
22         mutexB.acquire()
23         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
24         mutexB.release()
25         mutexA.release()
26 
27 
28     def fun2(self):
29 
30         mutexB.acquire()
31         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
32         time.sleep(0.2)
33 
34         mutexA.acquire()
35         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
36         mutexA.release()
37 
38         mutexB.release()
39 
40 if __name__ == "__main__":
41 
42     print("start---------------------------%s"%time.time())
43 
44     for i in range(0, 10):
45         my_thread = MyThread()
46         my_thread.start()

 

在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

遞歸鎖:

 1 import threading
 2 import time
 3 
 4 # mutexA = threading.Lock()
 5 # mutexB = threading.Lock()
 6 
 7 Rlock=threading.RLock()
 8 
 9 class MyThread(threading.Thread):
10 
11     def __init__(self):
12         threading.Thread.__init__(self)
13 
14     def run(self):
15 
16         self.fun1()
17         self.fun2()
18 
19     def fun1(self):
20 
21         Rlock.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放
22 
23         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
24 
25         Rlock.acquire()  # count=2
26         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
27         Rlock.release()   #count-1
28 
29         Rlock.release()   #count-1 =0
30 
31 
32     def fun2(self):
33         Rlock.acquire()  # count=1
34         print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
35         time.sleep(0.2)
36 
37         Rlock.acquire()  # count=2
38         print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
39         Rlock.release()
40 
41         Rlock.release()   # count=0
42 
43 
44 if __name__ == "__main__":
45 
46     print("start---------------------------%s"%time.time())
47 
48     for i in range(0, 10):
49 
50         my_thread = MyThread()
51         my_thread.start()

 

Event對象

 

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

 

event.isSet():返回event的狀態值;

event.wait():若是 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。

 

 

 能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。

 

 

 1 import threading
 2 import time
 3 import logging
 4 
 5 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
 6 
 7 def worker(event):
 8     logging.debug('Waiting for redis ready...')
 9     event.wait()
10     logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
11     time.sleep(1)
12 
13 def main():
14     readis_ready = threading.Event()
15     t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
16     t1.start()
17 
18     t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
19     t2.start()
20 
21     logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
22     time.sleep(3) # simulate the check progress
23     readis_ready.set()
24 
25 if __name__=="__main__":
26     main()

threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:

複製代碼
def worker(event):
    while not event.is_set():
        logging.debug('Waiting for redis ready...')
        event.wait(2)
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)
複製代碼

這樣,咱們就能夠在等待Redis服務啓動的同時,看到工做線程里正在等待的狀況。

 

 Semaphore(信號量)

 

 

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

 1 import threading
 2 import time
 3 
 4 semaphore = threading.Semaphore(5)
 5 
 6 def func():
 7     if semaphore.acquire():
 8         print (threading.currentThread().getName() + ' get semaphore') #當前運行線程.獲取線程名
 9         time.sleep(2)
10         semaphore.release()
11 
12 for i in range(20):
13   t1 = threading.Thread(target=func)
14   t1.start()

 

隊列

 

get和put方法

建立一個「隊列」對象

import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數
maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;
第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,
put方法將引起Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且
block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

 

 1 Python Queue模塊有三種隊列及構造函數: 
 2 
 3 一、Python Queue模塊的FIFO隊列先進先出。  class queue.Queue(maxsize) 
 4 二、LIFO相似於堆,即先進後出。           class queue.LifoQueue(maxsize) 
 5 三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 
 6 
 7 
 8 import queue
 9 
10 #先進後出
11 
12 q=queue.LifoQueue()
13 
14 q.put(34)
15 q.put(56)
16 q.put(12)
17 
18 #優先級
19 q=queue.PriorityQueue()
20 q.put([5,100])
21 q.put([7,200])
22 q.put([3,"hello"])
23 q.put([4,{"name":"alex"}])
24 
25 while 1:
26   data=q.get()
27   print(data)

 

join和task_done方法

 1 join() 阻塞進程,直到全部任務完成,須要配合另外一個方法task_done。
 2 
 3     def join(self):
 4      with self.all_tasks_done:
 5       while self.unfinished_tasks:
 6        self.all_tasks_done.wait()
 7 
 8 task_done() 表示某個任務完成。每一條get語句後須要一條task_done。
 9 
10 
11 import queue
12 q = queue.Queue(5)
13 q.put(10)
14 q.put(20)
15 print(q.get())
16 q.task_done()
17 print(q.get())
18 q.task_done()
19 
20 q.join()
21 
22 print("ending!")

 

 

 

其餘經常使用方法:

此包中的經常使用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小 q.empty() 若是隊列爲空,返回True,反之False q.full() 若是隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 至關q.get(False)非阻塞
q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 至關q.put(item, False) q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列爲空,再執行別的操做

 

應用 生產者消費者模型:

 

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。

 

 1 import time,random
 2 import queue,threading
 3 
 4 q = queue.Queue()
 5 
 6 def Producer(name):
 7   count = 0
 8   while count <10:
 9     print("making........")
10     time.sleep(random.randrange(3))
11     q.put(count)
12     print('Producer %s has produced %s baozi..' %(name, count))
13     count +=1
14     #q.task_done()
15     #q.join()
16     print("ok......")
17 def Consumer(name):
18   count = 0
19   while count <10:
20     time.sleep(random.randrange(4))
21     if not q.empty():
22         data = q.get()
23         #q.task_done()
24         #q.join()
25         print(data)
26         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
27     else:
28         print("-----no baozi anymore----")
29     count +=1
30 
31 p1 = threading.Thread(target=Producer, args=('A',))
32 c1 = threading.Thread(target=Consumer, args=('B',))
33 # c2 = threading.Thread(target=Consumer, args=('C',))
34 # c3 = threading.Thread(target=Consumer, args=('D',))
35 p1.start()
36 c1.start()
37 # c2.start()
38 # c3.start()

 

 

協程函數

1.因爲單線程,不能再切換
2.再也不有任何鎖的改變
(加鎖是爲了保護數據)
 1 import time
 2 
 3 """
 4 傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但一不當心就可能死鎖。
 5 若是改用協程,生產者生產消息後,直接經過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。
 6 """
 7 # 注意到consumer函數是一個generator(生成器):
 8 # 任何包含yield關鍵字的函數都會自動成爲生成器(generator)對象
 9 
10 def consumer():
11     r = ''
12     while True:
13         # 三、consumer經過yield拿到消息,處理,又經過yield把結果傳回;
14         #    yield指令具備return關鍵字的做用。而後函數的堆棧會自動凍結(freeze)在這一行。
15         #    當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,
16         #    就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。經過這種方式,迭代器能夠實現無限序列和惰性求值。
17         n = yield r
18         if not n:
19             return
20         print('[CONSUMER] ←← Consuming %s...' % n)
21         time.sleep(1)
22         r = '200 OK'
23 def produce(c):
24     # 一、首先調用c.next()啓動生成器
25     next(c)
26     n = 0
27     while n < 5:
28         n = n + 1
29         print('[PRODUCER] →→ Producing %s...' % n)
30         # 二、而後,一旦生產了東西,經過c.send(n)切換到consumer執行;
31         cr = c.send(n)
32         # 四、produce拿到consumer處理的結果,繼續生產下一條消息;
33         print('[PRODUCER] Consumer return: %s' % cr)
34     # 五、produce決定不生產了,經過c.close()關閉consumer,整個過程結束。
35     c.close()
36 if __name__=='__main__':
37     # 六、整個流程無鎖,由一個線程執行,produce和consumer協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。
38     c = consumer()
39     produce(c)
40     
41     
42 '''
43 result:
44 
45 [PRODUCER] →→ Producing 1...
46 [CONSUMER] ←← Consuming 1...
47 [PRODUCER] Consumer return: 200 OK
48 [PRODUCER] →→ Producing 2...
49 [CONSUMER] ←← Consuming 2...
50 [PRODUCER] Consumer return: 200 OK
51 [PRODUCER] →→ Producing 3...
52 [CONSUMER] ←← Consuming 3...
53 [PRODUCER] Consumer return: 200 OK
54 [PRODUCER] →→ Producing 4...
55 [CONSUMER] ←← Consuming 4...
56 [PRODUCER] Consumer return: 200 OK
57 [PRODUCER] →→ Producing 5...
58 [CONSUMER] ←← Consuming 5...
59 [PRODUCER] Consumer return: 200 OK
60 '''

 

grennlet

greenlet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍後使用next()或send()操做進行恢復爲止。可使用一個調度器循環在一組生成器函數之間協做多個任務。greentlet是python中實現咱們所謂的"Coroutine(協程)"的一個基礎庫.

 

import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)

  

 

gevent

Python經過yield提供了對協程的基本支持,可是不徹底。而第三方的gevent爲Python提供了比較完善的協程支持。

gevent是第三方庫,經過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:

 

from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

start=time.time()

gevent.joinall([
        gevent.spawn(f, 'https://itk.org/'),
        gevent.spawn(f, 'https://www.github.com/'),
        gevent.spawn(f, 'https://zhihu.com/'),
])

# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')

print(time.time()-start)
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息