進程池

進程池:python

1.進程池初識,2.效率比較,3.同步和異步,4.進程池的返回值和回調函數,5.socket併發的服務端算法


爲何要有進程池?進程池的概念。數組

在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?網絡

在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。併發

 

multiprocess.Pool模塊:

Pool([numprocess  [,initializer [, initargs]]]):建立進程池
1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
3 initargs:是要傳給initializer的參數組
參數介紹
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

主要方法
主要方法
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
其餘方法(瞭解)

 

進程池初識:app

#進程池
#開闢一個空間,固定放着固定數目的進程,好比5個。如今有50個任務,每次任務一執行,就去池子裏找這5個進程,一次執行五個任務,
#執行完的任務,把進程放回進程池,供其餘任務繼續使用。從而大大減小了進程的開啓和銷燬的內存佔用,提升了效率

#更高級的進程池  #···python裏沒有
#假設 進程池最少的時候有n個進程,最可能是m個
#當咱們訪問網站,用戶量多的時候,進程池的進程,就會根據算法,慢慢增長,增長到m個
#當訪問量,或者任務降低,進程池就會 減減減  減到最少 n個


#信號量:是同一段代碼,只能同一時間由幾個進程執行,可是這些進程數都是要悉數開啓和銷燬的,若是有200個進程,這些進程都會佔用內存
#進程池:進程都是固定的,不用每次都開啓和銷燬

 

進程池和普通多進程的效率比較:異步

from multiprocessing import Pool,Process
import time

def func(n):
    for i in range(10):
        print(n+1)

if __name__ == '__main__':
    pool = Pool(5) #通常進程數是 cpu核數+1
    start = time.time()
    pool.map(func,range(100)) #開啓了一百個任務,map是自帶join,close的
    #map的功能最多就只能這樣了,若是想傳更多參數,能夠iterable裏能夠傳tuple,list..
    t1 = time.time() - start

    start2 = time.time()
    p_lst = []
    for i in range(100):
        p = Process(target=func,args=(i,))
        p.start()
        p_lst.append(p)
    [p.join() for p in p_lst]
    t2 = time.time() - start2
    print(t1,t2) # 0.19 VS  2.61 進程池完勝
進程池和多進程-效率比較

 

同步: apply   異步: apply_async(func, args=(args1,args2,...),callback=func2):socket

import os
import time
from multiprocessing import Pool,Process

def func(n):
    print('%s 開始了~~~'%n,os.getpid())
    time.sleep(1)
    print('%s 結束了'%n,os.getpid())

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        # pool.apply(func,args=(i,))  #apply 進程池的同步調用(天然不須要close,join),要等待進程中的任務完結,纔會執行下一個任務.基本上不會用
        pool.apply_async(func,args=(i,))#apply_async異步,  一但有任務結束了,它所使用的進程(pid)立刻會被其餘任務使用
                                        #須要close和join,否則會致使主代碼的結束,從而結束進程中的任務
    pool.close() #結束進程池接收任務
    pool.join()  #感知全部任務的結束,而後在執行主程序接下來的代碼
同步和異步-進程池

 

#對於Proces開啓的進程,是不能接收子進程的返回值的,想獲取,只能經過 IPC 進程間通訊:隊列(子進程put,主進程get),管道
#可是進程池 是能夠直接接收子進程的返回值的
#apply的結果就是func的返回值async

#經過get方法,獲取 apply_async的返回值ide

#對於Proces開啓的進程,是不能接收子進程的返回值的,想獲取,只能經過 IPC  進程間通訊:隊列(子進程put,主進程get),管道
#可是進程池 是能夠直接接收子進程的返回值的
#apply的結果就是func的返回值

from multiprocessing import Pool
import time

def func(n):
    time.sleep(0.2)
    return n*n

if __name__ == '__main__':
    pool = Pool(5)
    ret_lst=[]
    for i in range(10):
        # ret = pool.apply(func, args=(i,))
        ret = pool.apply_async(func,args=(i,))
        #print(ret.get()) #get會阻塞着,直到拿到ret的結果。每一個ret立刻跟着一個get,因此會按順序一個個print
                        #一旦異步提交了一個任務,返回一個對象,這個對象得到一個get方法,獲取這個任務的返回值,因此get能夠替代close和join
        ret_lst.append(ret)
    for ret in ret_lst:print(ret.get())
    # pool.close()
    # pool.join()

    # ret = pool.map(func,range(10))
    # print(ret)  #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    #由於map自帶close和join,它會把全部的值都計算好了,再打印出來。
    #因此雖然map簡單,可是apply_async能夠實時提交,仍是要用後者
進程池的返回值

 

回調函數不能傳參數,惟一的參數就是註冊函數的返回值

回調函數
import os
from multiprocessing import Pool
def func1(n):
    print('in func1',os.getpid())
    return n*n

def func2(nn):
    print('in func2',os.getpid())
    print(nn)

if __name__ == '__main__':
    print('主進程 :',os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply_async(func1,args=(10,),callback=func2) #回調函數不能傳參數,惟一的參數就是註冊函數的返回值
    p.close()                                          #回調函數是在主進程裏執行的
    p.join()
進程池的回調函數
import requests
from multiprocessing import Pool

def get(url):  #讓併發進程去獲取url,把網絡延時交給併發,把數據處理返回給主進程
    res = requests.get(url)
    if res.status_code == 200:
        return url,res.content.decode('utf8')

def call_back(args):
    url,content = args
    print(url,len(content))

if __name__ == '__main__':
    url_lst = [
        'https://www.cnblogs.com/',
        'http://www.baidu.com',
        'https://www.sogou.com/',
        'http://www.sohu.com/',
    ]
    p = Pool(5)
    for url in url_lst:
        p.apply_async(get,args=(url,),callback=call_back)
    p.close()
    p.join()
小爬蟲-回調函數
url = r'https://movie.douban.com/subject/26281899/?from=subject-page'
import requests
res = requests.get(url)
print(res) #返回爲一個 <Response [200]>
print(res.status_code) #狀態碼:200
print(res.content)      #二進制的網頁源碼 (無格式版的)
print(res.text)         #網頁源碼

# from urllib.request import urlopen
# ret = urlopen(url)
# print(ret.read().decode('utf8')) #得到一個跟原網頁源碼同樣格式的內容
requests模塊
    # apply
        # 同步的:只有當func執行完以後,纔會繼續向下執行其餘代碼
        # ret = apply(func,args=())
        # 返回值就是func的return
    # apply_async
        # 異步的:當func被註冊進入一個進程以後,程序就繼續向下執行
        # apply_async(func,args=())
        # 返回值 : apply_async返回的對象obj
        #          爲了用戶能從中獲取func的返回值obj.get()
        # get會阻塞直到對應的func執行完畢拿到結果
        # 使用apply_async給進程池分配任務,
        # 須要先close後join來保持多進程和主進程代碼的同步性
複習

 

使用進程池來實現socket服務端的併發:

import socket
from multiprocessing import Pool

def server(conn):
    conn.send(b'hi')
    ret = conn.recv(1024).decode('utf8')
    print(ret)
    conn.send(b'hello')
    conn.close()

if __name__ == '__main__':
    pool = Pool(5)
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        pool.apply_async(server,args=(conn,))
    sk.close()
    # pool.close()
    # pool.join()
server端
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))

print(sk.recv(1024))
msg = input('>>>> ')
sk.send(msg.encode('utf8'))
msg = sk.recv(1024).decode('utf8')
print(msg)
# sk.close()
client端
相關文章
相關標籤/搜索