進程池和線程池詳解

進程池和線程池詳解

##池
##進程池
##線程池
##爲何要有池?
##10000
#池
# 預先的開啓固定個數的進程數,當任務來臨的時候,直接提交給已經開好的進程
# 讓這個進程去執行就能夠了
池的優勢:
# 節省了進程,線程的開啓 關閉 切換都須要時間
# 而且減輕了操做系統調度的負擔
#html


一、開啓進程池。任務少於進程數。開啓進程池,這裏也要放到if name == 'main':下

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(5)
p.submit(func)
-----------結果:
start 4516
end 4516多線程

二、開啓的進程最多和進程池的大小同樣。任務多於進程數

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func)
----------------結果:
start 2548
start 5816
end 2548
start 2548
end 5816
start 5816
end 2548
end 5816
#分析:2548執行完任務以後又來了一個任務再次執行了一遍,總共開啓的進程最可能是定義的進程池的大小。這裏是以2個進程最大效率的執行這提交的4個任務併發

三、池是實現異步的

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func)
print('main',os.getpid())
-------------結果:
main 5512
start 5352
start 5376
end 5376
start 5376
end 5352
start 5352
end 5376
end 5352
#先打印的的main。咱們經常須要的是等提交到進程池中的任務都執行完畢後再往下執行主進程中的代碼。這時須要阻塞,池的阻塞是池.shutdown()app

四、submit和shutdown配合,進程池中任務都執行完畢再(作某件事)往下執行主進程代碼(shutdown後面的)。池.shutdown()

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func():
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end',os.getpid())
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func)
p.shutdown() #關閉池以後就不能繼續提交任務,而且會阻塞,直到已經提交的任務完成
print('main',os.getpid())
------------------結果:
start 5780
start 796
end 5780
start 5780
end 5780
start 5780
end 796
end 5780
main 5640
#關閉池以後繼續提交任務報錯:RuntimeError: cannot schedule new futures after shutdowndom

五、池執行的函數中傳參數,submit後面直接傳參,可是池函數必須是第一個參數。

from concurrent.futures import ProcessPoolExecutor
def func(i):
print(i)
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
p.submit(func,i)
p.shutdown()
----------結果:
0
1
2
3異步

#源碼:def submit(self, fn, *args, **kwargs):函數

六、

import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func(i):
time.sleep(random.randint(1,3))
return ii
if name == 'main':
p=ProcessPoolExecutor(2)
for i in range(4):
ret=p.submit(func,i)
print(ret,ret.result()) #這裏有阻塞,取返回值結果
p.shutdown()
print('main',os.getpid())
-------------結果:
<Future at 0x2913090 state=finished returned int> 0
<Future at 0x293b770 state=finished returned int> 1
<Future at 0x293b810 state=finished returned int> 4
<Future at 0x293b7f0 state=finished returned int> 9
main 5384
#註釋:ret後面的返回值是個Future對象,這個對象.result()返回池中任務函數的返回值。這個返回值在for循環中打印,那麼執行完一個任務打印出結果纔會提交下一個任務,這樣就不是異步了,而是同步了,不是咱們用池實現異步的效果。
七、既能實現併發又能得到返回值。得到返回值的時候不在提交任務的循環中,得到一個對象就放到列表中,從列表中取返回值,不影響它任務的提交。
import os,time,random
from concurrent.futures import ProcessPoolExecutor
def func(i):
time.sleep(random.randint(1,3))
return i
i
if name == 'main':
p=ProcessPoolExecutor(2)
li=[]
for i in range(4):
ret=p.submit(func,i)
li.append(ret)
for ret in li:print('ret-->',ret.result()) #ret這裏是同步阻塞
p.shutdown()
print('main',os.getpid())
--------------結果:
ret--> 0
ret--> 1
ret--> 4
ret--> 9
main 5928
八、進程池的特色:
#開銷大
#一個池中的任務個數限制了咱們程序的併發個數url

九、線程池提交一個任務加傳參

#從併發.將來導入線程程池執行者,實例化線程池執行者並指定線程個數,對象.提交(任務函數,參數(多個參數均可以))
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
tp=ThreadPoolExecutor(20) #線程個數能開不少,通常開啓cpu個數乘以4或5
tp.submit(func,1)
----------------結果:
start 2844 arg: 1spa

十、提交多個線程池任務

#for 循環提交多個任務
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
tp=ThreadPoolExecutor(2)
for i in range(5):
tp.submit(func,i)
------------結果:
start 2100 arg: 0
start 2100 arg: 1
start 2100 arg: 2
start 2100 arg: 3
start 2100 arg: 4
#這個建立的順序和執行的順序好像差很少,可是是實現併發的。操作系統

十一、線程池獲取線程任務函數的返回值

#接收提交,追加到列表,循環列表中對象,對象.結果()
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
return i**2
tp=ThreadPoolExecutor(2)
ret_li=[]
for i in range(5):
ret=tp.submit(func,i)
ret_li.append(ret)
for ret in ret_li:print('ret-->',ret.result())
----------------結果:
start 2856 arg: 0
start 2856 arg: 1
start 2856 arg: 2
ret--> 0
start 2856 arg: 3
ret--> 1
start 2856 arg: 4
ret--> 4
ret--> 9
ret--> 16

十二、線程池中任務都結束才能再執行的代碼

#tp.shutdown()線程池對象.關閉(),池中任務都結束才能執行關閉以後的代碼
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
tp=ThreadPoolExecutor(2)
for i in range(4):
ret=tp.submit(func,i)
tp.shutdown()
print('main')
------------------結果:
start 1468 arg: 0
start 1468 arg: 1
start 1468 arg: 2
start 1468 arg: 3
main

1三、線程池其它方法map(省代碼量批量提交任務)

#批量建立線程池任務和建立好後獲取每一個任務函數返回值更簡便
#任務返回值列表變量=線程池對象.map(任務函數,傳參可迭代對象)。建立任務數是可迭代對象個數
import os,time,random
from concurrent.futures import ThreadPoolExecutor
def func(i):
print('start', os.getpid(),' arg:',i)
time.sleep(random.randint(1,3))
return i**2
tp=ThreadPoolExecutor(2)
ret=tp.map(func,range(5))
for i in ret:print(i)
---------------結果:
start 2976 arg: 0
start 2976 arg: 1
start 2976 arg: 2
start 2976 arg: 3
0
1
start 2976 arg: 4
4

print(ret,type(ret))
--------結果:
<generator object Executor.map. .result_iterator at 0x02938300> <class 'generator'>

9
16

1四、多線程爬取網頁,獲取到結果後使用回調函數分析網頁

import requests
from concurrent.futures import ThreadPoolExecutor
def get_page(url):
res = requests.get(url)
return {'url': url, 'content': res.text} #ret的結果會做爲參數返回給綁定的函數
def parserpage(ret):
dic = ret.result()
print(dic['url'])
tp = ThreadPoolExecutor(5)
url_lst = [
'http://www.baidu.com', # 3
'http://www.cnblogs.com', # 1
'http://www.douban.com', # 1
'http://www.tencent.com',
'http://www.cnblogs.com/Eva-J/articles/8306047.html',
'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
#方法一:回調函數,使用回調函數簡便並且效率高
for url in url_lst:
ret = tp.submit(get_page, url)
ret.add_done_callback(parserpage) #綁定的函數中的內容#誰先執行完誰先使用回調函數
-------------------結果:
http://www.baidu.com
http://www.cnblogs.com/Eva-J/articles/7206498.html
http://www.tencent.com
http://www.cnblogs.com/Eva-J/articles/8306047.html
http://www.douban.com
http://www.cnblogs.com

#方法二:非回調函數。
ret_l = []
for url in url_lst:
ret = tp.submit(get_page, url)
ret_l.append(ret)
for ret in ret_l:
parserpage(ret)
print("ret_l:", ret_l)
--------------------結果:
http://www.baidu.com
http://www.cnblogs.com
http://www.douban.com
http://www.tencent.com
http://www.cnblogs.com/Eva-J/articles/8306047.html
http://www.cnblogs.com/Eva-J/articles/7206498.html
ret_l: [<Future at 0x2db7150 state=finished returned dict>, <Future at 0x2e152d0 state=finished re

1五、回調函數的使用

import time
from concurrent.futures import ThreadPoolExecutor
def son():
print(123)
time.sleep(3)
return 123
def call_back(num):
print(num.result())
t = ThreadPoolExecutor(20)
obj = t.submit(son)
print('main : ',obj)
obj.add_done_callback(call_back)
----------結果;
123
main : <Future at 0x39f5b0 state=running>
123

#代碼分析:
導入類:從併發.將來導入線程池執行者
建立池對象:線程池(大小)
獲取返回值:用變量接收池對象.提交(任務函數),即將來對象
執行回調函數:將來對象.添加完成回調(回調函數)
回調函數接收一個參數,這個參數是obj將來對象,回調函數中取任務函數返回值就是這個參數.result()

1六、線程池小結

建立一個池子

tp = ThreadPoolExcutor(池中線程/進程的個數)

異步提交任務

ret = tp.submit(函數,參數1,參數2....)

獲取返回值

ret.result()

在異步的執行完全部任務以後,主線程/主進程纔開始執行的代碼

tp.shutdown() 阻塞 直到全部的任務都執行完畢

map方法

ret = tp.map(func,iterable) 迭代獲取iterable中的內容,做爲func的參數,讓子線程來執行對應的任務

for i in ret: 每個都是任務的返回值

回調函數

ret.add_done_callback(函數名)

要在ret對應的任務執行完畢以後,直接繼續執行add_done_callback綁定的函數中的內容,而且ret的結果會做爲參數返回給綁定的函數

5個進程

20個線程

5*20 = 100個併發

相關文章
相關標籤/搜索