讓計算機程序併發的運行是一個常常被討論的話題,今天我想討論一下Python下的各類併發方式。html
多線程幾乎是每個程序猿在使用每一種語言時都會首先想到用於解決併發的工具(JS程序員請回避),使用多線程能夠有效的利用CPU資源(Python例外)。然而多線程所帶來的程序的複雜度也不可避免,尤爲是對競爭資源的同步問題。java
然而在python中因爲使用了全局解釋鎖(GIL)的緣由,代碼並不能同時在多核上併發的運行,也就是說,Python的多線程不能併發,不少人會發現使用多線程來改進本身的Python代碼後,程序的運行效率卻降低了,這是多麼蛋疼的一件事呀!若是想了解更多細節,推薦閱讀這篇文章。實際上使用多線程的編程模型是很困難的,程序員很容易犯錯,這並非程序員的錯誤,由於並行思惟是反人類的,咱們大多數人的思惟是串行(精神分裂不討論),並且馮諾依曼設計的計算機架構也是以順序執行爲基礎的。因此若是你老是不能把你的多線程程序搞定,恭喜你,你是個思惟正常的程序猿:)python
Python提供兩組線程的接口,一組是thread模塊,提供基礎的,低等級(Low Level)接口,使用Function做爲線程的運行體。還有一組是threading模塊,提供更容易使用的基於對象的接口(相似於Java),能夠繼承Thread對象來實現線程,還提供了其它一些線程相關的對象,例如Timer,Lockgit
使用thread模塊的例子程序員
1
2
3
4
5
|
import
thread
def
worker():
"""thread worker function"""
print
'Worker'
thread.start_new_thread(worker)
|
使用threading模塊的例子github
1
2
3
4
5
6
|
import
threading
def
worker():
"""thread worker function"""
print
'Worker'
t
=
threading.Thread(target
=
worker)
t.start()
|
或者Java Style算法
1
2
3
4
5
6
7
8
9
10
|
import
threading
class
worker(threading.Thread):
def
__init__(
self
):
pass
def
run():
"""thread worker function"""
print
'Worker'
t
=
worker()
t.start()
|
因爲前文提到的全局解釋鎖的問題,Python下比較好的並行方式是使用多進程,這樣能夠很是有效的使用CPU資源,並實現真正意義上的併發。固然,進程的開銷比線程要大,也就是說若是你要建立數量驚人的併發進程的話,須要考慮一下你的機器是否是有一顆強大的心。sql
Python的mutliprocess模塊和threading具備相似的接口。apache
1
2
3
4
5
6
7
8
|
from
multiprocessing
import
Process
def
worker():
"""thread worker function"""
print
'Worker'
p
=
Process(target
=
worker)
p.start()
p.join()
|
因爲線程共享相同的地址空間和內存,因此線程之間的通訊是很是容易的,然而進程之間的通訊就要複雜一些了。常見的進程間通訊有,管道,消息隊列,Socket接口(TCP/IP)等等。編程
Python的mutliprocess模塊提供了封裝好的管道和隊列,能夠方便的在進程間傳遞消息。
Python進程間的同步使用鎖,這一點喝線程是同樣的。
另外,Python還提供了進程池Pool對象,能夠方便的管理和控制線程。
隨着大數據時代的到臨,摩爾定理在單機上彷佛已經失去了效果,數據的計算和處理須要分佈式的計算機網絡來運行,程序並行的運行在多個主機節點上,已是如今的軟件架構所必需考慮的問題。
遠程主機間的進程間通訊有幾種常見的方式
TCP/IP是全部遠程通訊的基礎,然而API比較低級別,使用起來比較繁瑣,因此通常不會考慮
遠程對象是更高級別的封裝,程序能夠想操做本地對象同樣去操做一個遠程對象在本地的代理。遠程對象最廣爲使用的規範CORBA,CORBA最大的好處是能夠在不一樣語言和平臺中進行通訊。當讓不用的語言和平臺還有一些各自的遠程對象實現,例如Java的RMI,MS的DCOM
Python的開源實現,有許多對遠程對象的支持
比起RPC或者遠程對象,消息是一種更爲靈活的通訊手段,常見的支持Python接口的消息機制有
在遠程主機上執行併發和本地的多進程並無很是大的差別,都須要解決進程間通訊的問題。固然對遠程進程的管理和協調比起本地要複雜。
Python下有許多開源的框架來支持分佈式的併發,提供有效的管理手段包括:
Celery是一個很是成熟的Python分佈式框架,能夠在分佈式的系統中,異步的執行任務,並提供有效的管理和調度功能。參考這裏
SCOOP (Scalable COncurrent Operations in Python)提供簡單易用的分佈式調用接口,使用Future接口來進行併發。
相比起Celery和SCOOP,Dispy提供更爲輕量級的分佈式並行服務
PP (Parallel Python)是另一個輕量級的Python並行服務, 參考這裏
Asyncoro是另外一個利用Generator實現分佈式併發的Python框架,
固然還有許多其它的系統,我沒有一一列出
另外,許多的分佈式系統多提供了對Python接口的支持,例如Spark
還有一種併發手段並不常見,咱們能夠稱之爲僞線程,就是看上去像是線程,使用的接口相似線程接口,可是實際使用非線程的方式,對應的線程開銷也不存的。
greenlet提供輕量級的coroutines來支持進程內的併發。
greenlet是Stackless的一個副產品,使用tasklet來支持一中被稱之爲微線程(mirco-thread)的技術,這裏是一個使用greenlet的僞線程的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from
greenlet
import
greenlet
def
test1():
print
12
gr2.switch()
print
34
def
test2():
print
56
gr1.switch()
print
78
gr1
=
greenlet(test1)
gr2
=
greenlet(test2)
gr1.switch()
|
1
2
3
|
12
56
34
|
eventlet,gevent和concurence都是基於greenlet提供併發的。
eventlet是一個提供網絡調用併發的Python庫,使用者能夠以非阻塞的方式來調用阻塞的IO操做。
1
2
3
4
5
6
7
8
9
10
11
12
|
import
eventlet
from
eventlet.green
import
urllib2
urls
=
[
'http://www.google.com'
,
'http://www.example.com'
,
'http://www.python.org'
]
def
fetch(url):
return
urllib2.urlopen(url).read()
pool
=
eventlet.GreenPool()
for
body
in
pool.imap(fetch, urls):
print
(
"got body"
,
len
(body))
|
執行結果以下
1
2
3
|
(
'got body'
,
17629
)
(
'got body'
,
1270
)
(
'got body'
,
46949
)
|
gevent和eventlet相似,關於它們的差別你們能夠參考這篇文章
1
2
3
4
5
6
7
|
import
gevent
from
gevent
import
socket
urls
=
[
'www.google.com'
,
'www.example.com'
,
'www.python.org'
]
jobs
=
[gevent.spawn(socket.gethostbyname, url)
for
url
in
urls]
gevent.joinall(jobs, timeout
=
2
)
print
[job.value
for
job
in
jobs]
|
1
|
[
'206.169.145.226'
,
'93.184.216.34'
,
'23.235.39.223'
]
|
concurence是另一個利用greenlet提供網絡併發的開源庫,我沒有用過,你們能夠本身嘗試一下。
一般須要用到併發的場合有兩種,一種是計算密集型,也就是說你的程序須要大量的CPU資源;另外一種是IO密集型,程序可能有大量的讀寫操做,包括讀寫文件,收發網絡請求等等。
對應計算密集型的應用,咱們選用著名的蒙特卡洛算法來計算PI值。基本原理以下
蒙特卡洛算法利用統計學原理來模擬計算圓周率,在一個正方形中,一個隨機的點落在1/4圓的區域(紅色點)的機率與其面積成正比。也就該機率 p = Pi * R*R /4 : R* R , 其中R是正方形的邊長,圓的半徑。也就是說該機率是圓周率的1/4, 利用這個結論,只要咱們模擬出點落在四分之一圓上的機率就能夠知道圓周率了,爲了獲得這個機率,咱們能夠經過大量的實驗,也就是生成大量的點,看看這個點在哪一個區域,而後統計出結果。
基本算法以下:
1
2
3
4
5
|
from
math
import
hypot
from
random
import
random
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
|
經過大量的併發,咱們能夠快速的運行屢次試驗,試驗的次數越多,結果越接近真實的圓周率。
這裏給出不一樣併發方法的程序代碼
咱們先在單線程,但進程運行,看看性能如何
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from
math
import
hypot
from
random
import
random
import
eventlet
import
time
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
def
calcPi(nbFutures, tries):
ts
=
time.time()
result
=
map
(test, [tries]
*
nbFutures)
ret
=
4.
*
sum
(result)
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
print
calcPi(
3000
,
4000
)
|
爲了使用線程池,咱們用multiprocessing的dummy包,它是對多線程的一個封裝。注意這裏代碼雖然一個字的沒有提到線程,但它千真萬確是多線程。
經過測試咱們開(jing)心(ya)的發現,果真不出所料,當線程池爲1是,它的運行結果和沒有併發時同樣,當咱們把線程池數字設置爲5時,耗時幾乎是沒有併發的2倍,個人測試數據從5秒到9秒。因此對於計算密集型的任務,仍是放棄多線程吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from
multiprocessing.dummy
import
Pool
from
math
import
hypot
from
random
import
random
import
time
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
def
calcPi(nbFutures, tries):
ts
=
time.time()
p
=
Pool(
1
)
result
=
p.
map
(test, [tries]
*
nbFutures)
ret
=
4.
*
sum
(result)
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
if
__name__
=
=
'__main__'
:
p
=
Pool()
print
(
"pi = {}"
.
format
(calcPi(
3000
,
4000
)))
|
理論上對於計算密集型的任務,使用多進程併發比較合適,在如下的例子中,進程池的規模設置爲5,修改進程池的大小能夠看到對結果的影響,當進程池設置爲1時,和多線程的結果所需的時間相似,由於這時候並不存在併發;當設置爲2時,響應時間有了明顯的改進,是以前沒有併發的一半;然而繼續擴大進程池對性能影響並不大,甚至有所降低,也許個人Apple Air的CPU只有兩個核?
小心,若是你設置一個很是大的進程池,你會遇到 Resource temporarily unavailable的錯誤,系統並不能支持建立太多的進程,畢竟資源是有限的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from
multiprocessing
import
Pool
from
math
import
hypot
from
random
import
random
import
time
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
def
calcPi(nbFutures, tries):
ts
=
time.time()
p
=
Pool(
5
)
result
=
p.
map
(test, [tries]
*
nbFutures)
ret
=
4.
*
sum
(result)
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
if
__name__
=
=
'__main__'
:
print
(
"pi = {}"
.
format
(calcPi(
3000
,
4000
)))
|
不管是gevent仍是eventlet,由於不存在實際的併發,響應時間和沒有併發區別不大,這個和測試結果一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import
gevent
from
math
import
hypot
from
random
import
random
import
time
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
def
calcPi(nbFutures, tries):
ts
=
time.time()
jobs
=
[gevent.spawn(test, t)
for
t
in
[tries]
*
nbFutures]
gevent.joinall(jobs, timeout
=
2
)
ret
=
4.
*
sum
([job.value
for
job
in
jobs])
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
print
calcPi(
3000
,
4000
)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from
math
import
hypot
from
random
import
random
import
eventlet
import
time
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
def
calcPi(nbFutures, tries):
ts
=
time.time()
pool
=
eventlet.GreenPool()
result
=
pool.imap(test, [tries]
*
nbFutures)
ret
=
4.
*
sum
(result)
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
print
calcPi(
3000
,
4000
)
|
SCOOP中的Future接口符合PEP-3148的定義,也就是在Python3中提供的Future接口。
在缺省的SCOOP配置環境下(單機,4個Worker),併發的性能有提升,可是不如兩個進程池配置的多進程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from
math
import
hypot
from
random
import
random
from
scoop
import
futures
import
time
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
def
calcPi(nbFutures, tries):
ts
=
time.time()
expr
=
futures.
map
(test, [tries]
*
nbFutures)
ret
=
4.
*
sum
(expr)
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
if
__name__
=
=
"__main__"
:
print
(
"pi = {}"
.
format
(calcPi(
3000
,
4000
)))
|
任務代碼
1
2
3
4
5
6
7
8
9
10
11
|
from
celery
import
Celery
from
math
import
hypot
from
random
import
random
app
=
Celery(
'tasks'
, backend
=
'amqp'
, broker
=
'amqp://guest@localhost//'
)
app.conf.CELERY_RESULT_BACKEND
=
'db+sqlite:///results.sqlite'
@app
.task
def
test(tries):
return
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
|
客戶端代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from
celery
import
group
from
tasks
import
test
import
time
def
calcPi(nbFutures, tries):
ts
=
time.time()
result
=
group(test.s(tries)
for
i
in
xrange
(nbFutures))().get()
ret
=
4.
*
sum
(result)
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
print
calcPi(
3000
,
4000
)
|
Asyncoro的測試結果和非併發保持一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import
asyncoro
from
math
import
hypot
from
random
import
random
import
time
def
test(tries):
yield
sum
(hypot(random(), random()) <
1
for
_
in
range
(tries))
def
calcPi(nbFutures, tries):
ts
=
time.time()
coros
=
[ asyncoro.Coro(test,t)
for
t
in
[tries]
*
nbFutures]
ret
=
4.
*
sum
([job.value()
for
job
in
coros])
/
float
(nbFutures
*
tries)
span
=
time.time()
-
ts
print
"time spend "
, span
return
ret
print
calcPi(
3000
,
4000
)
|
IO密集型的任務是另外一種常見的用例,例如網絡WEB服務器就是一個例子,每秒鐘能處理多少個請求時WEB服務器的重要指標。
咱們就以網頁讀取做爲最簡單的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from
math
import
hypot
import
time
import
urllib2
urls
=
[
'http://www.google.com'
,
'http://www.example.com'
,
'http://www.python.org'
]
def
test(url):
return
urllib2.urlopen(url).read()
def
testIO(nbFutures):
ts
=
time.time()
map
(test, urls
*
nbFutures)
span
=
time.time()
-
ts
print
"time spend "
, span
testIO(
10
)
|
經過測試咱們能夠發現,對於IO密集型的任務,使用多線程,或者是多進程均可以有效的提升程序的效率,而使用僞線程性能提高很是顯著,eventlet比沒有併發的狀況下,響應時間從9秒提升到0.03秒。同時eventlet/gevent提供了非阻塞的異步調用模式,很是方便。這裏推薦使用線程或者僞線程,由於在響應時間相似的狀況下,線程和僞線程消耗的資源更少。
Python提供了不一樣的併發方式,對應於不一樣的場景,咱們須要選擇不一樣的方式進行併發。選擇合適的方式,不但要對該方法的原理有所瞭解,還應該作一些測試和試驗,數據纔是你作選擇的最好參考。
轉自博客在線