python 進程 multiprocessing模塊

1、multiprocess.process模塊linux

1.process類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
 
參數介紹:
group參數未使用,值始終爲None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=(1,2,'egon',)
kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
name爲子進程的名稱
 
 
2.process類的方法
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程  
 
 
3.process類的屬性
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
 
 
4.在windows運行時要注意的問題
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。
 
 
5.實例代碼
1)建立一個子進程,並運行
from multiprocessing import Process
def func(name):
    print('子進程:你好,',name)
 
if __name__ == '__main__':
    p = Process(target=func,args=('hsr',))
    p.start()
 
2)查看子進程和父進程的進程號
from multiprocessing import Process
import os
def func():
    print('子進程PID:',os.getpid())
 
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    print('父進程PID:',os.getpid())
 
3)join
from multiprocessing import Process
import time
def func(*args):
    print('*'*args[0])
    time.sleep(5)
    print('*' * args[1])
if __name__ == '__main__':
    p = Process(target=func,args=(10,20))
    p.start()
    p.join()    #主線程等待p終止
    print("-------運行完了-------")
 
4)開啓多個子進程
from multiprocessing import Process
import time
def func(no,*args):
    print(str(no)+"  :"+'*'*args[0])
    time.sleep(5)
    print(str(no)+"  :"+'*'*args[1])
if __name__ == '__main__':
    p_li = []
    for i in range(10):
        p_li.append(Process(target=func,args=(i,10,20)))
    for i in p_li:
        i.start()
 
    [i.join() for i in p_li]    #讓最後的print等子進程都結束了再執行
    print('運行完了')
 
 
5)實現多進程的另外一種方法
#自定義類 繼承Process類
#必須實現run方法,run方法就是子進程執行的方法
#若是要參數,則實現本身的init方法,並在其中調用父類的init方法
from multiprocessing import Process
import os
class MyProcess(Process):
    def __init__(self,arg1):
        super().__init__()
        self.arg1 = arg1
    def run(self):
        print("My Process:",self.pid)
        print(self.arg1)
if __name__ == '__main__':
    print(os.getpid())
    p1 = MyProcess(4)
    p1.start()
 
 
6.進程間的數據隔離
#進程間不會共享數據
from multiprocessing import Process
import os
def func():
    global n
    n = 0
    print('pid:'+str(os.getpid())+"      "+str(n))
if __name__ == '__main__':
    n = 100
    p = Process(target=func)
    p.start()
    p.join()
    print('pid:'+str(os.getpid())+"      "+str(n))
 
 
7.守護進程
守護進程(daemon)是一類在後臺運行的特殊進程,用於執行特定的系統任務。不少守護進程在系統引導的時候啓動,而且一直運行直到系統關閉。
會隨着主進程的結束而結束。
主進程建立守護進程
  其一:守護進程會在主進程代碼執行結束後就終止
  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
#守護進程
from multiprocessing import Process
import time
def func():
    while 1:
        time.sleep(2)
        print('Good')
if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True #設置子進程爲守護進程
    p.start()
    i = 10
    while i>0:
        print('Do something')
        time.sleep(5)
        i -= 1

 

 

2、進程同步編程

1.進程鎖
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理
 
#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
 
#模擬吃50我的吃5個蘋果
#使用Lock對象的acquire請求鎖,release釋放鎖
from multiprocessing import Process
from multiprocessing import Lock
import json
def eat(no,lock):
    lock.acquire()
    with open('info.json') as f:
        dic = json.load(f)
        AppleNum = dic["Apple"]
        print("蘋果個數:" + str(AppleNum))
    if AppleNum >0:
        print("%d 吃了一個蘋果" %no)
        AppleNum -= 1
        dic["Apple"] = AppleNum
        with open('info.json','w') as f:
            json.dump(dic,f)
    else:
        print("%d 沒有蘋果吃了" %no)
    lock.release()
if __name__ == '__main__':
    lock = Lock()
    for i in range(50):
        Process(target=eat, args=(i,lock)).start()

 

 
 
2.信號量
信號量(Semaphore),有時被稱爲信號燈,是在多線程環境下使用的一種設施,是能夠用來保證兩個或多個關鍵代碼段不被併發調用。在進入一個關鍵代碼段以前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那麼該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。
信號量(semaphore)的數據結構爲一個值和一個指針,指針指向等待該信號量的下一個進程。信號量的值與相應資源的使用狀況有關。當它的值大於0時,表示當前可用資源的數量;當它的值小於0時,其絕對值表示等待使用該資源的進程個數。注意,信號量的值僅能由PV操做來改變。
 
對於信號量,能夠認爲是一個倉庫,有兩個概念,容量和當前的貨物個數。
P操做從倉庫拿貨,若是倉庫中沒有貨,線程一直等待,直到V操做,往倉庫裏添加了貨物,爲了不P操做一直等待下去,會有一個超時時間。
V操做往倉庫送貨,若是倉庫滿了,線程等待,直到有P操做,從倉庫中拿走貨物,有空的位置。
建立信號量,設置容量,先有V操做,才能P操做。
P操做:貨物個數減1,減過以後,貨物個數大於等於0,說明已經拿到貨物,線程繼續。否者線程阻塞。
V操做:貨物個數加1,加過以後,貨物個數小於等於容量,說明添加成功,線程繼續。否者線程阻塞。
信號量:0≤ 信號量≤容量 ,取值 表示當前可使用的貨物;
        信號量<0 ,  取值 表示當前等待使用貨物的線程;
            信號量>容量 ,  信號量-容量 表示當前等待添加貨物的線程。
一般,信號量的容量設置很大,能夠一直V操做,不會阻塞,可是P操做的時候,極可能阻塞。
當容量爲1,也就是互斥,執行流程一定是V操做,P操做,V操做,P操做...
 
P原語操做的動做是:
(1) sem減1;
(2) 若sem減1後仍大於或等於零,則進程繼續執行;
(3) 若sem減1後小於零,則該進程被阻塞後進入與該信號相對應的隊列中,而後轉進程調度。
V原語操做的動做是:
(1) sem加1;
(2) 若相加結果大於零,則進程繼續執行;
(3) 若相加結果小於或等於零,則從該信號的等待隊列中喚醒一等待進程,而後再返回原進程繼續執行或轉進程調度。
 
 
from multiprocessing import Process,Semaphore
import time
import random
def grid(i,sem):
    sem.acquire()
    print(str(i)+'放入了格子')
    time.sleep(random.randint(2,6))
    print(str(i)+'拿出了格子')
    sem.release()
if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(20):
        Process(target=grid,args=(i,sem)).start()
 
 
3.事件
經過一個信號來控制多個進程同時執行或阻塞
一個事件被建立後,默認是阻塞狀態
 
from multiprocessing import Event
if __name__ == "__main__":
    e = Event()  # c建立一個事件
    print(e.is_set())   # 查看一個事件的狀態
    e.set() #將事件的狀態改成True
    e.wait()    #根據e.is_set()的值決定是否阻塞
    print(1235455)
    e.clear()   #將事件的狀態改成False
    e.wait()
    print(12323545555)
 
紅綠燈問題
from multiprocessing import Event,Process
import time
import random
def traffic_light(e):
    while 1:
        if e.is_set():
            e.clear()
            print('\033[31m[-----------紅燈-----------]\033[0m')
        else:
            e.set()
            print('\033[32m[-----------綠燈-----------]\033[0m')
        time.sleep(2)
 
def car(e,i):
    if not e.is_set():
        print('%s號車在等紅燈' %i)
        e.wait()    #阻塞直到狀態改變
    print('\033[0;32;40m%s號車經過\033[0m' %i)
 
if __name__ == '__main__':
    e = Event()
    light = Process(target=traffic_light,args=(e,))
    light.start()
    for i in range(20):
        time.sleep(random.random())
        cars = Process(target=car,args=(e,i))
        cars.start()

 

 

3、進程間通訊json

1.隊列Queuewindows

Queue([maxsize])
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
 
方法
Queue([maxsize])
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具備如下方法:
 
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。
 
q.get_nowait( )
同q.get(False)方法。
 
q.put(item [, block [,timeout ] ] )
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。
 
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
 
 
q.empty()
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
 
q.full()
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。

 

簡單使用:
from multiprocessing import Process,Queue
if __name__ == '__main__':
    q = Queue(5)    #建立隊列
    for i in range(5):
        q.put(i)    #放進數據
    print(q.full())
    #q.put(6)   此處阻塞
    for i in range(5):
        print(q.get())  #獲取數據
    print(q.empty())
    #q.get() 此處阻塞

 

簡單的進程間通訊:
from multiprocessing import Event,Process,Queue
def produce(q):
    q.put('from produce')
def comsume(q):
    print(q.get())
if __name__ == '__main__':
    q = Queue(5)    #建立隊列
    pro = Process(target=produce,args=(q,))
    pro.start()
    com = Process(target=comsume, args=(q,))
    com.start()
 
生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
 
1)使用Queue
from multiprocessing import Process,Queue
import time
import random
def producer(name,goods,q):
    for i in range(10):
        time.sleep(random.randint(1,4))
        print('%s生產了第%s個%s'%(name,i,goods))
        q.put('第%s個%s'%(i,goods))
def comsumer(q,name):
    while 1:
        goods = q.get()
        if goods == None:break
        print('\033[31m%s買了了%s\033[0m' % (name,goods))
        time.sleep(random.randint(2,6))
if __name__ == '__main__':
    q = Queue(10)
    p = Process(target=producer,args=('HSR','牛奶',q))
    p2 = Process(target=producer, args=('TTT', '麪包', q))
    c = Process(target=comsumer, args=(q,'Lisi'))
    c2 = Process(target=comsumer, args=(q, 'ZhangSan'))
    p.start()
    p2.start()
    c.start()
    c2.start()
    p.join()
    p2.join()
    q.put(None)
    q.put(None)
 
 
2)使用JoinableQueue
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
from multiprocessing import Process,JoinableQueue
import time
import random
def producer(name,goods,q):
    for i in range(10):
        time.sleep(random.randint(1,4))
        print('%s生產了第%s個%s'%(name,i,goods))
        q.put('第%s個%s'%(i,goods))
    q.join()    #阻塞,直到隊列中的數據被所有執行完畢
def comsumer(q,name):
    while 1:
        goods = q.get()
        if goods == None:break
        print('\033[31m%s買了了%s\033[0m' % (name,goods))
        time.sleep(random.randint(2,6))
        q.task_done()   #count - 1
if __name__ == '__main__':
    q = JoinableQueue(10)
    p = Process(target=producer,args=('HSR','牛奶',q))
    p2 = Process(target=producer, args=('TTT', '麪包', q))
    c = Process(target=comsumer, args=(q,'Lisi'))
    c2 = Process(target=comsumer, args=(q, 'ZhangSan'))
    p.start()
    p2.start()
    c.daemon = True #設置爲守護進程,主進程結束則子進程結束,而這裏的主進程等待生產進程的結束
    c2.daemon = True    #生產進程又等待消費進程消費完。因此消費者消費完了就會結束進程
    c.start()
    c2.start()
    p.join()
    p2.join()

 

 

2.管道數組

multiprocessing.Pipe
(兩端在不一樣的進程也能夠)
 
 
from multiprocessing import Pipe,Process
def func(conn1,conn2):
    conn2.close()
    while 1:
        try:
            print(conn1.recv())
        except EOFError:
            conn1.close()
            break
if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p1 = Process(target=func,args=(conn1, conn2))   #傳給不一樣進程的conn是不會相互影響的
    p1.start()
    conn1.close()
    for i in range(20):
        conn2.send("hi")
    conn2.close()

 

 
  一頭的conn都關閉後,會EOFError
 
使用管道實現生產者消費者模型
#Pipe有數據不安全性
#管道可能出現一端的多個消費者同時取一個數據
#因此能夠加上一個進程鎖來保證安全性
from multiprocessing import Pipe,Process,Lock
import time
import random
def producer(con,pro,name,goods):
    con.close()
    for i in range(8):
        time.sleep(random.randint(1,3))
        print('%s生成了第%s個%s'%(name,i,goods))
        pro.send('第%s個%s'%(i,goods))
    pro.close()
def consumer(con,pro,name,lock):
    pro.close()
    while 1:
        try:
            lock.acquire()
            goods = con.recv()
            lock.release()
            print('%s喝了%s'%(name,goods))
            time.sleep(random.random())
        except EOFError:
            lock.release()  #由於最後消費者經過異常來結束進程,因此最後一次的recv後面的lock.release不會執行,因此要在
                            #這個地方再寫一個release()
            con.close()
            break
if __name__ == '__main__':
    con, pro = Pipe()
    lock = Lock()
    p = Process(target=producer, args=(con,pro,'HSR','牛奶'))
    c = Process(target=consumer, args=(con, pro, 'TTT',lock))
    c2 = Process(target=consumer, args=(con, pro, 'TTT2',lock))
    p.start()
    c.start()
    c2.start()
    con.close()
    pro.close()

 

 

3.Manager安全

multiprocessing.Manager模塊
 
#報 AttributeError: 'ForkAwareLocal' object has no attribute 'connection' 的緣由
#運行這段代碼時,主進程執行完了,斷開了鏈接,而子進程要鏈接,此時會報錯
#因此能夠用join(),讓主進程等待子進程的結束
from multiprocessing import Manager,Process
def func(dic):
    dic['count'] -= 1
    print(dic)
if __name__ == '__main__':
    m = Manager()    建立一個Manger()
    dic = m.dict({'count':100})    #變成進程共享的字典
    p = Process(target=func, args=(dic,))
    p.start()
    p.join()    #等待子進程結束
 
這裏會有進程搶佔形成的數據不安全問題,經過加鎖解決
 
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1
 
if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

 

 

4、進程池網絡

1.爲何要有進程池?
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?
在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。
 
2.multiprocess.Pool模塊
Pool([numprocess [,initializer [, initargs]]]):建立進程池
 
參數
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組
 
2.方法
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
 
p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
 
P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
 
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
 
 
 
3.實例
1)簡單例子1(使用map)
#map若是要給函數傳參數,只能傳可迭代對象
from multiprocessing import Pool
def func(dic):
    print(dic)
def func2(dic):
    print(dic+2)
if __name__ == '__main__':
    pool = Pool(5)  #進程數,CPU核心數+1
                            #若是Pool()不傳參數,默認是cpu核心數
    pool.map(func2,range(100))   #100個任務
    #這裏自帶join效果
    pool.map(func, ['hsr','ttt'])  # 2個任務
 
 
2)簡單例子2(使用apply)
from multiprocessing import Pool
import os
import time
def func(n):
    print('[pid:%s]start id:%s'%(os.getpid(),n))
    time.sleep(1.5)
    print('\033[31m[pid:%s]end id:%s\033[0m'%(os.getpid(),n))
 
if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        #pool.apply(func,args=(i,))  #同步
        pool.apply_async(func,args=(i,))    #異步。與主進程徹底異步,須要手動close和join
 
    pool.close()    # 結束進程池接收任務
    pool.join() # 感知進程中的任務都執行結束
 
 
3)使用進程池實現多個TCP socket鏈接
服務端:
import socket
from multiprocessing import Pool
 
def func(conn):
    while 1:
        conn.send(b'hello')
        ret = conn.recv(1024).decode('utf-8')
        if ret == 'q':
            break
        print(ret)
    conn.close()
 
if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8081))
    sk.listen()
    pool = Pool(5)
    while 1:
        conn, addr = sk.accept()
        pool.apply_async(func,args=(conn,))
 
客戶端:
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8081))
ret = sk.recv(1024).decode('utf-8')
print(ret)
c = input().encode('utf-8')
sk.send(c)
sk.close()
 
 
4.進程池的返回值
from multiprocessing import Pool
def func(i):
    return i**2
if __name__ == '__main__':
    pool = Pool(5)
    #使用map的返回值
    ret = pool.map(func,range(10))
    print(ret)
    res_l = []
    for i in range(10):
        #同步
        # res = pool.apply(func,args=(i,))    #apply的結果就是func的返回值
        # print(res)
        #異步
        res = pool.apply_async(func,args=(i,))    #apply_async的結果
        #這裏若是直接使用res.get()來獲取返回值,會阻塞,因此先將其放入列表中,後面再get
        # print(res.get())    #阻塞等待func的結果
        res_l.append(res)
    for i in res_l:
        print(i.get())
 
 
5.回調函數
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
 
from multiprocessing import Pool
def func(i):
    print('in func1')
    return i**2
def func2(n):
    print('in func2')
    print(n)
if __name__ == '__main__':
    pool = Pool(5)
    pool.apply_async(func, args=(10,), callback=func2)  #執行func1,把返回值做爲fun2的參數執行func2
                                                        #回調函數func2在主進程中zhi'x
    pool.close()
    pool.join()
 
 
簡單例子:
import requests
from multiprocessing import  Pool
 
def get(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.content.decode('utf-8'),url
 
def call_back(args):
    print(args[1] +"    "+ str(len(args[0])))
 
url_lst = [
    'http://www.cnblog.com',
    'https://www.baidu.com',
    'http://www.sohu.com'
]
 
if __name__ == '__main__':
    pool = Pool(5)
    for i in url_lst:
        pool.apply_async(get,args=(i,),callback=call_back)
    pool.close()
    pool.join()
相關文章
相關標籤/搜索