[ python ] 進程的操做

目錄

(見右側目錄欄導航)
- 1. 前言
- 2. multiprocess模塊
- 2.1 multiprocess.Process模塊
    - 2.2 使用Process模塊建立進程
    - 2.3 守護進程
    - 2.4 socket聊天併發實例
- 3. 進程鎖 - multiprocess.Lock
- 4. 信號量(multiprocessing.Semaphore)
- 5. 事件
- 6. 進程間通訊
    - 6.1 隊列
    - 6.2 管道(瞭解)
    - 6.3 進程之間數據共享
- 7. 進程池
    - 7.1 爲何要有進程池
    - 7.2 Pool模塊
python

 

1. 前言

運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發的效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所學的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。linux

 

2. multiprocess模塊

mulitiprocess 是python的一個內建模塊,包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。數據庫

 

2.1 multiprocess.Process模塊

 Process模塊介紹
process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:編程

group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name爲子進程的名稱

 

 

方法介紹:json

p.start():啓動進程,並調用該子進程中的p.run() p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 p.is_alive():若是p仍然運行,返回True p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程  

 

 

屬性介紹:數組

p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

 

注意:
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。安全

 

2.2 使用Process模塊建立進程

在一個python進程中開啓子進程,start方法和併發效果。服務器

from multiprocessing import Process import time def f(name):     print('hello,', name)     print('我是子進程.') if __name__ == '__main__':     p = Process(target=f, args=('hkey',))     p.start()     time.sleep(1)     print('執行主進程的內容了。')
在python中啓動一個子進程
from multiprocessing import Process import time def f(name):     print('hello,', name)     time.sleep(1)     print('我是子進程.') if __name__ == '__main__':     p = Process(target=f, args=('hkey',))     p.start()     p.join()  # 等待子進程執行完畢而後在繼續執行主進程
    print('執行主進程的內容了。')
join方法
from multiprocessing import Process import time import os def f(name):     print('hello,', name)     time.sleep(1)     print('子進程id:', os.getpid())  # 子進程id
    print('父進程id:', os.getppid())  # 子進程的父進程id
    print('我是子進程.') if __name__ == '__main__':     p = Process(target=f, args=('hkey',))     p.start()     p.join()  # 等待子進程執行完畢而後在繼續執行主進程
    print('執行主進程的內容了。')     print('主進程的id:', os.getpid())  # 主進程的id
查看主進程和子進程的進程號

 

 

進階,多個進程同時運行(注意:子進程的執行順序不是根據啓動順序決定的)網絡

from multiprocessing import Process import time def f(name):     print('hello,', name)     time.sleep(1) if __name__ == '__main__':     for i in range(5):  # 產生 5 個子進程同時運行
        p = Process(target=f, args=('hkey',))         p.start()
多個進程同時運行
from multiprocessing import Process import time def f(name):     print('hello,', name)     time.sleep(1) if __name__ == '__main__':     p_lst = []     for i in range(5):  # 產生 5 個子進程同時運行
        p = Process(target=f, args=('hkey',))         p.start()         p_lst.append(p)     [p.join() for p in p_lst]  # 等待全部子進程執行結束在繼續執行後續的代碼
    print('hello  world.')
多個進程同時運行,join方法的使用

 

 

除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式多線程

from multiprocessing import Process import time class MyProcess(Process):     def __init__(self, name):         super(MyProcess, self).__init__()  # 須要繼承父類中的初始化方法
        self.name = name     def run(self):  # 必須重寫的方法
        print('hello,', self.name)         time.sleep(1) if __name__ == '__main__':     p_lst = []     for i in range(5):         p = MyProcess(str(i))         p.start()         p_lst.append(p)     [p.join() for p in p_lst]     print('hello hkey')
經過繼承Process類開啓進程

進程之間數據是相互隔離的。
這裏能夠想象電腦上經過開啓QQ和wrod文檔,它們是相互獨立的兩個進程,不存在任何數據共享。

 

2.3 守護進程

守護進程:會隨着主進程的結束而結束的進程。
主進程建立守護進程:
        (1) 守護進程會在主進程代碼執行結束後就終止
        (2) 守護進程內沒法再開啓子進程,不然拋出異常

注意:進程之間是相互獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process import time import os class MyProcess(Process):     def __init__(self, name):         super(MyProcess, self).__init__()         self.name = name         self.daemon = True  # True爲開啓守護進程
    def run(self):         print(os.getpid(), self.name)         print('%s正在吃飯.' % self.name) if __name__ == '__main__':     p = MyProcess('小王')     p.start()     time.sleep(4)     print('主進程.')
守護進程的啓動
from multiprocessing import Process import time def foo():     print('123')     time.sleep(3)     print('end123') if __name__ == '__main__':     p1 = Process(target=foo)     p1.daemon = True     p1.start()     time.sleep(0.1)     print('main-end')
主進程代碼執行結束守護進程當即結束

 

 

2.4 socket聊天併發實例

from multiprocessing import Process import socket class MyProcess(Process):     def __init__(self, conn, addr):         super(MyProcess, self).__init__()         self.conn = conn         self.addr = addr     def run(self):         try:             while True:                 res = self.conn.recv(1024)                 if not res: break                 print(self.addr, res)                 self.conn.send(res.upper())         except ConnectionResetError as e:             print('Error', e) if __name__ == '__main__':     sk_server = socket.socket()     sk_server.bind(('localhost', 8080))     sk_server.listen(5)     while True:         conn, addr = sk_server.accept()         conn.send('welcome QQ'.encode())         p = MyProcess(conn, addr)         p.start()
使用多進程實現socket聊天併發 - server.py
import socket sk_client = socket.socket() sk_client.connect(('localhost', 8080)) res = sk_client.recv(1024).decode() print(res) while True:     cmd = input('>>>').strip()     if not cmd: continue     sk_client.send(cmd.encode())     res = sk_client.recv(1024)     print(res)
client.py

 

 

3. 進程鎖 - multiprocess.Lock

經過上面的學習,實現了併發的效果,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,但也給咱們帶來了新的問題。
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。

import os import time import random from multiprocessing import Process, Lock class Work(Process):     def __init__(self, n):         super(Work, self).__init__()         self.n = n     def run(self):         print('%s: %s is running.' % (self.n, os.getppid()))         time.sleep(random.random())         print('%s:%s is done' % (self.n, os.getpid())) if __name__ == '__main__':     for i in range(5):         p = Work(i)         p.start()
多進程搶佔輸出資源
import os import time import random from multiprocessing import Process, Lock class Work(Process):     def __init__(self, n, lock):         super(Work, self).__init__()         self.n = n         self.lock = lock     def run(self):         self.lock.acquire()         print('%s: %s is running.' % (self.n, os.getppid()))         time.sleep(random.random())         print('%s:%s is done' % (self.n, os.getpid()))         self.lock.release() if __name__ == '__main__':     lock = Lock()     for i in range(5):         p = Work(i, lock)         p.start()
使用鎖維護執行順序(由併發變成了串行,犧牲了運行效率,但避免了競爭)

上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。

接下來,咱們以模擬搶票爲例,來看看數據安全的重要性。

# 文件 ticket.json 的內容爲:{"count":1} # 注意必定要用雙引號,否則json沒法識別 # 併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process, Lock import time, json def search():     with open('ticket.json', 'r') as f:         dic = json.load(f)         print('\033[33;1m餘票:\033[0m', dic['count']) class Get(Process):     def __init__(self, name):         super(Get, self).__init__()         self.name = name     def run(self):         with open('ticket.json', 'r') as f:             ticket = json.load(f)         if ticket['count'] >= 1:             ticket['count'] -= 1             time.sleep(0.5)             with open('ticket.json', 'w') as f:                 json.dump(ticket, f)             print('\033[32;1m%s,購票成功.\033[0m' % self.name)         else:             print('\033[31;1m%s,購票失敗.\033[0m' % self.name) if __name__ == '__main__':     search()     for i in ('xiaosan', 'xiaowang', 'xiaogou'):         g = Get(i)         g.start()
多進程同時搶票
# 文件 ticket.json 的內容爲:{"count":1} # 注意必定要用雙引號,否則json沒法識別 # 併發變成串行,犧牲了效率,但保證了數據的準確性
from multiprocessing import Process, Lock import time, json def search():     with open('ticket.json', 'r') as f:         dic = json.load(f)         print('\033[33;1m餘票:\033[0m', dic['count']) class Get(Process):     def __init__(self, name, lock):         super(Get, self).__init__()         self.name = name         self.lock = lock     def run(self):         self.lock.acquire()         with open('ticket.json', 'r') as f:             ticket = json.load(f)         if ticket['count'] >= 1:             ticket['count'] -= 1             time.sleep(0.5)             with open('ticket.json', 'w') as f:                 json.dump(ticket, f)             print('\033[32;1m%s,購票成功.\033[0m' % self.name)         else:             print('\033[31;1m%s,購票失敗.\033[0m' % self.name)         self.lock.release() if __name__ == '__main__':     search()     lock = Lock()     for i in ('xiaosan', 'xiaowang', 'xiaogou'):         g = Get(i, lock)         g.start()
使用鎖來保證數據安全

 

4. 信號量(multiprocessing.Semaphore)

互斥鎖(multiprocessing.Lock)同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

from multiprocessing import Process, Semaphore
import time
import random


def ktv(name, sem):
    sem.acquire()   # 每次進來一我的就拿一把鑰匙
    print('\033[32;1m%s 進入ktv.\033[0m' % name)
    time.sleep(random.randint(1, 3))
    print('\033[31;1m%s 退出ktv.\033[0m' % name)
    sem.release()   # 退出就釋放鑰匙

if __name__ == '__main__':
    sem = Semaphore(5)  # 設定共享的ktv數量
    for i in range(20): # 20我的要使用這5個資源
        p = Process(target=ktv, args=(str(i), sem))
        p.start()
信號量實例

 

 

5. 事件

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False
set:將「Flag」設置爲True

能夠這樣理解:兩個子類共享父類的一個全局變量。

from multiprocessing import Process, Event
import time


def cars(e, n):
    while True:
        if not e.is_set():  # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait()  # 阻塞,等待is_set()的值變成True,模擬信號燈爲綠色,等待時間爲:5秒
            print('\033[32m綠燈亮\033[0m,car%s通行' % n)
            if not e.is_set():
                continue
            break


def police_car(e, n):
    while True:
        if not e.is_set():  # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(0.5)  # 阻塞,等待設置等待時間,等待0.5s以後沒有等到綠燈就闖紅燈走了
            if not e.is_set():
                print('\033[31;1m紅燈,警車先走\033[0m')
            else:
                print('\033[31;1m綠燈,警車走\033[0m')
        break


def light(e):
    '''紅綠燈控制器'''
    while True:
        time.sleep(5)  # 紅燈和綠燈切換時間:5秒
        if e.is_set():
            print(e.is_set())
            e.clear()
        else:
            print(e.is_set())
            e.set()


if __name__ == '__main__':
    e = Event()
    for i in range(3):
        car = Process(target=cars, args=(e, str(i)))  # 建立3個進程控制3輛車
        car.start()
    for i in range(1):
        pli = Process(target=police_car, args=(e, str(i)))  # 建立1個進程控制1輛警車
        pli.start()
    p = Process(target=light, args=(e,))  # 建立一個進程控制紅綠燈
    p.start()
事件實例

 

6. 進程間通訊

進程間通訊簡稱:IPC(Inter-Process Communication),隊列通常遵循先進先出的規則。

6.1 隊列

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

Queue([maxsize]) 

 
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。

Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
multiprocessing.Queue 方法介紹
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.cancel_join_thread() 
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。
q.join_thread() 
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
其餘一些不經常使用的方法

 

代碼實現

from multiprocessing import Process, Queue

q = Queue(3)  # 3 設定隊列的最大值
q.put(1)  # put 向隊列中添加值
q.put(2)
q.put(3)
# q.put(4)  # 若是隊列已經滿了,程序會阻塞在這裏,等待隊列中的值被取走,再將數據放入隊列。
try:
    q.put_nowait(3)  # 使用 put_nowait 若是隊列滿了不會阻塞,可是會由於隊列滿了拋出異常
except:  # 使用 try...except 可使程序不會一直阻塞到這裏,可是 q.put(4) 這個消息會丟掉
    print('隊列已經滿了。')

print(q.full())  # True 表示隊列已滿
print(q.get())  # get 從隊列中取值
print(q.get())
print(q.get())
# print(q.get())  # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait()  # 使用 get_nowait 若是隊列空了不會阻塞,可是會由於隊列滿了拋出異常
except:  # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了。')

print(q.empty())
隊列基本用法

 

上面這個例子尚未加入進程通訊,只是先來看看隊列爲咱們提供的方法,以及這些方法的使用和現象。

from multiprocessing import Process, Queue
import time


def f(q):
    q.put([time.asctime(), 'xiaoA', 'xiaoB'])  # 將數據添加到隊列

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # 從隊列中取數據
子進程發送數據給父進程

經過前面的學習,得知進程間不能直接通訊的,可是利用隊列就能實現進程間通訊。

 

6.1.1 生產者消費者模型

在併發編程中使用生產者和消費者模式能解決絕大多數併發問題,該模式經過平衡生產線和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者消費者模型

  在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

  生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

from multiprocessing import Process, Queue
import time


def producer(q):
    for i in range(5):  # 生產者將 5 條數據放到隊列中
        q.put(time.asctime(), i)


def consumer(q):
    while True:
        print(q.get())  # 消費者不斷從隊列中取數據,當數據被取完後,get()將阻塞繼續等待隊列中的數據
        time.sleep(1)


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()

    s = Process(target=consumer, args=(q,))
    s.start()
基於隊列實現生產者消費者模型

 

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。

from multiprocessing import Process, JoinableQueue


def producer(q, name):
    for i in range(10):
        res = '%s-%s' % (name, i)
        q.put(res)
    q.join()


def consumer(q):
    while True:
        print(q.get())
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()
    p_lst = []
    for i in ['蘋果', '香蕉', '菠蘿']:
        p = Process(target=producer, args=(q, i))
        p.start()
        p_lst.append(p)

    c = Process(target=consumer, args=(q,))
    c.daemon = True
    c.start()

    [p.join() for p in p_lst]
基於隊列實現消費者生產者模型

對上面這段代碼的解讀:

在生產者端:
    每次產生一個數據;
    且每次生產的數據都放在隊列中
    在隊列中刻上一個記號
    在生產者所有生產完畢以後
    join 信號:已經中止生產數據了
            且要等待以前被刻上記號的數據消費完畢
            當數據處理完成時,join阻塞結束
            
在消費者端:
    每次獲取一個數據
    處理一個數據
    發送一個記號:標記一個數據被處理成功
    
主進程:
    consumer 中把全部的任務消耗完
    producer 端的join感知到,中止阻塞
    全部的 producer 進程結束
    主進程代碼結束
    守護進程(consumer)結束

 

6.2 管道(瞭解)

介紹
建立管道,Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1, conn2)其中conn1,conn2 表示管道兩端的鏈接對象,強調一點:必須在生產Process對象以前產生管道。

參數介紹:

duplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。

 
主要方法:

conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
主要方法

 


應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。

 

6.3 進程之間數據共享

 

展望將來,基於消息傳遞的併發編程是大勢所趨
即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。
這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。
但進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。
之後咱們會嘗試使用數據庫來解決如今進程之間的數據共享問題。

Manager 模塊介紹
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此

manger 支持的數據類型:

list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array

 

   
基礎使用實例:

from multiprocessing import Process, Manager


def func(m):
    m['count'] += 1  # 在子進程中 + 1

if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'count': 100})
    p = Process(target=func, args=(dic,))
    p.start()
    p.join()  # 在使用 Manager模塊的時候必須等待子進程執行完畢,由於若是主進程直接結束,共享的資源也會消失,程序報錯
    print('---主進程---')
    print(dic)  # 主進程查看數據是否修改

當在多進程中使用數據共享操做,必須經過互斥鎖的方式來保證數據的可靠性:
def func(dic, lock):
    with lock:  # 使用 with lock: 自動獲取和釋放鎖
        dic['count'] += 1
        dic['count'] -= 1


if __name__ == '__main__':
    lock = Lock()
    m = Manager()
    dic = m.dict({'count': 100})
    p_lst = []
    for i in range(100):
        p = Process(target=func, args=(dic, lock))
        p.start()
        p_lst.append(p)

    for p in p_lst: p.join()
    print('---主---')
    print(dic)
基礎使用實例

 

7. 進程池

 

7.1 爲何要有進程池

  在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?

  在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。

 

7.2 Pool模塊

 

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

numprocess: 要建立的進程數,若是省略,將默認使用 cpu_count() 的值
initializer: 是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs: 是要傳給 initializer 的參數組

主要方法:

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將結果傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
  
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
主要方法

代碼實例:
進程池和多進程效率比較:

from multiprocessing import Pool, Process
import time


def func(n):
    for i in range(3):
        n + 1


if __name__ == '__main__':
    start = time.time()
    pool = Pool(5)
    pool.map(func, range(100))  # 進程池的方式運行
    t1 = time.time() - start
    start = time.time()
    p_lst = []
    for i in range(100):    # 多進程的方式運行
        p = Process(target=func, args=(i,))
        p.start()
        p_lst.append(p)
    for p in p_lst: p.join()

    t2 = time.time() - start

    print(t1, t2)
進程池和多進程效率比較

 
同步和異步

from multiprocessing import Pool
import time


def func(n):
    print(n * 2)
    time.sleep(1)


if __name__ == '__main__':
    pool = Pool(5)  # 進程池中從無到有建立5個進程,之後一直是這5個進程在執行任務
    for i in range(5):
        pool.apply(func, args=(i,))  # 同步調用
    print('---主進程---')  # 同步調用是須要等待子進程所有結束在執行主進程
進程池的同步調用
from multiprocessing import Pool
import os, random, time


def func(n):
    print('%s run.' % os.getpid())
    time.sleep(random.random())
    return n ** 2


if __name__ == '__main__':
    p = Pool(5)  # 進程池從無到有建立5個進程,之後一直是這5個進程執行任務
    res_lst = []
    for i in range(5):
        # 異步運行,根據進程池中有的進程數,每次最多5個子進程在異步執行, 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
                # 須要注意的是,進程池中的5個進程不會同時不會同時開啓或者同時結束,而是執行完一個釋放出來接收新的任務
        res = p.apply_async(func, args=(i,))
        res_lst.append(res)

    # 異步 apply_sync用法:若是使用異步提交的任務,主進程須要使用join,等待進程池內任務都處理完,而後能夠用get收集結果
    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    p.close()
    p.join()

    # 使用get來獲取apply_aync的結果, 若是是apply, 則沒有get方法, 由於apply是同步執行, 馬上獲取結果, 也根本無需get
    for res in res_lst: print(res.get())
進程池的異步調用

 
socket:進程池版本socket併發聊天

import socket
from multiprocessing import Pool


def chat(conn):
    conn.send(b'welcome!')
    while True:
        res = conn.recv(1024)
        conn.send(res.upper())


if __name__ == '__main__':
    sk_server = socket.socket()
    sk_server.bind(('localhost', 8080))
    sk_server.listen(5)
    p = Pool(5)
    while True:
        conn, addr = sk_server.accept()
        print('welcome ', addr)
        p.apply_async(chat, args=(conn))
server.py
import socket

sk_client = socket.socket()
sk_client.connect(('localhost', 8080))
res = sk_client.recv(1024).decode()
print(res)
while True:
    inp = input('>>>').strip()
    if not inp: continue
    sk_client.send(inp.encode())
    inp_res = sk_client.recv(1024).decode()
    print(inp_res)
client.py

 
執行上面的代碼,當客戶端啓動到第6個的時候,須要中止以前的其中一個,第6個客戶端纔會接進來。


這裏總結下,在進程池中,異步調用(apply_async)時,close 和 join的用法:
當子進程執行完畢後,仍然須要主進程執行後續的代碼,這時就須要用到close() 和 join()


回調函數
    回調函數必定是在主進程中執行的。使用場景:
進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
    咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
   

from multiprocessing import Pool
import requests, os


def get(url):
    print('<進程:%s> %s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}


def pasrse_page(res):
    print('<進程:%s> %s' % (os.getpid(), res['url']))
    parse_res = 'url: %s size: %s\n' % (res['url'], len(res['text']))
    with open('db.txt', 'w') as f:
        f.write(parse_res)


if __name__ == '__main__':
    p = Pool(3)
    urls = [
        'https://www.baidu.com/',
        'https://www.taobao.com/',
        'https://www.cnblogs.com/',
    ]
    res_lst = []
    for url in urls:
        res = p.apply_async(get, args=(url,), callback=pasrse_page)
        res_lst.append(res)

    p.close()
    p.join()

    for res in res_lst: print(res.get())
使用多進程請求多個url來減小網絡等待浪費時間
import re
from urllib.request import urlopen
from multiprocessing import Pool


def get_page(url, pattern):
    response = urlopen(url).read().decode('utf-8')
    return pattern, response


def parse_page(info):
    pattern, page_content = info
    res = re.findall(pattern, page_content)
    for item in res:
        dic = {
            'index': item[0].strip(),
            'title': item[1].strip(),
            'actor': item[2].strip(),
            'time': item[3].strip(),
        }
        print(dic)


if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1 = re.compile(regex, re.S)

    url_dic = {
        'http://maoyan.com/board/7': pattern1,
    }

    p = Pool()
    res_l = []
    for url, pattern in url_dic.items():
        res = p.apply_async(get_page, args=(url, pattern), callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()
爬蟲實例

 
若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

from multiprocessing import Pool
import time


def work(n):
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = Pool()
    res_lst = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))
        res_lst.append(res)

    p.close()
    p.join()
    for res in res_lst: print(res.get())
無需回調函數
相關文章
相關標籤/搜索