Python併發編程-進程 線程 同步鎖 線程死鎖和遞歸鎖

進程是最小的資源單位,線程是最小的執行單位html

1、進程                                                                                                                                                       python

進程:就是一個程序在一個數據集上的一次動態執行過程。linux

進程由三部分組成:面試

一、程序:咱們編寫的程序用來描述進程要完成哪些功能以及如何完成編程

二、數據集:數據集則是程序在執行過程當中所須要使用的資源bootstrap

三、進程控制塊:進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感windows

     知進程存在的惟一標誌。api

2、線程                                                                                                                                                            安全

   Threading用於提供線程相關的操做。線程是應用程序中工做的最小單元,它被包含在進程之中,是進程中的實際運做單位。一數據結構

條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。

 

一、實現線程併發

 示例1:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading  #線程
 6 import time
 7 
 8 def Hi(num): #有一個參數  9     print("hello %s" %num)
10     time.sleep(3)  
11 
12 if __name__ == '__main__':
13 
14     t1=threading.Thread(target=Hi,args=(10,))  #建立了一個線程對象t1,10作爲一個參數,傳給num
15     t1.start()
16 
17     t2=threading.Thread(target=Hi,args=(9,))   #建立了一個線程對象t2,9作爲一個參數,傳給num
18     t2.start()
19 
20     print("ending.........")  #主線程輸出ending

執行結果:

1 hello 10    #子線程
2 hello 9     #子線程
3 ending.........   #主線程 4 #上面三個同時出來,再停頓三秒才結束
5 Process finished with exit code 0  #停頓3秒才結束

 

示例2:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t1.start()
22     t2=threading.Thread(target=game)
23     t2.start()

執行結果:

1 #總共花了5秒時間
2 
3 begin to listen Sat Jan 14 12:34:43 2017
4 begin to play game Sat Jan 14 12:34:43 2017  #一、先打印2個
5 
6 stop to listen Sat Jan 14 12:34:46 2017      #二、等待3秒再打印一個
7 
8 stop to play game Sat Jan 14 12:34:48 2017   #三、再等待2秒,打印一個

 

二、使用join方法

示例1:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()  #運行實例的方法
24     t2.start()
25 
26     t1.join()   #子線程對象調用join()方法 27     t2.join()
28 
29     print("ending")  #在主線程中

執行結果:

begin to listen Sat Jan 14 12:58:34 2017
begin to play game Sat Jan 14 12:58:34 2017  #先打印2個

stop to listen Sat Jan 14 12:58:37 2017      #等待3秒,再打印一個

stop to play game Sat Jan 14 12:58:39 2017   #等待2秒,再打印兩個
ending  

 

示例2:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()  #運行實例的方法
24     t2.start()
25 
26     t1.join()   #t1線程不結束,誰都不往下走
27 
28     print("ending")  

執行結果:

1 begin to listen Sat Jan 14 13:06:07 2017
2 begin to play game Sat Jan 14 13:06:07 2017  #先打印這兩行
3 
4 stop to listen Sat Jan 14 13:06:10 2017      #再等待3秒打印這兩行
5 ending
6 
7 stop to play game Sat Jan 14 13:06:12 2017   #再等待2秒打印這行

 

示例3:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()  #運行實例的方法
24     t2.start()
25 
26     t2.join()
27 
28     print("ending")  #在主線程中

執行結果:

1 begin to listen Sat Jan 14 13:12:34 2017     #先打印這兩行
2 begin to play game Sat Jan 14 13:12:34 2017
3 
4 stop to listen Sat Jan 14 13:12:37 2017      #等待3秒,打印這一行
5 
6 stop to play game Sat Jan 14 13:12:39 2017   #等待2秒,打印這兩行
7 ending

 

示例4:並無實現併發(失去多線程的意義)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 def music():
 9     print("begin to listen %s"%time.ctime())
10     time.sleep(3)
11     print("stop to listen %s" %time.ctime())
12 
13 def game():
14     print("begin to play game %s"%time.ctime())
15     time.sleep(5)
16     print("stop to play game %s" %time.ctime())
17 
18 if __name__ == '__main__':
19 
20     t1=threading.Thread(target=music)
21     t2=threading.Thread(target=game)
22 
23     t1.start()
24 
25     t1.join()
26     t2.start()
27     
28     t2.join()
29 
30     print("ending")  #在主線程中

執行結果:

1 begin to listen Sat Jan 14 13:26:18 2017    #先打印條1行
2 
3 stop to listen Sat Jan 14 13:26:21 2017     #等待3秒再打印2行
4 begin to play game Sat Jan 14 13:26:21 2017
5 
6 stop to play game Sat Jan 14 13:26:26 2017  #等待5秒打印2行
7 ending

 

3、線程的兩種調用方式                                                                                                                                       

  threading 模塊創建在 thread 模塊之上。thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊經過對 thread

進行二次封裝,提供了更方便的 api 來處理線程。

一、直接調用(推薦寫法)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 
 9 def sayhi(num):  # 定義每一個線程要運行的函數
10 
11     print("running on number:%s" % num)
12 
13     time.sleep(3)
14 
15 
16 if __name__ == '__main__':
17     t1 = threading.Thread(target=sayhi, args=(1,))  # 生成一個線程實例
18     t2 = threading.Thread(target=sayhi, args=(2,))  # 生成另外一個線程實例
19 
20     t1.start()  # 啓動線程
21     t2.start()  # 啓動另外一個線程
22 
23     print(t1.getName())  # 獲取線程名
24     print(t2.getName())

執行結果:

1 running on number:1
2 running on number:2
3 Thread-1
4 Thread-2

 

二、繼承式調用(有些編程人員會用這種寫法,也要能看懂。不推薦這種寫法)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 #本身定製一個MyThread的類
 9 class MyThread(threading.Thread):  
10     def __init__(self, num):
11         threading.Thread.__init__(self)
12         self.num = num
13 
14     def run(self):  # 定義每一個線程要運行的函數
15 
16         print("running on number:%s" % self.num)
17 
18         time.sleep(3)
19 
20 
21 if __name__ == '__main__':
22     t1 = MyThread(1)  #繼承這個類,把1這個參數,傳給num ,t1就是個線程對象 23     t2 = MyThread(2)
24     t1.start()
25     t2.start()
26 
27     print("ending......")

執行結果:

1 running on number:1
2 running on number:2
3 ending......

 

4、 threading.thread的實例方法                                                                                                                   

一、join&Daemon方法

示例1:沒有用Daemon方法示例

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 #Author: nulige
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     for t in threads:
31         t.start()
32 
33     print ("all over %s" %ctime())

執行結果:

1 Begin listening to 水手. Sat Jan 14 13:44:10 2017
2 Begin recording the python線程! Sat Jan 14 13:44:10 2017
3 all over Sat Jan 14 13:44:10 2017         #先打印三個出來; 主線程結束了
4 
5 end listening Sat Jan 14 13:44:13 2017    #等待3秒,打印這1個; 子線程尚未結束,會繼續往下運行
6 
7 end recording Sat Jan 14 13:44:15 2017    #再等待2秒,打印這1個

 

示例2: 用Daemon方法示例(設置t爲守護線程,就是子線程,跟着主線程一塊兒退出)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 #Author: nulige
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     for t in threads:
31         t.setDaemon(True) #設置t爲守護線程; 注意:必定在start()以前設置,不然會報錯
32 
33         t.start()
34 
35     print ("all over %s" %ctime())

執行結果:

1 Begin listening to 水手. Sat Jan 14 13:51:30 2017    #三個同時打印出來
2 Begin recording the python線程! Sat Jan 14 13:51:30 2017
3 all over Sat Jan 14 13:51:30 2017

 

示例3:設置t1爲守護線程,沒有意義,達不到效果,由於t2還會繼續執行

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 #Author: nulige
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     t1.setDaemon(True)  #設置t1爲守護線程; 注意:必定在start以前設置,不然會報錯
31     for t in threads:
32 
33         t.start()
34 
35     print ("all over %s" %ctime())

執行結果:

1 Begin listening to 水手. Sat Jan 14 14:02:07 2017
2 Begin recording the python線程! Sat Jan 14 14:02:07 2017
3 all over Sat Jan 14 14:02:07 2017          #設置t1爲守護線程,因此會先把這三條先打印出來
4 
5 end listening Sat Jan 14 14:02:10 2017     #再等待3秒打印t2,
6 
7 end recording Sat Jan 14 14:02:12 2017     #再等待3秒打印這條出來

 

示例4:設置t2爲守護線程,子線程纔會跟着主線程一塊兒退出

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 #Author: nulige
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     t2.setDaemon(True)  # 設置t2爲守護線程; 注意:必定在start以前設置,不然會報錯
31     for t in threads:
32 
33         t.start()
34 
35     print ("all over %s" %ctime())

執行結果:

1 Begin listening to 水手. Sat Jan 14 14:17:09 2017
2 Begin recording the python線程! Sat Jan 14 14:17:09 2017
3 all over Sat Jan 14 14:17:09 2017       #先打印這三條
4 
5 end listening Sat Jan 14 14:17:12 2017  #等待3秒,再打印這條;t1結束後,主線程也結束了。
 

 

二、一道面試題

 1 #執行結果是什麼?
 2 
 3 i = 0
 4 for i in range(10): 5 i += 1 6 print(i) 7 8 執行結果: 9 10

 

三、其它方法

示例:getName()方法 (通常沒什麼用)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 #Author: nulige
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     t2.setDaemon(True)  # 設置t爲守護進程; 注意:必定在start以前設置,不然會報錯
31     for t in threads:
32         t.start()
33         print(t.getName())    #返回線程名稱:Thread-1
34 
35     print ("all over %s" %ctime())

執行結果:

1 Begin listening to 水手. Sat Jan 14 14:36:44 2017
2 Thread-1   #返回線程名稱
3 Begin recording the python線程! Sat Jan 14 14:36:44 2017
4 Thread-2   #返回默認的線程名稱
5 all over Sat Jan 14 14:36:44 2017
6 end listening Sat Jan 14 14:36:47 2017

 

 示例:threading.activeCount(),返回正在運行的線程數量

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 #Author: nulige
 4 
 5 import threading
 6 from time import ctime,sleep
 7 import time
 8 
 9 def ListenMusic(name):
10 
11         print ("Begin listening to %s. %s" %(name,ctime()))
12         sleep(3)
13         print("end listening %s"%ctime())
14 
15 def RecordBlog(title):
16 
17         print ("Begin recording the %s! %s" %(title,ctime()))
18         sleep(5)
19         print('end recording %s'%ctime())
20 
21 #建立一個列表,把t1和t2加到列表中去
22 threads = []
23 t1 = threading.Thread(target=ListenMusic,args=('水手',))
24 t2 = threading.Thread(target=RecordBlog,args=('python線程',))
25 threads.append(t1)
26 threads.append(t2)
27 
28 if __name__ == '__main__':
29 
30     t2.setDaemon(True)  #設置t爲守護進程; 注意:必定在start以前設置,不然會報錯
31     for t in threads:
32         t.start()
33 
34         print("count:", threading.active_count())   #判斷有多少個線程的數量
35 
36     while threading.active_count()==1:  #等於1就至關於只有一個主線程,沒有子線程
37 
38         print ("all over %s" %ctime())

執行結果:

1 Begin listening to 水手. Sat Jan 14 14:49:00 2017
2 count: 2
3 Begin recording the python線程! Sat Jan 14 14:49:00 2017
4 count: 3  #獲得的線程數量
5 end listening Sat Jan 14 14:49:03 2017

 

5、進程與線程的關係區別                                                                                                                                    

一、 一個程序至少有一個進程,一個進程至少有一個線程.(進程能夠理解成線程的容器)
二、 進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率。
三、 線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和 程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
四、 進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調 度的一個獨立單位. 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程 本身基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)可是 它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源. 一個線程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行.

 

6、python的GIL                                                                                                                                              

  In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once.
This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features havegrown
to depend on the guarantees that it enforces.) 上面的核心意思就是:不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行。

 

常見概念:

一、什麼是併發和並行?

併發:是指系統具備處理多個任務(動做)的能力(CPU經過切換來完成併發),併發是並行的一個子集。

並行:是指系統具備同時處理多個任務(動做)的能力

二、同步與異步的區別?

同步: 當進程執行到一個IO(等待外部數據)的時候你---->會一直等:同步 (示例: 打電話)

異步:當進程執行到一個IO(等待外部數據)的時候你---->不等:一直等到數據接收成功,再回來處理。異步效率更高(示例:發短信)

三、任務分爲

一、對於IO密集型的任務:  python的多線程是有意義的,能夠採用:多進程+協程的方式
二、對於計算密集型的任務:python的多線程就不推薦。python就不適用了。

 

7、同步鎖                                                                                                                                                               

 示例1:不加鎖(拿到的值是不固定的)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 def sub():
 9     global num
10 
11     # num -= 1
12     temp=num
13     time.sleep(0.001)   #別外75個線程,拿到100了,時間不固定。
14     num=temp-1
15 
16 num =100
17 
18 l=[]
19 
20 for i in range(100):
21     t=threading.Thread(target=sub)
22     t.start()
23     l.append(t)
24 
25 for t in l:
26     t.join()
27 
28 print(num)

執行結果:

1 73 or 75  (這個值是隨機的,會不斷變化)

 

示例2:加鎖 (加鎖的做用:就是把多線程變成串行,結果不會變)

 1 #加鎖的做用:就是把多線程變成串行,結果就不會變)
 2 
 3 #!/usr/bin/env python
 4 # -*- coding:utf-8 -*- 
 5 #Author: nulige
 6 
 7 import threading
 8 import time
 9 
10 def sub():
11 
12     global num
13 
14     # num -= 1
15     lock.acquire()  #獲取鎖
16     temp=num
17     time.sleep(0.001)  
18     num=temp-1
19     lock.release()  #釋放鎖
20 
21 num =100
22 
23 l=[]
24 lock=threading.Lock()
25 
26 for i in range(100):
27     t=threading.Thread(target=sub)
28     t.start()
29     l.append(t)
30 
31 for t in l:
32     t.join()
33 
34 print (num)

執行結果:

1 0

 

GIL:全局解釋器鎖
做用:保證同一時刻,只有一個線程被CPU執行,不管你有多少個線程。

爲何這裏還須要lock? 注意啦,這裏的lock是用戶級的lock,跟那個GIL不要緊 ,具體咱們經過下圖進行講解

  既然用戶程序已經本身有鎖了,那爲何C python還須要GIL呢?加入GIL主要的緣由是爲了下降程序的開發的複雜度,好比如今的你寫python不須要關心內存回收的問題,由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,這能夠說是Python早期版本的遺留問題。

 

8、線程死鎖和遞歸鎖                                                                                                                                                 

  在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都

正在使用,全部這兩個線程在無外力做用下將一直等待下去。

示例1:線程死鎖

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 # Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 
 9 class MyThread(threading.Thread):
10     def actionA(self):
11         A.acquire()  # count=1
12         print(self.name, "gotA", time.ctime())
13         time.sleep(2)
14 
15         B.acquire()  # count=2
16         print(self.name, "gotB", time.ctime())
17         time.sleep(1)
18 
19         B.release()  # count=1
20         A.release()  # count=0
21 
22     def actionB(self):
23         B.acquire()  # count=1
24         print(self.name, "gotB", time.ctime())
25         time.sleep(2)
26 
27         A.acquire()  # count=2
28         print(self.name, "gotA", time.ctime())
29         time.sleep(1)
30 
31         A.release()  # count=1
32         B.release()  # count=0
33 
34     def run(self):
35         self.actionA()
36         self.actionB()
37 
38 
39 if __name__ == '__main__':
40 
41     A = threading.Lock()
42     B = threading.Lock()
43 
44     L = []
45 
46     for i in range(5):
47         t = MyThread()
48         t.start()
49         L.append(t)
50 
51     for i in L:
52         i.join()
53 
54     print("ending.....")

執行結果:

1 Thread-1 gotA Mon Jan 16 17:33:58 2017
2 Thread-1 gotB Mon Jan 16 17:34:00 2017
3 Thread-1 gotB Mon Jan 16 17:34:01 2017
4 Thread-2 gotA Mon Jan 16 17:34:01 2017  #死鎖,一直卡在這裏

解決辦法:

使用遞歸鎖,將

1
2
lockA = threading.Lock()
lockB = threading.Lock()<br> #--------------<br>lock=threading.RLock()

爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。

 

示例2:遞歸鎖(解決死鎖問題)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import threading
 6 import time
 7 
 8 class MyThread(threading.Thread):
 9 
10     def actionA(self):
11 
12         r_lcok.acquire()  #count=1
13         print(self.name,"gotA",time.ctime())
14         time.sleep(2)
15 
16         r_lcok.acquire()  #count=2
17         print(self.name,"gotB",time.ctime())
18         time.sleep(1)
19 
20         r_lcok.release()  #count=1
21         r_lcok.release()  #count=0
22 
23 
24     def actionB(self):
25 
26         r_lcok.acquire()  #count=1
27         print(self.name,"gotB",time.ctime())
28         time.sleep(2)
29 
30         r_lcok.acquire()  #count=2
31         print(self.name,"gotA",time.ctime())
32         time.sleep(1)
33 
34         r_lcok.release()  #count=1
35         r_lcok.release()  #count=0
36 
37     def run(self):
38 
39             self.actionA()
40             self.actionB()
41 
42 if __name__ == '__main__':
43 
44     r_lcok=threading.RLock()
45     L=[]
46 
47     for i in range(5):
48         t=MyThread()
49         t.start()
50         L.append(t)
51 
52     for i in L:
53         i.join()
54 
55     print("ending.....")

執行結果:

 1 Thread-1 gotA Mon Jan 16 17:38:42 2017
 2 Thread-1 gotB Mon Jan 16 17:38:44 2017
 3 Thread-1 gotB Mon Jan 16 17:38:45 2017
 4 Thread-1 gotA Mon Jan 16 17:38:47 2017
 5 Thread-3 gotA Mon Jan 16 17:38:48 2017
 6 Thread-3 gotB Mon Jan 16 17:38:50 2017
 7 Thread-4 gotA Mon Jan 16 17:38:51 2017
 8 Thread-4 gotB Mon Jan 16 17:38:53 2017
 9 Thread-5 gotA Mon Jan 16 17:38:54 2017
10 Thread-5 gotB Mon Jan 16 17:38:56 2017
11 Thread-5 gotB Mon Jan 16 17:38:57 2017
12 Thread-5 gotA Mon Jan 16 17:38:59 2017
13 Thread-3 gotB Mon Jan 16 17:39:00 2017
14 Thread-3 gotA Mon Jan 16 17:39:02 2017
15 Thread-4 gotB Mon Jan 16 17:39:03 2017
16 Thread-4 gotA Mon Jan 16 17:39:05 2017
17 Thread-2 gotA Mon Jan 16 17:39:06 2017
18 Thread-2 gotB Mon Jan 16 17:39:08 2017
19 Thread-2 gotB Mon Jan 16 17:39:09 2017
20 Thread-2 gotA Mon Jan 16 17:39:11 2017
21 ending.....
View Code

 

9、同步條件(Event)                                                                                                                                        

An event is a simple synchronization object;the event represents an internal flag,
and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set() event.clear()

If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

示例:

 1 import threading,time
 2 class Boss(threading.Thread):
 3     def run(self):
 4         print("BOSS:今晚你們都要加班到22:00。")
 5         print(event.isSet())
 6         event.set()
 7         time.sleep(5)
 8         print("BOSS:<22:00>能夠下班了。")
 9         print(event.isSet())
10         event.set()
11 class Worker(threading.Thread):
12     def run(self):
13         event.wait()
14         print("Worker:哎……命苦啊!")
15         time.sleep(1)
16         event.clear()
17         event.wait()
18         print("Worker:OhYeah!")
19 if __name__=="__main__":
20     event=threading.Event()
21     threads=[]
22     for i in range(5):
23         threads.append(Worker())
24     threads.append(Boss())
25     for t in threads:
26         t.start()
27     for t in threads:
28         t.join()

執行結果:

 1 BOSS:今晚你們都要加班到22:00 2 False
 3 Worker:哎……命苦啊!
 4 Worker:哎……命苦啊!
 5 Worker:哎……命苦啊!
 6 Worker:哎……命苦啊!
 7 Worker:哎……命苦啊!
 8 BOSS:<22:00>能夠下班了。
 9 False
10 Worker:OhYeah!
11 Worker:OhYeah!
12 Worker:OhYeah!
13 Worker:OhYeah!
14 Worker:OhYeah!
15 ending.....
View Code

 

10、信號量(Semaphore)                                                                                                                                   

        信號量:指同時開幾個線程併發

    信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。

  計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)

    BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 #Author: nulige
 4 
 5 import threading,time
 6 
 7 class myThread(threading.Thread):
 8     def run(self):           #啓動後,執行run方法  9         if semaphore.acquire():  #加把鎖,能夠放進去多個(至關於5把鎖,5個鑰匙,同時有5個線程)
10             print(self.name)
11             time.sleep(5)
12             semaphore.release()
13 
14 if __name__=="__main__":
15     semaphore=threading.Semaphore(5)  #同時能有幾個線程進去(設置爲5就是一次5個線程進去),相似於停車廠一次能停幾輛車 16     
17     thrs=[] #空列表 18     for i in range(100): #100個線程 19         thrs.append(myThread()) #加線程對象 20 
21     for t in thrs:
22         t.start()  #分別啓動

執行結果:

 1 Thread-1
 2 Thread-2
 3 Thread-3
 4 Thread-4
 5 Thread-5   #5個線程同時出來
 6 
 7 Thread-8
 8 Thread-6
 9 Thread-9
10 Thread-7
11 Thread-10  #每隔3秒再打印5個出來
12 
13 部分省略.......

 

11、多線程利器---隊列(queue)    ( 重點掌握)                                                                                             

列表是不安全的數據結構

示例:

 1 #兩個線程同時刪除5,因此會報錯,由於只有一個5,一個線程刪除了就沒有啦。
 2 
 3 import threading,time
 4 
 5 li=[1,2,3,4,5]
 6 
 7 def pri():
 8     while li:
 9         a=li[-1]  #取值
10         print(a)
11         time.sleep(1)
12         li.remove(a)  #remove按索引去刪除內容
13 
14 t1=threading.Thread(target=pri,args=())  #線程1
15 t1.start()
16 t2=threading.Thread(target=pri,args=())  #線程2
17 t2.start()

執行結果:

 1 #會報錯,由於只有一個5,刪除了就沒有啦,不能兩個線程同時刪除。
 2 
 3 5
 4 5
 5 4
 6 Exception in thread Thread-2:
 7 Traceback (most recent call last):
 8   File "C:\Python3.5\lib\threading.py", line 914, in _bootstrap_inner
 9     self.run()
10   File "C:\Python3.5\lib\threading.py", line 862, in run
11     self._target(*self._args, **self._kwargs)
12   File "D:/python/day34/s10.py", line 34, in pri
13     li.remove(a)
14 ValueError: list.remove(x): x not in list
15 
16 3
17 2
18 1
View Code

 

思考:如何經過對列來完成上述功能?

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

queue列隊的三種模式及構造函數

一、先進先出模式 (誰先進去,誰先出來)   ---->class queue.Queue(maxsize)

二、先進後出模式  (先進去的,最後出來)  ---->class queue.LifoQueue(maxsize)

三、優先級模式    (優先級越低,先出來)   ---->class queue.PriorityQueue(maxsize)

 

1、先進先出

 示例1:

 1 #先進先出 (原則:誰先進去,誰就先出來)
 2 import queue  #線程 隊列
 3 
 4 q=queue.Queue() #先進先出  5 q.put(12)
 6 q.put("hello")
 7 q.put({"name":"yuan"})
 8 
 9 while 1:
10     data=q.get()
11     print(data)
12     print("-----------")

執行結果:

1 12            #他是第1個進去的,因此他先出來
2 -----------
3 hello
4 -----------
5 {'name': 'yuan'}
6 -----------

示例2: 

 1 import queue  #隊列,解決多線程問題  (注意:python2.7 Queue的首字母是大寫的)
 2 
 3 
 4 # q=queue.Queue(3) #一、設置3就是滿了,默認(FIFO 先進先出 ) #先進後出(手槍彈夾,後壓進去的,先出來)
 5 q=queue.Queue()  
 6 q.put(12)  
 7 q.put("hello")
 8 q.put({"name":"yuan"})
 9 
10 q.put(34)
11 # q.put(34,False) #二、blook=True,若是改爲Flase,提示你滿了,會報錯,但不會卡在這裏
12 
13 
14 while 1:
15     data=q.get()  #一、會卡着,等值進來
16     # data=q.get(block=False)  #三、隊列爲空
17     print(data)
18     print("-----------")

 

2、先進後出

示例:

 1 #先進後出
 2 import queue
 3 
 4 q=queue.LifoQueue() #先進後出  5 
 6 q.put(12)
 7 q.put("hello")
 8 q.put({"name":"yuan"})
 9 
10 while 1:
11     data=q.get()  #卡着,等值進來,
12     print(data)
13     print("-----------")

執行結果:

1 {'name': 'yuan'}  #後進來的先出去
2 -----------
3 hello
4 -----------
5 12
6 -----------

 

3、優化級

示例:

 1 #優先級
 2 import queue
 3 
 4 q=queue.PriorityQueue() #優先級  5 
 6 q.put([3,12])
 7 q.put([2,"hello"])  #2先出來,按優化級 級別是:2--->3--->4 從級到高  8 q.put([4,{"name":"yuan"}])
 9 
10 while 1:
11     data=q.get()
12     print(data[1])
13     print("-----------------------")

 執行結果:

1 hello                 #2先出來,按優先級
2 -----------------------
3 12
4 -----------------------
5 {'name': 'yuan'}
6 -----------------------

 

queue隊列類的方法:

建立一個「隊列」對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,
get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。


此包中的經常使用方法(q = Queue.Queue()):

q.qsize()  返回隊列的大小
q.empty()  若是隊列爲空,返回True,反之False
q.full()   若是隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]])  獲取隊列,timeout等待時間
q.get_nowait() 至關q.get(False)
非阻塞 q.put(item)   寫入隊列,timeout等待時間
q.put_nowait(item) 至關q.put(item, False)
q.task_done()  在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join()       實際上意味着等到隊列爲空,再執行別的操做

 

示例1: q.qsize() and q.empty() and q.full

 1 import queue
 2 
 3 q=queue.Queue()
 4 q.put(12)
 5 q.put("hello")
 6 q.put({"name":"yuan"})
 7 
 8 print(q.qsize()) #判斷隊列大小
 9 print(q.empty()) #判斷隊列是否爲空
10 print(q.full)    #判斷隊列是否滿了
11 
12 while 1:
13     data=q.get()
14     print(data)
15     print("-----------")

執行結果:

3 --->q.qsize()

False ---->q.empty()
 
<bound method Queue.full of <queue.Queue object at 0x01315A70>> --->full

 

示例2:g.put_nowait() 至關於q.get(Flase)

 1 import queue
 2 
 3 q=queue.Queue(3)
 4 
 5 q.put(12)
 6 q.put([2,"hello"])
 7 q.put([4,{"name":"yuan"}])
 8 
 9 q.put_nowait(56)  #至關於q.get(Flase)
10 
11 while 1:
12     data=q.get()
13     print(data)

執行結果:

1 Traceback (most recent call last):
2   File "D:/python/day34/s7.py", line 79, in <module>
3     q.put_nowait(56)  #至關於q.get(Flase)
4   File "C:\Python3.5\lib\queue.py", line 184, in put_nowait
5     return self.put(item, block=False)
6   File "C:\Python3.5\lib\queue.py", line 130, in put
7     raise Full
8 queue.Full

12、生產者消費者模型                                                                                                                                       

  一、爲何要使用生產者和消費者模式

  在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

  二、什麼是生產者消費者模式

  生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。

示例1:邊作包子,邊吃包子

 1 #生產者消費者模型(生產者先執行,再吃包子。)
 2 
 3 import time,random
 4 import queue,threading
 5 
 6 q = queue.Queue()
 7 
 8 def Producer(name):
 9   count = 0
10   while count <10:
11     print("making........")
12     time.sleep(random.randrange(3)) #產生一個隨機數(1-2秒之間)
13     q.put(count)
14     print('Producer %s has produced %s baozi..' %(name, count))
15     count +=1
16     print("ok......")
17 
18 def Consumer(name):
19   count = 0
20   while count <10:
21     time.sleep(random.randrange(4))  #產生一個隨機數(1-3秒之間)
22     if not q.empty():
23         data = q.get()
24         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
25     else:
26         print("-----no baozi anymore----")
27         count +=1
28 
29 p1 = threading.Thread(target=Producer, args=('A君',))
30 c1 = threading.Thread(target=Consumer, args=('B君',))
31 
32 p1.start()
33 c1.start()

執行結果:

 1 making........
 2 Producer A君 has produced 0 baozi..
 3 ok......
 4 making........
 5 Consumer B君 has eat 0 baozi...
 6 Producer A君 has produced 1 baozi..
 7 ok......
 8 making........
 9 Consumer B君 has eat 1 baozi...
10 Producer A君 has produced 2 baozi..
11 ok......
12 making........
13 Consumer B君 has eat 2 baozi...
14 Producer A君 has produced 3 baozi..
15 ok......
16 making........
17 Producer A君 has produced 4 baozi..
18 ok......
19 making........
20 Consumer B君 has eat 3 baozi...
21 Producer A君 has produced 5 baozi..
22 ok......
23 making........
24 Producer A君 has produced 6 baozi..
25 ok......
26 making........
27 Consumer B君 has eat 4 baozi...
28 Producer A君 has produced 7 baozi..
29 ok......
30 making........
31 Producer A君 has produced 8 baozi..
32 ok......
33 making........
34 Producer A君 has produced 9 baozi..
35 ok......
36 Consumer B君 has eat 5 baozi...
37 Consumer B君 has eat 6 baozi...
38 Consumer B君 has eat 7 baozi...
39 Consumer B君 has eat 8 baozi...
40 Consumer B君 has eat 9 baozi...
41 -----no baozi anymore----
42 -----no baozi anymore----
43 -----no baozi anymore----
44 -----no baozi anymore----
45 -----no baozi anymore----
46 -----no baozi anymore----
47 -----no baozi anymore----
48 -----no baozi anymore----
49 -----no baozi anymore----
50 -----no baozi anymore----
View Code

 

示例2: 供不該求,吃包子的人太多了(1我的在生產包子,3我的在吃包子)

 1 #生產者消費者模型(供不該求,吃的人太多了,生產不贏)
 2 
 3 import time,random
 4 import queue,threading
 5 
 6 q = queue.Queue()
 7 
 8 def Producer(name):
 9   count = 0
10   while count <10:
11     print("making........")
12     time.sleep(random.randrange(3)) #產生一個隨機數(1-2秒之間)
13     q.put(count)
14     print('Producer %s has produced %s baozi..' %(name, count))
15     count +=1
16     print("ok......")
17 
18 def Consumer(name):
19   count = 0
20   while count <10:
21     time.sleep(random.randrange(4))  #產生一個隨機數(1-3秒之間)
22     if not q.empty():
23         data = q.get()
24         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
25     else:
26         print("-----no baozi anymore----")
27         count +=1
28 
29 p1 = threading.Thread(target=Producer, args=('A君',))  #1我的生產包子 30 c1 = threading.Thread(target=Consumer, args=('B君',))
31 c2 = threading.Thread(target=Consumer, args=('C君',))  #3我的在吃包子,致使吃包子的人太多啦,生產不贏 32 c3 = threading.Thread(target=Consumer, args=('D君',))
33 
34 p1.start()
35 c1.start()
36 c2.start()
37 c3.start()

執行結果:

 1 making........
 2 -----no baozi anymore----      #生產不贏,供不該求,吃包子的人太多了
 3 -----no baozi anymore----
 4 Producer A君 has produced 0 baozi..
 5 ok......
 6 making........
 7 Producer A君 has produced 1 baozi..
 8 ok......
 9 making........
10 Consumer C君 has eat 0 baozi...
11 Consumer D君 has eat 1 baozi...
12 -----no baozi anymore----
13 -----no baozi anymore----
14 -----no baozi anymore----
15 -----no baozi anymore----
16 -----no baozi anymore----
17 -----no baozi anymore----
18 Producer A君 has produced 2 baozi..
19 ok......
20 making........
21 Producer A君 has produced 3 baozi..
22 ok......
23 making........
24 Producer A君 has produced 4 baozi..
25 ok......
26 making........
27 Consumer C君 has eat 2 baozi...
28 Consumer D君 has eat 3 baozi...
29 Consumer D君 has eat 4 baozi...
30 -----no baozi anymore----
31 -----no baozi anymore----
32 Producer A君 has produced 5 baozi..
33 ok......
34 making........
35 Producer A君 has produced 6 baozi..
36 ok......
37 making........
38 Producer A君 has produced 7 baozi..
39 ok......
40 making........
41 Producer A君 has produced 8 baozi..
42 ok......
43 making........
44 Consumer B君 has eat 5 baozi...
45 Consumer C君 has eat 6 baozi...
46 Consumer D君 has eat 7 baozi...
47 Producer A君 has produced 9 baozi..
48 ok......
49 Consumer B君 has eat 8 baozi...
50 Consumer C君 has eat 9 baozi...
51 -----no baozi anymore----
52 -----no baozi anymore----
53 -----no baozi anymore----
54 -----no baozi anymore----
55 -----no baozi anymore----
56 -----no baozi anymore----
57 -----no baozi anymore----
58 -----no baozi anymore----
59 -----no baozi anymore----
60 -----no baozi anymore----
61 -----no baozi anymore----
62 -----no baozi anymore----
63 -----no baozi anymore----
64 -----no baozi anymore----
65 -----no baozi anymore----
66 -----no baozi anymore----
67 -----no baozi anymore----
68 -----no baozi anymore----
69 -----no baozi anymore----
70 -----no baozi anymore----
View Code

 

示例3:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import time,random
 6 import queue,threading
 7 
 8 q = queue.Queue()
 9 
10 def Producer(name):
11   count = 0
12   while count <10:
13     print("making........")
14     time.sleep(5)
15     q.put(count)
16     print('Producer %s has produced %s baozi..' %(name, count))
17     count +=1
18     q.task_done()  #發信號告訴隊列在生產包子,讓join接收,就開始吃包子
19     print("ok......")
20 
21 def Consumer(name):
22   count = 0
23   while count <10:
24         time.sleep(random.randrange(4))  #產生一個隨機數(1秒-3秒之間)
25         print("waiting...等待包子作的過程當中...")
26         q.join()  #join開始接收
27         data = q.get()
28         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
29         count +=1
30 
31 p1 = threading.Thread(target=Producer, args=('A君',))
32 c1 = threading.Thread(target=Consumer, args=('B君',))
33 c2 = threading.Thread(target=Consumer, args=('C君',))
34 c3 = threading.Thread(target=Consumer, args=('D君',))
35 
36 p1.start()
37 c1.start()
38 c2.start()
39 c3.start()

執行結果:

 1 making........
 2 waiting...等待包子作的過程當中...
 3 waiting...等待包子作的過程當中...
 4 waiting...等待包子作的過程當中...
 5 Producer A君 has produced 0 baozi..
 6 ok......
 7 making........
 8 Consumer D君 has eat 0 baozi...
 9 waiting...等待包子作的過程當中...
10 Producer A君 has produced 1 baozi..
11 Consumer B君 has eat 1 baozi...
12 waiting...等待包子作的過程當中...
13 ok......
14 making........
15 部分代碼省略......
View Code

 

示例4:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import time,random
 6 import queue,threading
 7 
 8 q = queue.Queue()
 9 
10 def Producer(name):
11   count = 0
12   while count <10:
13     print("making.....正在製做包子...")
14     time.sleep(5)
15     q.put(count)
16     print('Producer %s has produced %s baozi..' %(name, count))
17     count +=1
18     q.join()
19     print("ok......")
20 
21 def Consumer(name):
22   count = 0
23   while count <10:
24         time.sleep(random.randrange(4))  #產生一個隨機數(1秒-3秒之間)
25         data = q.get()
26         print("eating.......")
27         time.sleep(4)  #4秒鐘這後
28         q.task_done()  #給他發一個信號,纔打印ok
29         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
30         count +=1
31 
32 p1 = threading.Thread(target=Producer, args=('A君',))
33 c1 = threading.Thread(target=Consumer, args=('B君',))
34 c2 = threading.Thread(target=Consumer, args=('C君',))
35 c3 = threading.Thread(target=Consumer, args=('D君',))
36 
37 p1.start()
38 c1.start()
39 c2.start()
40 c3.start()

執行結果:

 1 making.....正在製做包子...
 2 Producer A君 has produced 0 baozi..
 3 eating.......
 4 Consumer B君 has eat 0 baozi...
 5 ok......
 6 making.....正在製做包子...
 7 Producer A君 has produced 1 baozi..
 8 eating.......
 9 Consumer C君 has eat 1 baozi...
10 ok......
11 making.....正在製做包子...
12 Producer A君 has produced 2 baozi..
13 eating.......
14 Consumer D君 has eat 2 baozi...
15 ok......
16 making.....正在製做包子...
17 Producer A君 has produced 3 baozi..
18 eating.......
19 Consumer B君 has eat 3 baozi...
20 ok......
21 making.....正在製做包子...
22 Producer A君 has produced 4 baozi..
23 eating.......
24 Consumer C君 has eat 4 baozi...
25 ok......
26 making.....正在製做包子...
27 Producer A君 has produced 5 baozi..
28 eating.......
29 Consumer D君 has eat 5 baozi...
30 ok......
31 making.....正在製做包子...
32 Producer A君 has produced 6 baozi..
33 eating.......
34 Consumer B君 has eat 6 baozi...
35 ok......
36 making.....正在製做包子...
View Code

總結:

  task_done和join必須成對出現,相似於一個通訊工具,我給你發個信號,你就知道我作了某個操做

(例如:put or get) 對方就是join。若是我put or get 你就處理。(相似於收到信號就處理)

相似於,我發信號,你收到就處理,沒收到就Join卡住,一直在那等待。

十3、多進程模塊 multiprocessing (主要解決GIL問題)                                

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

因爲GIL的存在,python中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。

multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情景。

 

1、進程的調用

調用方式1:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 #多進程調用(並行)
 6 
 7 from multiprocessing import Process
 8 import time
 9 
10 
11 def f(name):
12     time.sleep(1)
13     print('hello', name,time.ctime())
14 
15 if __name__ == '__main__':
16     p_list=[]
17     for i in range(3):  #子進程
18 
19         p = Process(target=f, args=('alvin',))
20         p_list.append(p)
21         p.start()
22 
23     for i in p_list:
24         i.join()
25 
26     print('end')  #主進程

執行結果:

1 hello alvin Mon Jan 16 18:38:08 2017  #並行,三個同時出現
2 hello alvin Mon Jan 16 18:38:08 2017
3 hello alvin Mon Jan 16 18:38:08 2017
4 end

 

調用方式2:

示例1: 1秒鐘這後,4條消息同時執行

 1 from multiprocessing import Process
 2 import time
 3 
 4 class MyProcess(Process):
 5 
 6     def run(self):
 7         time.sleep(1)
 8         print ('hello', self.name,time.ctime())
 9 
10 
11 if __name__ == '__main__':
12     p_list=[]
13     for i in range(3):
14         p = MyProcess()  #進程對象 15         p.start()        #啓動執行run方法 16         p_list.append(p)
17 
18     for p in p_list:
19         p.join()  #子進程沒有執行完,主進程會一直等待
20 
21     print('end')

執行結果:

1 hello MyProcess-1 Mon Jan 16 18:56:58 2017  #結果同時出來
2 hello MyProcess-2 Mon Jan 16 18:56:58 2017
3 hello MyProcess-3 Mon Jan 16 18:56:58 2017
4 end

 

示例2:daemon=True 是屬性,不是方法

 1 #設置爲守護進程,打印的就是end
 2 
 3 from multiprocessing import Process
 4 import time
 5 
 6 class MyProcess(Process):
 7 
 8     def run(self):
 9         time.sleep(1)
10         print ('hello', self.name,time.ctime())
11 
12 if __name__ == '__main__':
13     p_list=[]
14 
15     for i in range(3):
16         p = MyProcess()  #進程對象
17         p.daemon=True  #是屬性,不是方法
18         p.start()       #啓動執行sun方法
19         p_list.append(p)
20 
21     print('end')   #主進程執行完以後,無論守護進程

執行結果:

1 end

 

調用方法3:

 1 from multiprocessing import Process
 2 import os
 3 import time
 4 
 5 def info(title):
 6     print("title:", title)
 7     print('parent process:', os.getppid())  #父進程的pid
 8     print('process id:', os.getpid())       #打印進程號
 9 
10 def f(name):
11     info('function f')
12     print('hello', name)
13 
14 if __name__ == '__main__':
15     info('main process line')
16 
17     time.sleep(1)
18     print("------------------")
19     p = Process(target=info, args=('yuan',))
20     p.start()
21     p.join()

執行結果:

1 title: main process line
2 parent process: 4204
3 process id: 7280
4 ------------------
5 title: yuan
6 parent process: 7280
7 process id: 3596

 

2、Process類 

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前尚未實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():無論任務是否完成,當即中止工做進程

屬性:

  daemon:和線程的setDeamon功能同樣

  name:進程名字。

  pid:進程號。

 

示例:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import time
 6 from multiprocessing import Process
 7 
 8 class MyProcess(Process):
 9 
10     def __init__(self,num):
11         super(MyProcess,self).__init__()
12         self.num=num
13 
14     def run(self):
15         time.sleep(1)
16         print(self.pid)
17         print(self.is_alive())
18         print(self.num)
19         time.sleep(1)
20 
21 
22 if __name__ == '__main__':
23     p_list=[]
24     for i in range(10):
25         p = MyProcess(i)
26         p_list.append(p)
27 
28     for p in p_list:
29         p.start()
30 
31     print('main process end')

執行結果:

 1 main process end
 2 7104
 3 True
 4 2
 5 7960
 6 True
 7 3
 8 9016
 9 True
10 1
11 4604
12 True
13 7
14 1280
15 True
16 6
17 7896
18 True
19 5
20 7704
21 True
22 0
23 8760
24 True
25 9
26 7088
27 True
28 8
29 2976
30 True
31 4
View Code

 

3、進程間通信 

一、進程對列Queue

示例:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 #進程通訊 (拷貝了一份數據),在linux上ID是同樣的。
 6 import time
 7 import multiprocessing
 8 
 9 def foo(q):
10     time.sleep(1)
11     print("son process",id(q))
12     q.put(123)
13     q.put("yuan")
14 
15 if __name__ == '__main__':
16     q=multiprocessing.Queue()
17     p=multiprocessing.Process(target=foo,args=(q,)) #傳q是數據複製了一份,消耗內存
18     p.start()
19     print("main process",id(q))
20     print(q.get())
21     print(q.get())

執行結果:

1 main process 31888848   #在windows上運行結果
2 son process 35289968
3 123
4 yuan
5 ------------------------------------------------------------
6 ('main process', 20253136)  #在linux上運行結果
7 ('son process',  20253136)
8 123
9 yuan

 

二、管道

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 #管道
 6 from multiprocessing import Process, Pipe
 7 
 8 def f(conn):
 9     conn.send([12, {"name":"yuan"}, 'hello']) #發送消息
10     response=conn.recv()  #接收消息
11     print("response",response)
12     conn.close()
13     print("q_ID2:",id(conn))
14 
15 if __name__ == '__main__':
16 
17     parent_conn, child_conn = Pipe()
18     print("q_ID1:",id(child_conn))
19     p = Process(target=f, args=(child_conn,))
20     p.start()
21 
22     print(parent_conn.recv())    #接收消息 prints "[42, None, 'hello']"
23     parent_conn.send("兒子你好!")  #發送消息
24     p.join()

執行結果:

1 q_ID1: 11578512
2 [12, {'name': 'yuan'}, 'hello']
3 response 兒子你好!
4 q_ID2: 31830928

 

三、Managers

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 from multiprocessing import Process, Manager
 6 
 7 def f(d, l,n):
 8     d[n] = '1'  #{0:"1"}
 9     d['2'] = 2  #{0:"1","2":2}
10     l.append(n) #[0.1.2.3.4.0]
11     # print(l)
12     print("son process:",id(d),id(l))
13 
14 if __name__ == '__main__':
15 
16     with Manager() as manager:  #用with至關於打開文件就不要關閉了
17 
18         d = manager.dict()  #{}  #主進程
19 
20         l = manager.list(range(5)) #[0,1,2,3,4]
21 
22         print("main process:",id(d),id(l))
23 
24         p_list = []
25 
26         for i in range(10): #10個子進程
27             p = Process(target=f, args=(d,l,i))  #主進程傳過去
28             p.start()
29             p_list.append(p)
30 
31         for res in p_list:
32             res.join()
33 
34         print(d)
35         print(l)

執行結果:

 1 main process: 23793136 50481712
 2 son process: 34508592 34286736
 3 son process: 23695120 23342192
 4 son process: 30969648 30682256
 5 son process: 23760656 23473264
 6 son process: 23432976 23080048
 7 son process: 31428368 31140976
 8 son process: 18976592 18754736
 9 son process: 20025072 19606608
10 son process: 23301904 23014512
11 son process: 26185488 25898096
12 {0: '1', 1: '1', 2: '1', 3: '1', 4: '1', '2': 2, 6: '1', 7: '1', 8: '1', 9: '1', 5: '1'}
13 [0, 1, 2, 3, 4, 1, 2, 3, 9, 6, 0, 8, 5, 4, 7]

 

十4、進程同步

python2: 在python2中是元組,由於共享一個屏幕,會出現串行的狀況,因此須要加把鎖

python3: 在python3中,加不加鎖,都不會出現串行的問題

示例1:

 1 #有鎖的狀況
 2 
 3 #!/usr/bin/env python
 4 # -*- coding:utf-8 -*- 
 5 #Author: nulige
 6 
 7 from multiprocessing import Process,Lock
 8 import time
 9 
10 def f(l, i):
11         l.acquire()
12         time.sleep(1)
13         print('hello world %s' % i)
14         l.release()
15 
16 if __name__ == '__main__':
17     lock = Lock()
18 
19     for num in range(10):
20         Process(target=f, args=(lock, num)).start()

執行結果:

 1 hello world 4
 2 hello world 5
 3 hello world 1
 4 hello world 2
 5 hello world 0
 6 hello world 8
 7 hello world 3
 8 hello world 9
 9 hello world 7
10 hello world 6

 

示例2: 有鎖的狀況

 1 #有鎖的狀況
 2 from multiprocessing import Process,Lock
 3 
 4 def f(l,i):
 5     l.acquire()
 6 
 7     print('hello world',i)  #python2是元組(由於共享一個屏幕,在python2中會串行,因此須要加把鎖)
 8     # print('hello world %s' %i)
 9 
10     l.release()
11 
12 if __name__ == '__main__':
13     lock = Lock()
14 
15     for num in range(10):
16         Process(target=f,args=(lock,num)).start()

執行結果:

 1 hello world 1
 2 hello world 7
 3 hello world 3
 4 hello world 2
 5 hello world 4
 6 hello world 0
 7 hello world 8
 8 hello world 9
 9 hello world 6
10 hello world 5

示例2:沒有鎖的狀況

 1 #沒有鎖的狀況(在python2.7下,才能看到效果)
 2 from multiprocessing import Process,Lock
 3 
 4 def f(l,i):
 5     print('hello world %s' %i)
 6 
 7 if __name__ == '__main__':
 8     lock = Lock()
 9 
10     for num in range(100):
11         Process(target=f,args=(lock,num)).start()

執行結果:

 1 #在python2.7中,沒有鎖會出現串行的狀況
 2 
 3 D:\python\day35>python2 s6.py
 4 hello world 1
 5 hello world 0
 6 hello world 6
 7 hello world 2
 8 hello world 5
 9 hhello world 4ello world 3
10 
11 hhhhello world 15
12 hello world 9
13 hhello world 24
14 hhhello world 25
15 hello world 29
16 ello world 12hello world 28hello world 10hello world 49
17 hhhhhello world 59
18 hello world 63
19 hello world 78
20 hello world 77
21 ello world 46hello world 70hhello world 57hhello world 58
22 h
23 hhello world 13hhello world 54ello world 39h
24 ello world 44hhello world 69
25 hello world 88
26 hhhhhhello world 45
27 hello world 38hello world 22
28 hhello world 42
29 ello world 52hhello world 90ello world 30
30 hh
31 
32 hhhello world 18
33 ello world 21h
34 hhhello world 32hhhello world 91
35 hh
36 ello world 89ello world 53hello world 85hh
37 hhhello world 27ello world 35
38 h
39 h
40 ello world 62h
41 hhhello world 23ello world 68ello world 14
42 ello world 65ello world 66ello world 73
43 hhhhhello world 71hhello world 37hh
44 
45 ello world 11hello world 7ello world 8ello world 20ello world 50ello world 81ello world 17
46 ello world 31hello world 82
47 
48 
49 ello world 16ello world 83
50 
51 ello world 94ello world 61
52 
53 ello world 26hello world 96ello world 40ello world 64ello world 51ello world 48
54 
55 ello world 33hello world 86hhello world 55ello world 75ello world 79ello world 41
56 ello world 74
57 
58 
59 
60 ello world 76ello world 67ello world 56ello world 95
61 
62 ello world 47ello world 84
63 
64 ello world 72
View Code

 

十5、進程池(python沒有線程池)

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有兩個方法:

  • apply:同步方法
  • apply_async :異步方法

示例1:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 from  multiprocessing import Process,Pool
 6 import time,os
 7 
 8 def Foo(i):
 9     time.sleep(1)
10     print(i)
11 if __name__ == '__main__':
12 
13     pool = Pool(5)  #進程池的數量
14     for i in range(100):  #進程數
15         # pool.apply(func=Foo, args=(i,))  #同步接口
16         pool.apply_async(func=Foo, args=(i,))  #用異步接口
17 
18     pool.close()
19     pool.join()  #join和close順序是固定的
20     print('end')

執行結果:

 

 1 0
 2 2
 3 3
 4 1
 5 4
 6 
 7 5
 8 7
 9 8
10 6
11 9
12 
13 10
14 11
15 13
16 12
17 14
18 
19 15
20 18
21 16
22 17
23 19
24 省略部分代碼.......
View Code

 

示例2:

 1 from  multiprocessing import Process,Pool
 2 import time,os
 3 
 4 def Foo(i):
 5     time.sleep(1)
 6     print(i)
 7     print("son:",os.getpid())
 8     return "HELLO %s "%i
 9 
10 def Bar(arg):  #回調函數
11     print(arg)
12     # print("hello")
13     # print("Bar:",os.getpid())
14 
15 if __name__ == '__main__':
16 
17     pool = Pool(5)  #進程池的數量
18     print("main pid", os.getpid())
19     for i in range(100):  #進程數
20         #pool.apply(func=Foo, args=(i,))
21         # pool.apply_async(func=Foo, args=(i,))
22 
23         #回調函數: 就是某個動做或者函數執行成功後再去執行的函數
24         pool.apply_async(func=Foo, args=(i,),callback=Bar)
25 
26     pool.close()
27     pool.join()
28     print('end')

執行結果:

 1 main pid 1220
 2 0
 3 son: 3920
 4 HELLO 0 
 5 1
 6 son: 8660
 7 HELLO 1 
 8 2
 9 son: 1948
10 HELLO 2 
11 3
12 son: 5808
13 HELLO 3 
14 4
15 部分結果省略............

 

示例3:子進程的結果,告訴callback,至關於獲取子進程執行的結果。

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 from multiprocessing import Process,Lock,Pool
 6 import time
 7 
 8 #進程池
 9 def f(i):
10     print('hello world %s'% i)
11     time.sleep(1)
12 
13     return i
14 
15 def callback(data):  #i的返回值給callback,其實就是個回執,告訴你幹完活了。
16     print("exec done--->",data)
17 
18 if __name__=='__main__':
19     lock = Lock()
20 
21     pool = Pool(processes=5)
22     for num in range(100):
23 
24         pool.apply_async(func=f, args=(num,), callback=callable)  #子進程幹完的結果,返回給callable
25 
26     pool.close()
27     pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。

執行結果:

 1 hello world 0  #分別5行,5行輸出結果
 2 hello world 1
 3 hello world 2
 4 hello world 3
 5 hello world 4
 6 
 7 hello world 5
 8 hello world 6
 9 hello world 7
10 hello world 8
11 hello world 9
12 
13 hello world 10
14 hello world 11
15 hello world 12
16 hello world 13
17 hello world 14

 

十6、協程                                                                                                                                                            

協程,又稱微線程,纖程。英文名Coroutine。

優勢1: 協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。

優勢2: 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。

由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。

 

示例:複習生成器

 1 #生成器
 2 def f():
 3 
 4     print("ok")
 5     s = yield 6
 6     print(s)
 7     print("ok2")
 8     yield
 9 
10 gen=f()
11 # print(gen)
12 # next(gen)  #方法一
13 # next(gen)
14 
15 RET=gen.__next__()  #方法二
16 print(RET)
17 
18 gen.send(5)  #方法三

執行結果:

1 ok
2 6
3 5
4 ok2

 

一、yield的簡單實現 

示例1:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import time
 6 import queue
 7 
 8 def consumer(name):
 9     print("--->ready to eat baozi........")
10     while True:
11         new_baozi = yield  #yield實現上下文切換,傳包子進來
12         print("[%s] is eating baozi %s" % (name,new_baozi))
13         #time.sleep(1)
14 
15 def producer():
16 
17     r = con.__next__()
18     r = con2.__next__()
19     n = 0
20     while 1:
21         time.sleep(1)
22         print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) )
23         con.send(n)  #發送告訴他有包子了
24         con2.send(n+1)
25 
26         n +=2
27 
28 if __name__ == '__main__':
29     con = consumer("c1")
30     con2 = consumer("c2")
31     producer()

執行結果:

 1 --->ready to eat baozi........
 2 --->ready to eat baozi........
 3 [producer] is making baozi 0 and 1
 4 [c1] is eating baozi 0
 5 [c2] is eating baozi 1
 6 [producer] is making baozi 2 and 3
 7 [c1] is eating baozi 2
 8 [c2] is eating baozi 3
 9 [producer] is making baozi 4 and 5
10 [c1] is eating baozi 4
11 [c2] is eating baozi 5
12 [producer] is making baozi 6 and 7
13 [c1] is eating baozi 6
14 [c2] is eating baozi 7
15 [producer] is making baozi 8 and 9
16 [c1] is eating baozi 8
17 [c2] is eating baozi 9
18 [producer] is making baozi 10 and 11
19 [c1] is eating baozi 10
20 [c2] is eating baozi 11
21 省略部分........

 

二、Greenlet

  greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

 1 from greenlet import greenlet
 2  
 3  
 4 def test1():
 5     print(12)
 6     gr2.switch()
 7     print(34)
 8     gr2.switch()
 9  
10  
11 def test2():
12     print(56)
13     gr1.switch()
14     print(78)
15  
16  
17 gr1 = greenlet(test1)
18 gr2 = greenlet(test2)
19 gr1.switch()
  
20 test2()

 

三、Gevent

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*- 
 3 #Author: nulige
 4 
 5 import gevent
 6 
 7 import requests,time
 8 
 9 start=time.time()
10 
11 def f(url):
12     print('GET: %s' % url)
13     resp =requests.get(url)
14     data = resp.text
15     print('%d bytes received from %s.' % (len(data), url))
16 
17 gevent.joinall([
18 
19         gevent.spawn(f, 'https://www.python.org/'),
20         gevent.spawn(f, 'https://www.yahoo.com/'),
21         gevent.spawn(f, 'https://www.baidu.com/'),
22         gevent.spawn(f, 'https://www.sina.com.cn/'),
23 
24 ])
25 
26 #上下時間對比
27 
28 # f('https://www.python.org/')
29 #
30 # f('https://www.yahoo.com/')
31 #
32 # f('https://baidu.com/')
33 #
34 # f('https://www.sina.com.cn/')
35 
36 print("cost time:",time.time()-start)

協程的優點:

一、沒有切換的消

二、沒有鎖的概念

 

有一個問題:能用多核嗎?

答:能夠採用多進程+協程,是一個很好的解決併發的方案

相關文章
相關標籤/搜索