基礎概念
1、進程、程序和線程
- 程序:程序只是一堆代碼而已
- 進程:指的是程序的運行過程,是對正在運行程序的一個抽象。進程是一個資源單位
- 線程:每一個進程有一個地址空間,並且默認就有一個控制線程。線程纔是cpu上的執行單位
2、併發與並行
不管是並行仍是併發,在用戶看來都是'同時'運行的,無論是進程仍是線程,都只是一個任務而已,真是幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行一個任務python
- 併發:是僞並行,即看起來是同時運行。單個cpu+多道技術就能夠實現併發,(並行也屬於併發)
- 並行:同時運行,只有具有多個cpu才能實現並行
3、同步與異步
- 同步:發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回。
- 異步:異步功能調用發出後,不會等返回,而是繼續往下執行當,當該異步功能完成後,經過狀態、通知或回調函數來通知調用者。
4、阻塞與非阻塞
- 阻塞:調用結果返回以前,當前線程會被掛起(如遇到io操做)。函數只有在獲得結果以後纔會將阻塞的線程激活。
- 非阻塞:在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程。
5、總結
- 同步與異步針對的是函數/任務的調用方式:同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處於激活狀態。而異步狀況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行當,函數返回的時候經過狀態、通知、事件等方式通知進程任務完成。
- 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能知足的時候就將進程掛起,而非阻塞則不會阻塞當前進程
多進程
1、進程的狀態
2、multiprocessing模塊介紹
- python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。
- multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。
- 與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
3、Process類
一、方法介紹
- p.start():啓動進程,並調用該子進程中的p.run()
- p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
- p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
- p.is_alive():若是p仍然運行,返回True
- p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
二、屬性介紹
- p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
- p.name:進程的名稱
- p.pid:進程的pid
三、開啓子進程的兩種方式
注意:在windows中Process()必須放到# if __name__ == '__main__':下git
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#方式一:
from
multiprocessing
import
Process
import
time,os
def
task(name):
print
(
'%s %s is running,parent id is <%s>'
%
(name,os.getpid(),os.getppid()))
time.sleep(
3
)
print
(
'%s %s is running,parent id is <%s>'
%
(name, os.getpid(), os.getppid()))
if
__name__
=
=
'__main__'
:
# Process(target=task,kwargs={'name':'子進程1'})
p
=
Process(target
=
task,args
=
(
'子進程1'
,))
p.start()
#僅僅只是給操做系統發送了一個信號
print
(
'主進程'
, os.getpid(), os.getppid())
#方式二
from
multiprocessing
import
Process
import
time,os
class
MyProcess(Process):
def
__init__(
self
,name):
super
().__init__()
self
.name
=
name
def
run(
self
):
print
(
'%s %s is running,parent id is <%s>'
%
(
self
.name, os.getpid(), os.getppid()))
time.sleep(
3
)
print
(
'%s %s is running,parent id is <%s>'
%
(
self
.name, os.getpid(), os.getppid()))
if
__name__
=
=
'__main__'
:
p
=
MyProcess(
'子進程1'
)
p.start()
print
(
'主進程'
, os.getpid(), os.getppid())
|
4、守護進程
主進程建立守護進程github
- 守護進程會在主進程代碼執行結束後就終止
- 守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
- 注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
- 必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
from
multiprocessing
import
Process
import
time
def
foo():
print
(
123
)
time.sleep(
1
)
print
(
"end123"
)
def
bar():
print
(
456
)
time.sleep(
3
)
print
(
"end456"
)
if
__name__
=
=
'__main__'
:
p1
=
Process(target
=
foo)
p2
=
Process(target
=
bar)
p1.daemon
=
True
p1.start()
p2.start()
# p2.join()
print
(
"main-------"
)
"""
#主進程代碼運行完畢,守護進程就會結束
main-------
456
end456
"""
|
5、互斥鎖
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的, 而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理web
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
"""
db.txt的內容爲:{"count":1}
"""
from
multiprocessing
import
Process,Lock
import
json
import
time
def
search(name):
time.sleep(
1
)
dic
=
json.load(
open
(
'db.txt'
,
'r'
,encoding
=
'utf-8'
))
print
(
'<%s> 查看到剩餘票數【%s】'
%
(name,dic[
'count'
]))
def
get(name):
time.sleep(
1
)
dic
=
json.load(
open
(
'db.txt'
,
'r'
,encoding
=
'utf-8'
))
if
dic[
'count'
] >
0
:
dic[
'count'
]
-
=
1
time.sleep(
3
)
json.dump(dic,
open
(
'db.txt'
,
'w'
,encoding
=
'utf-8'
))
print
(
'<%s> 購票成功'
%
name)
def
task(name,mutex):
search(name)
mutex.acquire()
#=============
get(name)
mutex.release()
#=============
if
__name__
=
=
'__main__'
:
mutex
=
Lock()
#=============
for
i
in
range
(
10
):
p
=
Process(target
=
task,args
=
(
'路人%s'
%
i,mutex))
p.start()
|
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然能夠用文件共享數據實現進程間通訊,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.須要本身加鎖處理 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。這兩種方式都是使用消息傳遞的。 1 隊列和管道都是將數據存放於內存中 2 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
6、隊列(推薦使用)
一、Queue
- Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
- maxsize是隊列中容許最大項數,省略則無大小限制。
- q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
- q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
- q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
- q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
- q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
"""
基本用法
"""
from
multiprocessing
import
Process, Queue
import
time
q
=
Queue(
3
)
# put ,get ,put_nowait,get_nowait,full,empty
q.put(
3
)
q.put(
3
)
q.put(
3
)
print
(q.full())
# 滿了 True
print
(q.get())
print
(q.get())
print
(q.get())
print
(q.empty())
# 空了 True
"""
基於隊列來實習一個生產者消費者模型
問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,
可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。
"""
from
multiprocessing
import
Process,Queue
import
time,random
def
consumer(q,name):
while
True
:
res
=
q.get()
time.sleep(random.randint(
1
,
3
))
print
(
'%s 吃 %s'
%
(name,res))
def
producer(q,name,food):
for
i
in
range
(
3
):
time.sleep(random.randint(
1
,
3
))
res
=
'%s%s'
%
(food,i)
q.put(res)
print
(
'%s 生產了 %s'
%
(name,res))
if
__name__
=
=
'__main__'
:
q
=
Queue()
#生產者們:即廚師們
p1
=
Process(target
=
producer,args
=
(q,
'egon'
,
'包子'
))
#消費者們:即吃貨們
c1
=
Process(target
=
consumer,args
=
(q,
'alex'
))
#開始
p1.start()
c1.start()
print
(
'主'
)
"""
解決方式:讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環
"""
from
multiprocessing
import
Process,Queue
import
time,random,os
def
consumer(q,name):
while
True
:
res
=
q.get()
if
res
is
None
:
break
time.sleep(random.randint(
1
,
3
))
print
(
'%s 吃 %s'
%
(name,res))
def
producer(q,name,food):
for
i
in
range
(
3
):
time.sleep(random.randint(
1
,
3
))
res
=
'%s%s'
%
(food,i)
q.put(res)
print
(
'%s 生產了 %s'
%
(name,res))
if
__name__
=
=
'__main__'
:
q
=
Queue()
#生產者們:即廚師們
p1
=
Process(target
=
producer,args
=
(q,
'egon'
,
'包子'
))
#消費者們:即吃貨們
c1
=
Process(target
=
consumer,args
=
(q,
'alex'
))
#開始
p1.start()
c1.start()
p1.join()
q.put(
None
)
print
(
'主'
)
"""
有多個生產者和多個消費者時,有幾個消費者就須要發送幾回結束信號:至關low,須要使用JoinableQueue
"""
from
multiprocessing
import
Process,Queue
import
time,random,os
def
consumer(q,name):
while
True
:
res
=
q.get()
if
res
is
None
:
break
time.sleep(random.randint(
1
,
3
))
print
(
'\%s 吃 %s'
%
(name,res))
def
producer(q,name,food):
for
i
in
range
(
3
):
time.sleep(random.randint(
1
,
3
))
res
=
'%s%s'
%
(food,i)
q.put(res)
print
(
'%s 生產了 %s'
%
(name,res))
if
__name__
=
=
'__main__'
:
q
=
Queue()
#生產者們:即廚師們
p1
=
Process(target
=
producer,args
=
(q,
'egon1'
,
'包子'
))
p2
=
Process(target
=
producer,args
=
(q,
'egon2'
,
'骨頭'
))
p3
=
Process(target
=
producer,args
=
(q,
'egon3'
,
'泔水'
))
#消費者們:即吃貨們
c1
=
Process(target
=
consumer,args
=
(q,
'alex1'
))
c2
=
Process(target
=
consumer,args
=
(q,
'alex2'
))
#開始
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(
None
)
q.put(
None
)
q.put(
None
)
print
(
'主'
)
|
二、JoinableQueue
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。編程
- maxsize是隊列中容許最大項數,省略則無大小限制。
- JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
- q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
- q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
from
multiprocessing
import
Process,JoinableQueue
import
time
def
producer(q):
for
i
in
range
(
2
):
res
=
'包子%s'
%
i
time.sleep(
0.5
)
print
(
'生產者生產了%s'
%
res)
q.put(res)
q.join()
def
consumer(q):
while
True
:
res
=
q.get()
if
res
is
None
:
break
time.sleep(
1
)
print
(
'消費者吃了%s'
%
res)
q.task_done()
#向q.join()發送一次信號,證實一個數據已經被取走了
if
__name__
=
=
'__main__'
:
#容器
q
=
JoinableQueue()
#生產者們
p1
=
Process(target
=
producer,args
=
(q,))
p2
=
Process(target
=
producer,args
=
(q,))
p3
=
Process(target
=
producer,args
=
(q,))
#消費者們
c1
=
Process(target
=
consumer,args
=
(q,))
c2
=
Process(target
=
consumer,args
=
(q,))
c1.daemon
=
True
c2.daemon
=
True
# p1.start()
# p2.start()
# p3.start()
# c1.start()
# c2.start()
# 開始
p_l
=
[p1, p2, p3, c1, c2]
for
p
in
p_l:
p.start()
p1.join()
p2.join()
p3.join()
print
(
'主'
)
#主進程等--->p1,p2,p3等---->c1,c2
#p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
#於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程
|
7、進程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
"""
服務端多進程
"""
from
socket
import
*
from
multiprocessing
import
Process
server
=
socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,
1
)
server.bind((
'127.0.0.1'
,
8080
))
server.listen(
5
)
def
talk(conn,client_addr):
while
True
:
try
:
msg
=
conn.recv(
1024
)
if
not
msg:
break
conn.send(msg.upper())
except
Exception:
break
if
__name__
=
=
'__main__'
:
#windows下start進程必定要寫到這下面
while
True
:
conn,client_addr
=
server.accept()
p
=
Process(target
=
talk,args
=
(conn,client_addr))
p.start()
"""
服務端進程池
#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count())
#開啓6個客戶端,會發現2個客戶端處於等待狀態
#在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
"""
from
socket
import
*
from
multiprocessing
import
Pool
import
os
server
=
socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,
1
)
server.bind((
'127.0.0.1'
,
8080
))
server.listen(
5
)
def
talk(conn,client_addr):
print
(
'進程pid: %s'
%
os.getpid())
while
True
:
try
:
msg
=
conn.recv(
1024
)
if
not
msg:
break
conn.send(msg.upper())
except
Exception:
break
if
__name__
=
=
'__main__'
:
p
=
Pool()
while
True
:
conn,client_addr
=
server.accept()
p.apply_async(talk,args
=
(conn,client_addr))
# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
"""
客戶端都同樣
"""
from
socket
import
*
client
=
socket(AF_INET,SOCK_STREAM)
client.connect((
'127.0.0.1'
,
8080
))
while
True
:
msg
=
input
(
'>>: '
).strip()
if
not
msg:
continue
client.send(msg.encode(
'utf-8'
))
msg
=
client.recv(
1024
)
print
(msg.decode(
'utf-8'
))
|
多線程
多線程指的是,在一個進程中開啓多個線程,簡單的講:若是多個任務共用一塊地址空間,那麼必須在一個進程內開啓多個線程。json
1、進程與線程的區別
- 開進程的開銷遠大於開線程
- 同一進程內的多個線程共享該進程的地址空間
- from multiprocessing import Process p1=Process(target=task,) 換成 from threading import Thread t1=Thread(target=task,)
- 計算密集型的多線程不能加強性能,多進程才能夠,I/O密集型的多線程會加快程序的執行速度
2、threading模塊介紹
multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性windows
一、Thread實例對象的方法
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
二、threading模塊提供的一些方法:
- threading.currentThread(): 返回當前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
from
threading
import
Thread,currentThread,active_count,
enumerate
import
time
def
task():
print
(
'%s is ruuning'
%
currentThread().getName())
time.sleep(
2
)
print
(
'%s is done'
%
currentThread().getName())
if
__name__
=
=
'__main__'
:
# 在主進程下開啓線程
t
=
Thread(target
=
task,name
=
'子線程1'
)
t.start()
t.setName(
'兒子線程1'
)
# t.join()
print
(t.getName())
currentThread().setName(
'主線程'
)
print
(t.isAlive())
print
(
'主線程'
,currentThread().getName())
print
(active_count())
print
(
enumerate
())
#連同主線程在內有兩個運行的線程
"""
子線程1 is ruuning
兒子線程1
True
主線程 主線程
2
[<_MainThread(主線程, started 8672)>, <Thread(兒子線程1, started 7512)>]
兒子線程1 is done
"""
|
三、開啓子線程的兩種方式
注意:在windows中Process()必須放到# if __name__ == '__main__':下安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
#方式一
from
threading
import
Thread
import
time
def
sayhi(name):
time.sleep(
2
)
print
(
'%s say hello'
%
name)
if
__name__
=
=
'__main__'
:
t
=
Thread(target
=
sayhi,args
=
(
'egon'
,))
t.start()
print
(
'主線程'
)
#方式二
from
threading
import
Thread
import
time
class
Sayhi(Thread):
def
__init__(
self
,name):
super
().__init__()
self
.name
=
name
def
run(
self
):
time.sleep(
2
)
print
(
'%s say hello'
%
self
.name)
if
__name__
=
=
'__main__'
:
t
=
Sayhi(
'egon'
)
t.start()
print
(
'主線程'
)
|
3、守護線程
- 不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬,須要強調的是:運行完畢並不是終止運行
- 對主進程來講,運行完畢指的是主進程代碼運行完畢---主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,
- 對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢---主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
- 必定要在t.start()前設置,設置t爲守護線程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
from
threading
import
Thread
import
time
def
foo():
print
(
123
)
time.sleep(
1
)
print
(
"end123"
)
def
bar():
print
(
456
)
time.sleep(
3
)
print
(
"end456"
)
t1
=
Thread(target
=
foo)
t2
=
Thread(target
=
bar)
t1.daemon
=
True
t1.start()
t2.start()
print
(
"main-------"
)
"""
123
456
main-------
end123
end456
"""
|
4、 GIL
- 在Cpython解釋器中,同一個進程下開啓的多線程,由於有GIL的存在,同一時刻同一進程中只能有一個線程被執行
- GIL本質就是一把互斥鎖,既然是互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。
- 保護不一樣的數據的安全,就應該加不一樣的鎖。
- 在一個python的進程內,不只有test.py的主線程或者由該主線程開啓的其餘線程,還有解釋器開啓的垃圾回收等解釋器級別的線程,總之,全部線程都運行在這一個進程內
- 全部數據都是共享的,這其中,代碼做爲一種數據也是被全部線程共享的(test.py的全部代碼以及Cpython解釋器的全部代碼)
- 全部線程的任務,都須要將任務的代碼當作參數傳給解釋器的代碼去執行,即全部的線程要想運行本身的任務,首先須要解決的是可以訪問到解釋器的代碼。
- GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock
若是多個線程的target=work,那麼執行流程是: 多個線程先訪問到解釋器的代碼,即拿到執行權限,而後將target的代碼交給解釋器的代碼去執行 解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就致使了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操做,解決這種問題沒有什麼高明的方法,就是加鎖處理,以下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼
#分析: 咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,解決方案能夠是: 方案一:開啓四個進程 方案二:一個進程下,開啓四個線程 #單核狀況下,分析結果: 若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝 若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝 #多核狀況下,分析結果: 若是四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝 若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝 #結論:如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。 由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程, 每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的, 假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻, 可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了, 爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。 應用: 多線程用於IO密集型,如socket,爬蟲,web 多進程用於計算密集型,如金融分析
5、互斥鎖
- 線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來
- join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高
過程分析:全部線程搶的是GIL鎖,或者說全部線程搶的是執行權限 線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程當中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後正常執行到釋放Lock。。。這就致使了串行運行的效果 既然是串行,那咱們執行 t1.start() t1.join t2.start() t2.join() 這也是串行執行啊,爲什麼還要加Lock呢,需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from
threading
import
Thread,Lock
import
time
n
=
100
def
task():
global
n
mutex.acquire()
temp
=
n
time.sleep(
0.1
)
n
=
temp
-
1
mutex.release()
if
__name__
=
=
'__main__'
:
mutex
=
Lock()
t_l
=
[]
for
i
in
range
(
100
):
t
=
Thread(target
=
task)
t_l.append(t)
t.start()
for
t
in
t_l:
t.join()
print
(
'主'
,n)
#主 0
|
6、死鎖現象與遞歸鎖
- 進程也有死鎖與遞歸鎖
- 所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程
- 解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。 這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# 死鎖
from
threading
import
Thread,Lock
import
time
mutexA
=
Lock()
mutexB
=
Lock()
class
MyThread(Thread):
def
run(
self
):
self
.f1()
self
.f2()
def
f1(
self
):
mutexA.acquire()
print
(
'%s 拿到了A鎖'
%
self
.name)
mutexB.acquire()
print
(
'%s 拿到了B鎖'
%
self
.name)
mutexB.release()
mutexA.release()
def
f2(
self
):
mutexB.acquire()
print
(
'%s 拿到了B鎖'
%
self
.name)
time.sleep(
0.1
)
mutexA.acquire()
print
(
'%s 拿到了A鎖'
%
self
.name)
mutexA.release()
mutexB.release()
if
__name__
=
=
'__main__'
:
for
i
in
range
(
10
):
t
=
MyThread()
t.start()
#互斥鎖只能acquire一次
from
threading
import
Thread,Lock
mutexA
=
Lock()
mutexA.acquire()
mutexA.release()
# 遞歸鎖:能夠連續acquire屢次,每acquire一次計數器+1,只有計數爲0時,才能被搶到acquire
from
threading
import
Thread,RLock
import
time
mutexB
=
mutexA
=
RLock()
#一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
class
MyThread(Thread):
def
run(
self
):
self
.f1()
self
.f2()
def
f1(
self
):
mutexA.acquire()
print
(
'%s 拿到了A鎖'
%
self
.name)
mutexB.acquire()
print
(
'%s 拿到了B鎖'
%
self
.name)
mutexB.release()
mutexA.release()
def
f2(
self
):
mutexB.acquire()
print
(
'%s 拿到了B鎖'
%
self
.name)
time.sleep(
7
)
mutexA.acquire()
print
(
'%s 拿到了A鎖'
%
self
.name)
mutexA.release()
mutexB.release()
if
__name__
=
=
'__main__'
:
for
i
in
range
(
10
):
t
=
MyThread()
t.start()
|
7、信號量Semaphore
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念 進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程 同進程的同樣 Semaphore管理一個內置的計數器, 每當調用acquire()時內置計數器-1; 調用release() 時內置計數器+1; 計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。 互斥鎖同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from
threading
import
Thread,Semaphore,currentThread
import
time,random
sm
=
Semaphore(
3
)
def
task():
# sm.acquire()
# print('%s in' %currentThread().getName())
# sm.release()
with sm:
print
(
'%s in'
%
currentThread().getName())
time.sleep(random.randint(
1
,
3
))
if
__name__
=
=
'__main__'
:
for
i
in
range
(
4
):
t
=
Thread(target
=
task)
t.start()
|
8、Event
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。 若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。 爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。 在初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象,而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。 一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
- event.isSet():返回event的狀態值;
- event.wait():若是 event.isSet()==False將阻塞線程;
- event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
- event.clear():恢復event的狀態值爲False。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
from
threading
import
Thread,Event
import
time
event
=
Event()
def
student(name):
print
(
'學生%s 正在聽課'
%
name)
event.wait(
2
)
print
(
'學生%s 課間活動'
%
name)
def
teacher(name):
print
(
'老師%s 正在授課'
%
name)
time.sleep(
7
)
event.
set
()
if
__name__
=
=
'__main__'
:
stu1
=
Thread(target
=
student,args
=
(
'tom'
,))
stu2
=
Thread(target
=
student,args
=
(
'rose'
,))
stu3
=
Thread(target
=
student,args
=
(
'jack'
,))
t1
=
Thread(target
=
teacher,args
=
(
'tony'
,))
stu1.start()
stu2.start()
stu3.start()
t1.start()
from
threading
import
Thread,Event,currentThread
import
time
event
=
Event()
def
conn():
n
=
0
while
not
event.is_set():
if
n
=
=
3
:
print
(
'%s try too many times'
%
currentThread().getName())
return
print
(
'%s try %s'
%
(currentThread().getName(),n))
event.wait(
0.5
)
n
+
=
1
print
(
'%s is connected'
%
currentThread().getName())
def
check():
print
(
'%s is checking'
%
currentThread().getName())
time.sleep(
5
)
event.
set
()
if
__name__
=
=
'__main__'
:
for
i
in
range
(
3
):
t
=
Thread(target
=
conn)
t.start()
t
=
Thread(target
=
check)
t.start()
|
9、定時器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
"""
60s後換驗證碼
"""
from
threading
import
Timer
import
random
class
Code:
def
__init__(
self
):
self
.make_cache()
def
make_cache(
self
,interval
=
60
):
self
.cache
=
self
.make_code()
print
(
self
.cache)
self
.t
=
Timer(interval,
self
.make_cache)
self
.t.start()
def
make_code(
self
,n
=
4
):
res
=
''
for
i
in
range
(n):
s1
=
str
(random.randint(
0
,
9
))
s2
=
chr
(random.randint(
65
,
90
))
res
+
=
random.choice([s1,s2])
return
res
def
check(
self
):
while
True
:
code
=
input
(
'請輸入你的驗證碼>>: '
).strip()
if
code.upper()
=
=
self
.cache:
print
(
'驗證碼輸入正確'
)
self
.t.cancel()
break
obj
=
Code()
obj.check()
|
10、線程queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import
queue
q
=
queue.Queue(
3
)
#先進先出->隊列
q.put(
'first'
)
q.put(
2
)
q.put(
'third'
)
# q.put(4)
# q.put(4,block=False) #q.put_nowait(4)
# q.put(4,block=True,timeout=3)
print
(q.get())
print
(q.get())
print
(q.get())
# print(q.get(block=False)) #q.get_nowait()
# print(q.get_nowait())
# print(q.get(block=True,timeout=3))
#======================================
q
=
queue.LifoQueue(
3
)
#後進先出->堆棧
q.put(
'first'
)
q.put(
2
)
q.put(
'third'
)
print
(q.get())
print
(q.get())
print
(q.get())
#======================================
q
=
queue.PriorityQueue(
3
)
#優先級隊列
q.put((
10
,
'one'
))
q.put((
40
,
'two'
))
q.put((
30
,
'three'
))
print
(q.get())
print
(q.get())
print
(q.get())
|
11、線程池
#1 介紹 concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做 #shutdown(wait=True) 至關於進程池的pool.close()+pool.join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 #result(timeout=None) 取得結果 #add_done_callback(fn) 回調函數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
"""
shutdown
把ProcessPoolExecutor換成ThreadPoolExecutor,其他用法所有相同
"""
from
concurrent.futures
import
ThreadPoolExecutor,ProcessPoolExecutor
import
os,time,random
def
task(n):
print
(
'%s is runing'
%
os.getpid())
time.sleep(random.randint(
1
,
3
))
return
n
*
*
2
if
__name__
=
=
'__main__'
:
pool
=
ProcessPoolExecutor(max_workers
=
3
)
futures
=
[]
for
i
in
range
(
11
):
future
=
pool.submit(task,i)
futures.append(future)
pool.shutdown(
True
)
print
(
'+++>'
)
for
future
in
futures:
print
(future.result())
"""
map方法
"""
from
concurrent.futures
import
ThreadPoolExecutor,ProcessPoolExecutor
import
os,time,random
def
task(n):
print
(
'%s is runing'
%
os.getpid())
time.sleep(random.randint(
1
,
3
))
return
n
*
*
2
if
__name__
=
=
'__main__'
:
pool
=
ThreadPoolExecutor(max_workers
=
3
)
pool.
map
(task,
range
(
1
,
12
))
#map取代了for+submit
"""
add_done_callback
"""
from
concurrent.futures
import
ThreadPoolExecutor,ProcessPoolExecutor
import
requests
import
os
def
get_page(url):
print
(
'<進程%s> get %s'
%
(os.getpid(),url))
respone
=
requests.get(url)
if
respone.status_code
=
=
200
:
return
{
'url'
:url,
'text'
:respone.text}
def
parse_page(res):
res
=
res.result()
print
(
'<進程%s> parse %s'
%
(os.getpid(),res[
'url'
]))
parse_res
=
'url:<%s> size:[%s]\n'
%
(res[
'url'
],
len
(res[
'text'
]))
with
open
(
'db.txt'
,
'a'
) as f:
f.write(parse_res)
if
__name__
=
=
'__main__'
:
urls
=
[
'https://www.baidu.com'
,
'https://www.python.org'
,
'https://www.openstack.org'
,
'https://help.github.com/'
,
'http://www.sina.com.cn/'
]
p
=
ProcessPoolExecutor(
3
)
for
url
in
urls:
p.submit(get_page,url).add_done_callback(parse_page)
#parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#基於多線程實現
from
socket
import
*
from
threading
import
Thread
#多線程
# from multiprocessing import Process #多進程
def
communicate(conn):
while
True
:
try
:
data
=
conn.recv(
1024
)
if
not
data:
break
conn.send(data.upper())
except
ConnectionResetError:
break
conn.close()
def
server(ip,port):
server
=
socket(AF_INET, SOCK_STREAM)
server.bind((ip,port))
server.listen(
5
)
while
True
:
conn, addr
=
server.accept()
t
=
Thread(target
=
communicate,args
=
(conn,))
# 多線程
# t = Process(target=communicate,args=(conn,)) # 多進程
t.start()
server.close()
if
__name__
=
=
'__main__'
:
server(
'127.0.0.1'
,
8081
)
#基於線程池實現
from
socket
import
*
from
concurrent.futures
import
ThreadPoolExecutor
#線程池
# from concurrent.futures import ProcessPoolExecutor #進程池
def
communicate(conn):
while
True
:
try
:
data
=
conn.recv(
1024
)
if
not
data:
break
conn.send(data.upper())
except
ConnectionResetError:
break
conn.close()
def
server(ip,port):
server
=
socket(AF_INET, SOCK_STREAM)
server.bind((ip,port))
server.listen(
5
)
while
True
:
conn, addr
=
server.accept()
pool.submit(communicate,conn)
# #####
server.close()
if
__name__
=
=
'__main__'
:
pool
=
ThreadPoolExecutor(
2
)
#線程池
#pool=ProcessPoolExecutor() #進程池
server(
'127.0.0.1'
,
8081
)
|