🍖Python併發編程之多進程

引入

在進入多進程的學習以前, 必定須要先了解一個應用程序是如何開啓一個進程的, 以及操做系統對進程是如何進行分配資源的, 進程、線程、進程池、進程三態、同步、異步、併發、並行、串行的概念也要很是的明確, 下面將介紹 Python 併發編程之多進程html

一.multiprocessing 模塊介紹

1.什麼是 multiprocessing 模塊

  • multiprocess 模塊是 Python 中的多進程管理模塊

2.multiprocessing 模塊簡介

  • python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程, Python提供了multiprocessing

3.multiprocessing 模塊的做用

  • multiprocessing 模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似
  • multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件

ps : 值得注意的是 : 與線程不一樣,進程沒有任何共享狀態,多個進程的內存空間相互物理隔離, 進程修改的數據,改動僅限於該進程內python

二.Process類介紹

multiprocessing 模塊提供了 Process 類,該類可用來在 Windows 平臺上建立新進程編程

使用 Process 類建立實例化對象,其本質是調用該類的構造方法建立新進程json

Process([group [, target [, name [, args [, kwargs]]]]])  # 其實是調用了下面的構造方法
def __init__(self,group=None,target=None,name=None,args=(),kwargs={})

值得注意的是 :數組

​ 參數的指定須要使用關鍵字的方式安全

args 指定的值是爲 target 指定的函數的位置參數, 而且是一個元組形式, 一個值必須帶逗號bash

  • 參數含義 :

參數名 說明
group 該參數未進行實現,不須要傳參
target 爲新建進程指定執行任務,也就是指定一個函數
name 爲新建進程設置名稱
args 爲 target 參數指定的參數傳遞非關鍵字參數
kwargs 爲 target 參數指定的參數傳遞關鍵字參數
  • 經常使用方法

方法 做用
run( ) 第 2 種建立進程的方式須要用到,繼承類中須要對方法進行重寫,該方法中包含的是新進程要執行的代碼
start( ) 和啓動子線程同樣,新建立的進程也須要手動啓動,該方法的功能就是啓動新建立的線程
join([timeout]) 主線程等待子進程終止(強調:是主線程處於等的狀態,而p是處於運行的狀態),timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
is_alive( ) 判斷當前進程是否還活着
terminate( ) 中斷該進程
  • 經常使用屬性

屬性 做用
name 能夠爲該進程重命名,也能夠得到該進程的名稱。
daemon 和守護線程相似,經過設置該屬性爲 True,可將新建進程設置爲「守護進程」
pid 返回進程的 ID 號。大多數操做系統都會爲每一個進程配備惟一的 ID 號
exitcode 進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
authkey 進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網

三.Process類建立子進程的兩種方式

0.Process 類使用的注意點

WindowsProcess( ) 必須放在 if __name__ == '__main__': 之下網絡

  • 這是 Windows上多進程的實現問題, 在 Windows 上,子進程會自動 import 啓動它的這個文件,而在 import 的時候是會執行這些語句的。若是你這麼寫的話就會無限遞歸建立子進程報錯, 因此必須把建立子進程的部分用那個 if 判斷保護起來,import 的時候若是不是當前執行文件就不會執行 Process, 也就不會無限遞歸了 (Linux上沒有這個問題)

ps : fork 是 OS提供的方法 os.fork(), 該方法能夠在當前程序中再建立出一個進程, 可是在 Windows 平臺上無效, 只在 Linux, UNIX, Mac OSX上有效數據結構

1.開啓子進程方式一

  • 直接建立 Process 類的實例對象,由此就能夠建立一個新的進程
from multiprocessing import Process
import time,os

def test(n):
    print(f"父進程{os.getppid()},紫禁城{os.getpid()}")
    time.sleep(n)
    print(f"父進程{os.getppid()},紫禁城{os.getpid()}")

if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()  # 作發起系統調用的活
    print(f"當前執行文件{os.getpid()}")

'''
當前執行文件16860
父進程16860,紫禁城6404
父進程16860,紫禁城6404
'''

2.開啓子進程方式二

  • 經過繼承 Process 類的子類,建立實例對象,也能夠建立新的進程
  • 繼承 Process 類的子類需重寫父類的 run( ) 方法
from multiprocessing import Process
import time,os

class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n

    def run(self) -> None:
        print(f"父進程{os.getppid()},紫禁城{self.pid}")
        time.sleep(self.n)
        print(f"父進程{os.getppid()},紫禁城{os.getpid()}")

if __name__ == '__main__':
    p = MyProcess(2)
    p.start()
    print(f"當前執行文件{os.getpid()}")
    
'''
當前執行文件8136
父進程8136,紫禁城1280
父進程8136,紫禁城1280
'''

四.驗證進程的內存空間是相互隔離的

from multiprocessing import Process
import time

x = 222

def test():
    global x
    x = 111

if __name__ == '__main__':
    p = Process(target=test)
    p.start()     # 發送系統調用
    time.sleep(1) # 等待子進程運行完
    print(x)      # 222 (仍是原來的)

子進程 test 函數中聲明全局變量 x, 並修改 x 的值, 等待子進程運行完畢, 最後打印 x , 發現 x 的值並無改變多線程

五.Process 對象的 join 方法

  • 讓父進程等待子進程的終止, 父進程在等, 子進程在運行
from multiprocessing import Process

x = 222

def test():
    global x
    x = 111

if __name__ == '__main__':
    p = Process(target=test)
    p.start()   # 發送系統調用
    p.join()    # 等待子進程運行完(以前咱們使用sleep並不能精確的知道子進程結束運行的時間)
    print(x)    # 222 (仍是原來的)
  • 參數 timeout 是可選的超時間, 等多久就不等了
from multiprocessing import Process

x = 222

def test():
    global x
    x = 111

if __name__ == '__main__':
    p = Process(target=test)
    p.start()        # 發送系統調用
    p.join(0.001)    # 等待 0.001 秒就不等了
  • 注意點 : start() 只是發起系統調用, 並非運行子進程, 當 start() 執行完後緊接着就執行後面的代碼
  • start() 發起調用以後, 是通知操做系統建立一個子進程, 操做系統須要申請一個內存空間, 將父進程的數據複製一份到子進程的內存空間中做爲初始化用 (Linux是將父進程的數據原本來本的複製一份, 而Windows 稍有些不一樣), 而後子進程才運行起來
import time,os

def test(n):
    time.sleep(n)
    print(f"父進程{os.getppid()} 子進程{os.getpid()}")

if __name__ == '__main__':
    p1 = Process(target=test,args=(3,))
    p2 = Process(target=test,args=(2,))
    p3 = Process(target=test,args=(1,))

    p1.start()  # 用時 3 秒
    p2.start()  # 用時 2 秒
    p3.start()  # 用時 1 秒

    start_time = time.time()
    p1.join()
    p2.join()
    p3.join()   # 三個進程都在併發的運行, 主進程一共運行3秒多
    stop_time = time.time()
    print(f'主進程{os.getpid()} 用時{stop_time-start_time}')
    
'''
父進程10888 子進程6792
父進程10888 子進程13368
父進程10888 子進程14800
主進程10888 用時3.131737470626831
'''

六. Process 對象其餘經常使用方法介紹

1.terminate( ) : 關閉進程

2.is_alive( ) : 查看進程是否存活

from multiprocessing import Process
import time

def test():
    time.sleep(1)

if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()
    p.terminate()  # 只是發起系統調用, 通知操做系統關閉子進程
    print(p.is_alive())  # True

由上面可知 terminate() 只是發起系統調用, 並非當即關閉子進程, 操做系統關閉子進程回收資源也要一小會, 咱們可使用sleep簡單延時

from multiprocessing import Process
import time

def test():
    time.sleep(1)

if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()
    p.terminate()   # 只是發起系統調用, 通知操做系統關閉子進程
    time.sleep(0.1) # 稍微延時一點
    print(p.is_alive())  # False

3.name : 爲新建進程設置名字

4.pid : 進程號

from multiprocessing import Process
import time,os

class MyProcess(Process):
    def __init__(self,n,name):
        super().__init__()
        self.n = n
        self.name = name

    def run(self) -> None:
        time.sleep(self.n)
        print(f"子進程pid:{self.pid}")    # 子進程pid:14156
        print(f"子進程模塊名:{__name__}")  # 子進程模塊名:__mp_main__
        print(f"子進程名:{self.name}")    # 子進程名:aaaa

if __name__ == '__main__':
    p = MyProcess(1,"aaaa")
    p.start()
    p.join()
    print(f"打印子進程pid:{p.pid}")       # 打印子進程pid:14156
    print(f"打印主進程pid:{os.getpid()}") # 打印主進程pid:16340
    print(f"子進程名:{p.name}")           # 子進程名:aaaa
    print(f"主進程模塊名:{__name__}")      # 主進程模塊名:__main__

__name__ : Python中每一個模塊都有本身的名字, __name__是一個系統變量, 是模塊的標識符, 值是模塊的名稱, 而且在自身模塊中:__name__的值等於__mian__

七.孤兒進程

1.什麼是孤兒進程

  • 當一個父進程建立了多個子進程, 子進程再建立子子進程等等
  • 父進程因正常運行完畢或其餘狀況被幹掉的時候, 它的子進程就變成了孤兒進程
  • 爲了不孤兒進程完成任務後沒有父親通知操做系統回收資源
  • 因而 PID 爲 "1"的頂級進程 systemd 就接手了這個孤兒進程
  • systemd 至關於一個孤兒院, 但凡是孤兒進程都會成爲它的子進程

2.孤兒進程演示

  • 先在一個虛擬終端裏開啓一個 Bash 進程,把他當作父進程
  • 緊接着開啓一個 "sleep 1000 &" 進程, 把它當作子進程
  • 而後在另外一個虛擬終端查看這兩個進程信息

孤兒進程bash和sleep演示

  • 再殺掉 sleep 的父進程 Bash 看看結果如何

殺掉父進程bash2

  • 圖示

八.殭屍進程

1.什麼是殭屍進程

  • 這是Linux出於好心的設計
  • 一個父進程開啓了一堆子進程, 當子進程比父進程先運行完(死掉)
  • 操做系統會釋放子進程佔用的重型資源(內存空間, CPU資源, 打開的文件)
  • 但會保留子進程的關鍵信息(PID, 退出狀態, 運行時間等)
  • 目的是爲了讓父進程能隨時查看本身的子進程信息(無論該子進程有沒有死掉)
  • 這種已經死掉的子進程都會進入殭屍狀態, ''殭屍進程''是Linux系統的一種數據結構

ps : 任何正常結束的子進程都會進入到殭屍狀態, 而被強制終止的進程的全部信息將會被清除

2.殭屍進程回收----概念

  • 操做系統保留子進程信息供父進程查看
  • 當父進程以爲再也不須要查看的時候, 會向操做系統發送一個 wait / waitpid 系統調用
  • 因而操做系統再次清理殭屍進程的殘餘信息

3.殭屍進程回收----實際

  • 優秀的開源軟件
這些軟件在開啓子進程時, 父進程內部會及時調用"wait" / "waitpid" 通知操做系統來回收殭屍進程
  • 水平良好的開發者
功底深厚,知道父進程要對子進程負責
會在父進程內部考慮到調用 "wait" / "waitpid" 通知操做系統回收殭屍進程
可是發起系統調用時間可能慢了一點
因而咱們就可使用 "ps aux | grep [z]+" 命令查看到殭屍進程
  • 水平很是低的開發者
技術半吊子,只知道開子進程,父進程也不結束,並在那一直開子進程,不知道什麼是殭屍進程
系統調用 "wait" / "waitpid" 也沒有據說過
因而計算機會堆積許多的殭屍進程,佔用着大量的"pid",(每啓動一個進程就會分配一個"pid號")
計算機進入一個奇怪的現象: 內存夠用,硬盤充足,CPU空閒,但新的程序沒法啓動
這就是由於"PID"不夠用了

4.如何清理殭屍進程

  • 針對良好的開發者
咱們能夠手動發信號給父進程: "# kill -CHLD [父進程的PID]"
通知父進程快點向操做系統發起系統調用 "wait" / "waitpid" 來清理變成殭屍的兒子們
  • 針對半吊子水平的開發者
這種狀況子下,咱們只能將父進程終結,由於你發給它的信號不會獲得迴應
父進程被殺死,"殭屍進程"將會變成"殭屍孤兒進程"
但凡是"孤兒進程"都會被Linux系統中"PID"爲"1"的頂級進程"systemd"回收
"systemd"會發起系統調用 "wait" / "waitpid" 來通知操做系統清理殭屍進程
# Centos7 的頂級進程爲 systemd
# Centos6 的頂級進程爲 init

5.使用Process類製造殭屍進程

本來 multiprocessing 模塊在你發起系統調用 start() 開啓子進程的時候會自動檢測當前狀態下是否存在殭屍進程, 並將其回收, join() 調用也是同樣, 咱們能夠查看這兩個調用的源碼進行查看 :

image-20210120172357714

image-20210120172459572

  • 咱們可讓父進程建立子進程後暫停在原地什麼事情都不作, 因而 multiprocessing 模塊的底層機制都沒有運行, 也就無法清除運行完畢並變成殭屍態的子進程, 下面再 Linux 上進行演示 :
# coding:utf-8
from multiprocessing import Process
import os,time

def task():
    print("子進程:%s"%os.getpid())
    time.sleep(4)  # 子進程 4 秒後結束變成殭屍進程

if __name__ == "__main__":
    for i in range(400):
        print("父進程:%s"%os.getpid())
        p = Process(target=task)
        p.start()
    time.sleep(100000)  # 讓父進程停在原地什麼也不作

使用 top 命令查看系統狀態信息, 能夠發現已經出現了 400 個殭屍進程

image-20210120183029242

咱們能夠經過 kill 剛運行的 py 文件將這些殭屍進程變成孤兒進程, 從而被 systemd 接管, systemd 再發起系統調用將其清除

九.守護進程

1.什麼是守護進程

由主進程建立, 並會隨着主進程的結束而結束

2.守護進程的生命週期

  • 進程之間是相互獨立的, 守護進程會在主進程代碼執行結束後就終止

  • 守護進程內沒法再次開啓子進程, 不然會拋出異常 : AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process
import os,time

class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n

    def run(self) -> None:
        print(f'子進程:{os.getpid()}開始')
        time.sleep(2)
        print(f"子進程:{os.getpid()}結束")

if __name__ == '__main__':
    p = MyProcess(2)
    p.daemon = True  # 須要在 strat() 以前設置
    p.start()
    print(f"主進程:{os.getpid()}結束")  
    # 在當前主進程的代碼已經運行完畢, 守護進程就會終止, 甚至守護進程還沒來的急啓動
'''輸出
主進程:16924結束
'''

咱們使用 sleep 讓主進程簡單延時一下好讓子進程啓動起來

from multiprocessing import Process
import os,time

class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n

    def run(self) -> None:
        print(f'子進程:{os.getpid()}開始')
        time.sleep(2)
        print(f"子進程:{os.getpid()}結束")

if __name__ == '__main__':
    p = MyProcess(2)
    p.daemon = True
    p.start()
    time.sleep(1)  # 延時一秒, 足夠操做系統將子進程開起來
    print(f"主進程:{os.getpid()}結束")

'''輸出
子進程:8620開始
主進程:10480結束
'''

再次強調, 守護進程是在主進程的代碼執行完畢終止

from multiprocessing import Process
import os,time

def Foo():
    print(f"Foo:{os.getpid()}-->111")
    time.sleep(1)
    print(f"Foo--->222")

def Bar():
    print(f"Bar:{os.getpid()}-->333")
    time.sleep(2)
    print(f"Bar--->444")

if __name__ == '__main__':
    p1 = Process(target=Foo)
    p2 = Process(target=Bar)

    p1.daemon = True  # 將 p1 設置守護進程
    p1.start()
    p2.start()
    print("------>end")
# 當運行到這一行的時候主進程代碼已經運行完了, 那麼守護進程也已經終止了, 與主進程在等着 p2 運行無關, 這時操做系統還沒來的急啓動 p1 這個子進程

'''輸出
------>end
Bar:18124-->333
Bar--->444
'''

十.進程同步鎖(互斥鎖/排它鎖)

上面咱們實現了進程的併發, 進程之間的數據是不共享的, 可是他們能夠共享同一個文件(硬盤空間), 或者是同一個打印空間, 然而在共享的同時也帶來了問題 : 進程的運行不是同時進行的, 它們沒有前後順序, 一旦開啓也不受咱們的限制, 當多個進程使用同一份數據資源時, 就會引起數據安全或者數據混亂問題

1.什麼是互斥鎖

咱們打個簡單的比方, 公司裏的一臺打印機, 每一個人均可以使用, 但同事只能有一我的在使用, 否則就會形成打印錯亂; 又好比合租房的衛生間, 合住的同伴均可以使用衛生間, 但每次只能一我的進去, 進去以後門就鎖上了(至關於加鎖 Lock( ).acquire( )), 出來以後開門, 其餘人又可使用衛生間了(至關於解鎖Lock( ).release( ))

  • 模擬多個用戶共同使用同一份文件(搶票)

🍓餘票文件 "aaa.json"
{"count": 1}  # 剩一張票

🍓模擬多我的搶票
# coding:utf-8
from multiprocessing import Process
import os,time,json

def check():           # 先查票
    time.sleep(1)      # 模擬網絡延遲
    with open("aaa.json")as f:
        dic = json.load(f)
        print(f"剩餘票數 : {dic['count']}")
def get():             # 查完以後開始搶
    time.sleep(1)      # 模擬網絡延遲
    with open("aaa.json")as f:
        dic = json.load(f)
    if dic["count"] >0:
        dic["count"] -= 1
        time.sleep(1)  # 模擬網絡延遲
        with open("aaa.json","w")as f2:  # 搶完以後修改數據並提交到服務端
            json.dump(dic,f2)
        print(f"用戶 : {os.getpid()} 搶票成功")
    else:
        print(f"用戶 : {os.getpid()} 搶票失敗")

def run():
    check()
    time.sleep(1)      # 模擬網絡延遲
    get()

if __name__ == "__main__":
    for i in range(4):
        p = Process(target=run)
        p.start()
        
'''輸出
剩餘票數 : 1
剩餘票數 : 1
剩餘票數 : 1
剩餘票數 : 1
用戶 : 13116 搶票成功
用戶 : 2364 搶票成功
用戶 : 1796 搶票成功
用戶 : 6228 搶票成功
'''

打印的結果發現只有一張票, 可是四我的都搶成功了, 這就很是不合理,形成了數據混亂

  • 加入鎖進行搶票

# coding:utf-8
from multiprocessing import Process,Lock
import os,time,json

def check():           # 先查票
    time.sleep(1)      # 模擬網絡延遲
    with open("aaa.json")as f:
        dic = json.load(f)
        print(f"剩餘票數 : {dic['count']}")
def get():             # 查完以後開始搶
    time.sleep(1)      # 模擬網絡延遲
    with open("aaa.json")as f:
        dic = json.load(f)
    if dic["count"] >0:
        dic["count"] -= 1
        time.sleep(1)  # 模擬網絡延遲
        with open("aaa.json","w")as f2:  # 搶完以後修改數據並提交到服務端
            json.dump(dic,f2)
        print(f"用戶 : {os.getpid()} 搶票成功")
    else:
        print(f"用戶 : {os.getpid()} 搶票失敗")

def run(lock):
    check()
    time.sleep(1)      # 模擬網絡延遲
    lock.acquire()     # 在搶票環節加鎖
    get()
    lock.release()     # 搶完後解鎖

if __name__ == "__main__":
    lock = Lock()
    for i in range(4):
        p = Process(target=run,args=(lock,))
        p.start()
        
'''輸出
剩餘票數 : 1
剩餘票數 : 1
剩餘票數 : 1
剩餘票數 : 1
用戶 : 432 搶票成功
用戶 : 2636 搶票失敗
用戶 : 7772 搶票失敗
用戶 : 1272 搶票失敗
'''

加鎖以後, 一張票只有一我的能搶成功, 其實就是讓搶票這個局部環節變成了串行, 誰搶到了就誰用, 犧牲了效率, 提高了數據安全性

2.總結

  • 以上加鎖的操做方法能夠保證多個進程修改同一份數據時保證數據的安全性, 即串行的修改, 可是也帶來了一些問題 :
一、共享的數據基於文件, 文件又屬於硬盤, 效率就比較低
二、須要本身加鎖和解鎖操做, 這是一件很是危險的操做, 若是忘記解鎖程序就停在原地
  • 所以咱們就須要一種兼顧效率以及能自動幫咱們處理鎖的問題的這種介質
🍑需求
一、多個進程共享同一塊內存數據, 實現高效率
二、找到一個能幫咱們處理好鎖的問題的機制 : multiprocessing模塊爲咱們提供了IPC通訊機制:管道和隊列

🍑介質
一、管道和隊列, 基於內存中的空間存放數據
二、隊列是基於管道和鎖實現的, 可讓咱們從複雜的鎖問題中解脫出來

ps : 咱們應該儘可能避免使用共享數據, (好比一個文件的傳遞應該將文件保存到硬盤, 在管道中放的應該是一個路徑, 而不該該是一個完整的文件), 儘量使用消息傳遞和隊列, 避免處理複雜 的同步和鎖問題, 並且在進程數目增多時, 每每能夠得到更好的可擴展性

十一.進程間通訊 (IPC)

進程間通訊機制簡稱 IPC (Inter Process Communication)

進程間彼此隔離, 要實現 IPC, multiprocessing 模塊爲咱們提供了隊列和管道這兩種形式

1.什麼是管道, 什麼是隊列

  • 管道(瞭解便可) : 一個進程將一個數據放入管道內(共享內存), 另外一個進程從管道內取出數據進行處理
  • 隊列 : 管道加鎖, 先進先出, 幫咱們實現了複雜的加鎖解鎖操做, 如下咱們主要介紹隊列的使用

2.隊列的經常使用方法

  • 建立一個隊列實例
🍑導入模塊
from multiprocessing import Queue
🍑建立一個隊列對象
q = Queue([maxsize])  # 多進程可使用Queue進行數據傳遞
🍑參數介紹
maxsize               # 是隊列中容許最大項, 省略則無大小限制
  • 經常使用方法介紹
方法 功能
q.put( ) 向隊列中傳入數據,可選參數 : blocked(鎖定狀態)和timeout(超時時間)。若是blocked爲True(默認值), 而且timeout爲正值, 該方法會阻塞timeout指定的時間, 直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常
q.get( ) 從隊列讀取走一個元素, 有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常
q.get_nowait( ) 同q.get(blocked=False)
q.put_nowait( ) 同q.put(blocked=False)
q.empty( ) 調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目
q.full( ) 調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走
q.qsize( ) 返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
  • 其餘方法介紹
方法 做用
q.cancel_join_thread( ) 不會在進程退出時自動鏈接後臺線程, 能夠防止join_thread()方法阻塞
q.close( ) 關閉隊列, 防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤
q.join_thread( ) 鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

3.隊列的使用

from multiprocessing import Queue

q = Queue(3)           # 建立一個隊列,設置最大項爲3

q.put({"name":"ii"})   # 放入一個字典
q.put([1,2,3,4,5])     # 放入一個列表
q.put("shawn")         # 放入一個字符串
try:
    # q.put(1777,block=True,timeout=3)
    q.put(1777,block=False)  # 放入一個整形,並設置隊列已滿立馬拋異常
except Exception:
    print("隊列已滿")

print(q.get())         # 取一個值
print(q.get())         # 2
print(q.get())         # 3
try:
    # print(q.get(block=True,timeout=3))
    print(q.get(block=False))  # 取一個值,隊列爲空立馬拋出異常
except Exception:
    print("隊列已空")
    
'''輸出
隊列已滿
{'namwe': 'ahsns'}
[1, 2, 3, 4, 5]
shawn
隊列已空
'''

十二.生產者消費者模型

1.什麼是生產者消費者模型

  • 生產者 : 程序中負責產生數據的一方
  • 消費者 : 程序中負責處理數據的一方

2.爲何引入生產者消費者模型

在併發編程中, 生產者消費者模式經過一個容器來解決生產者和消費者之間的強耦合性, 二者之間再也不是直接通訊, 而是經過堵塞隊列來進行通訊, 生產者(生產速度快)沒必要再等待消費者是否處理完數據, 消費者直接從隊列中取, 該隊列就至關於一個緩衝區, 平衡了生產者和消費者的工做能力, 從而提升了程序總體的數據處理速度

3.如何實現

經過隊列 : 生產者------>隊列------->消費者

4.生產者消費者示例

from multiprocessing import Process, Queue
import time, random


def producer(q, name, food):
    for i in range(3):
        res = f"{food}{i}"
        time.sleep(random.randint(1, 3))  # 模擬生產者數據產出時間
        q.put(res)             # 將產生的數據放入到隊列中
        print(f"\033[1;35m{name}:生產了:{res}\033[0m")


def consumer(q, name):
    while True:
        res = q.get()           # 取出數據
        if res == None: break   # 判斷是否None, None表明隊列取完了,結束
        time.sleep(random.randint(1, 3))   # 模擬消費者處理數據時間
        print(f"\033[1;36m{name}吃了{res}\033[0m")


if __name__ == "__main__":
    q = Queue()  # 建立隊列
    # 開啓三個生產者進程
    p1 = Process(target=producer, args=(q, "shawn", "香腸")) 
    p2 = Process(target=producer, args=(q, "派大星", "熱狗"))
    p3 = Process(target=producer, args=(q, "海綿寶寶", "雞"))
    # 開啓兩個消費者進程
    c1 = Process(target=consumer, args=(q, "章魚哥"))
    c2 = Process(target=consumer, args=(q, "蟹老闆"))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # 等待生產者所有生產完畢結束進程
    p1.join()
    p2.join()
    p3.join()
    # 主進程再想隊列裏面放入兩個None,當消費者拿到後表明取完了
    q.put(None)
    q.put(None)
    print("痞老闆:主")
    
'''輸出
shawn:生產了:香腸0
派大星:生產了:熱狗0
章魚哥吃了香腸0
蟹老闆吃了熱狗0
派大星:生產了:熱狗1
shawn:生產了:香腸1
海綿寶寶:生產了:雞0
章魚哥吃了熱狗1
海綿寶寶:生產了:雞1
派大星:生產了:熱狗2
章魚哥吃了雞0
蟹老闆吃了香腸1
shawn:生產了:香腸2
海綿寶寶:生產了:雞2
痞老闆:主
蟹老闆吃了熱狗2
章魚哥吃了雞1
蟹老闆吃了香腸2
章魚哥吃了雞2
Process finished with exit code 0
'''

5.第二種生產者消費者模型使用JoinableQueue類 (瞭解)

  • JoinableQueue類的實例

q = JoinableQueue([maxsize]) : 與 Queue 的對象同樣, 但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的

  • 方法
方法 做用
q.task_done( ) 使用者使用此方法發出信號,表示q.get( )的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
q.join( ) 生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
from multiprocessing import Process, JoinableQueue
import time, random


def producer(q, name, food):
    for i in range(3):
        res = f"{food}{i}"
        q.put(res)
        time.sleep(random.randint(1, 3))
        print(f"\033[1;35m{name}:生產了:{res}\033[0m")
    q.join()  # 等待每一個生產者本身放入的數據被消費者取完才結束該進程


def consumer(q, name):
    while True:
        res = q.get()
        if res == None: break
        time.sleep(random.randint(1, 3))
        print(f"\033[1;36m{name}吃了{res}\033[0m")
        q.task_done()  
        # 消費者每次取走一個數據都發送一個task_done信號,生產者那邊的計數相應減1


if __name__ == "__main__":
    q = JoinableQueue()  # 建立一個對象
    # 建立三個生產者
    p1 = Process(target=producer, args=(q, "shawn", "香腸"))
    p2 = Process(target=producer, args=(q, "派大星", "熱狗"))
    p3 = Process(target=producer, args=(q, "海綿寶寶", "雞"))
    # 建立兩個消費者
    c1 = Process(target=consumer, args=(q, "章魚哥"))
    c2 = Process(target=consumer, args=(q, "蟹老闆"))
    # 將兩個消費者設置成守護進程, 主進程代碼結束,這兩個消費者進程相應結束
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # 等待三個生產者進程結束
    p1.join()
    p2.join()
    p3.join()
🔰#原理分析 : 生產者生產數據, 假設一個生產者生產3個數據帶隊列,每一個相應的計數爲3
🔰#消費者從隊列中取走數據的時候發送task_done信號給生產者,生產者的計數3-1,剩下兩個
🔰#消費者繼續取數據併發送信號,當生產者的計數爲0,表明隊列已經取完了,這時q.join()就再也不進行堵塞,生產者進程結束
🔰#而此時的消費者也已經沒有做用了,將消費者進程設置成守護進程,主進程等待生產者進程結束就結束,消費者進程天然被帶走

'''輸出
shawn:生產了:香腸0
海綿寶寶:生產了:雞0
章魚哥吃了香腸0
派大星:生產了:熱狗0
蟹老闆吃了熱狗0
shawn:生產了:香腸1
海綿寶寶:生產了:雞1
章魚哥吃了雞0
蟹老闆吃了香腸1
shawn:生產了:香腸2
派大星:生產了:熱狗1
章魚哥吃了雞1
蟹老闆吃了熱狗1
海綿寶寶:生產了:雞2
派大星:生產了:熱狗2
章魚哥吃了香腸2
蟹老闆吃了雞2
章魚哥吃了熱狗2
Process finished with exit code 0
'''

十三.信號量 Semaphore (瞭解)

互斥鎖同時只容許一個線程修改數據, 而 Semaphore 容許同時有必定數量的進程更改數據, 就像理髮店, 好比只有3個託尼老師, 那最多隻容許3我的同時理髮, 後面的人只能等到有人理完了才能開始, 若是指定信號量爲3, 那麼來一我的得到一把鎖, 計數加1, 當計數等於3時, 後面的人均須要等待 , 一旦釋放, 就有人能夠得到一把鎖

from multiprocessing import Semaphore,Process
import time,random

def haircut(sem,name):
    start_time = time.time()
    sem.acquire()  # 加鎖
    print(f"{name}開始理髮")
    time.sleep(random.randint(2,3)) # 模擬理髮時間
    print(f"{name}理髮加等待用時%.2f"%(time.time()-start_time))
    sem.release()  # 解鎖

if __name__ == '__main__':
    sem = Semaphore(3)  # 最大進程數爲3
    user_list = []
    for i in range(8):
        p = Process(target=haircut,args=(sem,f"明星{i}"))
        p.start()
        user_list.append(p)

    for obj in user_list:
        obj.join()
    print("關門")
    
'''輸出
明星0開始理髮
明星1開始理髮
明星2開始理髮
明星0理髮加等待用時3.00
明星3開始理髮
明星1理髮加等待用時3.00
明星4開始理髮
明星2理髮加等待用時3.00
明星5開始理髮
明星3理髮加等待用時4.93
明星6開始理髮
明星4理髮加等待用時4.87
明星7開始理髮
明星5理髮加等待用時5.82
明星7理髮加等待用時6.69
明星6理髮加等待用時7.74
關門
Process finished with exit code 0
'''

十四.死鎖與遞歸鎖

進程死鎖、遞歸鎖與線程死鎖、遞歸鎖同樣, 將統一放在線程一塊兒講, 請參見多線程

十五.事件 (Event)

參見多線程

十五.進程池概念

1.什麼是進程池?

  • 👉進程池是資源進程, 管理進程組成的技術的應用.

2.爲何要有進程池?

😮忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。
😒那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?
😓首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。
😟第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。
😥所以咱們不能無限制的根據任務去開啓或者結束進程。那麼咱們要怎麼作呢?

3.進程池的概念

  • 😺定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務

  • 😸等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務

  • 😹若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。

  • 😻也就是說,進池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行

  • 😼這樣不會增長操做系統的調度難度,還節省了開關進程的時間,也必定程度上可以實現併發效果

4.資源進程

  • 👉預先建立好的空閒進程,管理進程(比如池子🏊)會把工做分發到空閒進程來處理。

5.管理進程🏊

  • 👉管理進程負責建立資源進程,把工做交給空閒資源進程處理,回收已經處理完工做的資源進程。

  • 資源進程與管理進程的交互

😱管理進程如何有效的管理資源進程,分配任務給資源進程?
👉經過IPC,信號,信號量,消息隊列,管道等進行交互。

十六.進程池的使用

咱們能夠經過維護一個進程池來控制進程的數目, 好比使用httpd的進程模式, 能夠規定最大進程數和最小進程數, multiprocessing模塊Pool類能夠提供指定數量的進程供用戶調用

1.建立一個進程池

  • 方法 : Pool([numprocess],[initializer],[initargs])
  • 參數
參數 做用
numprocess 要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer 每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs 傳給initializer的參數組

2.經常使用方法介紹

方法 做用
p.apply(func,args,kwargs) (同步調用)在進程池工做的進程中執行func函數,後面是參數,而後返回結果,若是想要傳入不一樣的參數併發的執行func, 就須要以不一樣的線程去調用p.apply()函數或者使用p.apply_async()
p.apply_async(func,args,kwargs) (異步調用)在進程池工做的進程中執行func函數,後面是參數,而後返回結果, 結果是AsyncResult類的實例, 可使用回調函數callback, 將前面funct返回的結果單作參數傳給回調函數
p.close( ) 關閉進程池,防止進一步操做, 若是全部操做持續掛起,它們將在工做進程終止前完成
P.jion() 等待全部工做進程退出。此方法只能在close()或teminate()以後調用,不然報錯

3.其餘方法

如下方法運用於 pply_async()map_async()的返回值, 返回值是**AsyncResul **實例的對象, 也就是該對象的方法

方法 做用
obj.get( ) 返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起
obj.ready( ) 若是調用完成,返回True
obj.successful( ) 若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]) 等待結果變爲可用, 參數是超時時間
obj.terminate( ) 當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是對象被垃圾回收, 將自動調用這個方法

4.同步調用示例 (apply)

from multiprocessing import Pool
import time,os,random
def test(n):
    print(f"子進程:{os.getpid()}")
    time.sleep(2)
    return n*random.randint(2,9)

if __name__ == '__main__':
    n = os.cpu_count()     # 本機CPU個數,個人是4,進程池容量個數自定義,默認CPU核數
    p = Pool(processes=n)  # 設置進程池進程個數,從無到有,而且之後一直只有這四個進程在執行任務
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply(test,args=(2,))  # 建立十個個任務, 使用同步調用的方式
        li.append(res)
    p.close()  # 先關閉進程池, 不會再有新的進程加入到pool中, 防止進一步的操做(同步調用能夠不加此方法)
    p.join()   # 必須在close調用以後執行, 不然報錯, 執行後等待全部子進程結束(同步調用能夠不加此方法)
    print(li)  # 同步調用, 獲得的就是最終結果,(異步調用獲得的是對象, 須要使用get方法取值)
    print(f'使用時間:{time.time()-start_time}')
    
'''輸出
子進程:7768
子進程:16276
子進程:17544
子進程:15680
子進程:7768
子進程:16276
子進程:17544
子進程:15680
子進程:7768
子進程:16276
[4, 18, 14, 14, 12, 14, 16, 14, 6, 10]
使用時間:20.226498126983643
'''

從上面的輸出結果能夠看到,進程一直是那四個 : 776九、1627六、1754四、15680, 而且異步提交須要等待上一個任務結束拿到結果才能進行下一個任務, 因此用時 20 秒多一點

5.異步調用示例 (apply_async)

from multiprocessing import Pool
import time,os,random

def test(n):
    print(f"子進程:{os.getpid()}")
    time.sleep(2)
    return n*n*random.randint(2,9)

if __name__ == '__main__':
    n = os.cpu_count()     # 本機CPU個數,個人是4,進程池容量個數自定義,默認CPU核數
    p = Pool(processes=n)  # 設置進程池大小, 從無到有, 並以後只有這四個進程執行任務
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply_async(test,args=(2,))  # 開啓十個任務, 使用異步調用的方式
        li.append(res)
    p.close()  # 關閉進程池, 不會再有新的進程加入到pool中, 防止進一步的操做
    p.join()   # join必須在close函數以後進行, 不然報錯, 執行後等待全部子進程結束
    print(li)  # 返回的是AsyncResul的對象[<multiprocessing.pool.ApplyResult object at 0x000002318511B408>,....]
    print([i.get() for i in li])  # 使用get方法來獲取異步調用的值(同步調用沒有該方法),並放入列表中打印
    print(f"使用時間:{time.time()-start_time}")

'''輸出
子進程:8636
子進程:10828
子進程:7432
子進程:13976
子進程:8636
子進程:10828
子進程:7432
子進程:13976
子進程:8636
子進程:10828
[<multiprocessing.pool.ApplyResult object at 0x000001623059B308>,...省略]
[16, 24, 24, 24, 16, 28, 36, 28, 8, 32]
使用時間:6.301024436950684
'''

從上面結果也能看出自始至終都只有四個進程在工做 : 863六、1082八、743二、13976,異步調用方式若是任務進行時遇到阻塞操做將立馬接收其它異步操做中的結果, 若是進程池滿了, 則只能等待任務進行完畢拿到結果, 拿到的結果是 AsyncResul 的對象, 須要使用 get 方法取值, 用時 6 秒多一點

6.服務端使用進程池來控制接入客戶端進程的個數示例

  • 服務端
from socket import *
from multiprocessing import Pool
import os

s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)  # 重用IP和端口
s.bind(("127.0.0.1",8055))
s.listen(5)

def connection(conn):
    print(f"當前進程:{os.getpid()}")
    while 1:
        try:
            date = conn.recv(1024)
            if len(date) == 0:break
            conn.send("阿巴阿巴".encode("utf-8"))
        except Exception:
            break

if __name__ == '__main__':
    p = Pool(2)  # 不指定,默認本機CPU核數
    print("connection....")
    while 1:
        conn,addr = s.accept()
        print(f"已連上{addr}")
        p.apply_async(connection,args=(conn,))
  • 客戶端(咱們開啓四個客戶端來作實驗)
from socket import *

c = socket(AF_INET,SOCK_STREAM)
c.connect(("127.0.0.1",8055))

while 1:
    msg = input("內容>>").strip()
    if len(msg) == 0:continue
    c.send(msg.encode("utf-8"))
    date = c.recv(1024)
    print(f"服務端的回覆:{date.decode('utf-8')}")
  • 測試效果

四個客戶端一個服務端 :

image-20210123173810653

啓動五臺機器, 讓四臺客戶端發送信息

前兩臺能發收消息, 後兩臺阻塞原地

image-20210123174103928

image-20210123174114486

image-20210123174127446

image-20210123174135593

服務端顯示兩個進程啓動成功 : 892八、17584, 剩餘兩個阻塞

image-20210123174240246

咱們將前面兩個客戶端進程關閉, 看看進程號是否變化

關閉前兩個客戶端進程以後, 後兩個客戶端進程立馬啓動起來了, 而且發現PID仍是原來的兩個

image-20210123174611903

十七.回調函數 (callback)

1.什麼是回調函數

將第一個函數的指針(也就是內存地址,Python中淡化了指針的概念)做爲參數傳給另外一個函數處理, 這第一個函數就稱爲回調函數

2.簡單示例

def foo(n):
    print(f"foo輸出{n}")

def Bar(i,func):
    func(i)

for i in range(3):
    Bar(i,foo)
    
'''輸出
foo輸出0
foo輸出1
foo輸出2
'''

3.回調函數應用場景

當進程池中一個任務處理完以後, 它去通知主進程本身結束了, 讓主進程處理本身的結果, 因而主進程去調用另外一個函數去處理該結果, 咱們能夠將耗時間或者阻塞的任務放入進程池, 在主進程中指定回調函數, 並由主進程負責執行, 這樣主進程在執行回調函數的時候就省去了I/O的過程, 直接拿到的就是任務的結果

  • 舉一個簡單易於理解的示例
from multiprocessing import Pool
import os

def get(n):
    print(f"get--->{os.getpid()}")
    return n   # 返回任務執行的結果

def set(num):  # 拿到回調函數的處理結果--->num
    print(f"set--->{os.getpid()} : {num**2}")

if __name__ == '__main__':
    p = Pool(3)
    nums = [2,3,4,1]
    li = []
    for i in nums:
        # 異步調用,並使用callback指定回調函數
        res = p.apply_async(get,args=(i,),callback=set)  
        li.append(res)

    p.close()  # 關閉進程池
    p.join()   # 等待子進程結束
    print([ii.get() for ii in li])  # 使用get方法拿到結果
    
'''輸出
get--->8388
get--->8388
set--->8768 : 4
get--->8388
set--->8768 : 9
get--->8388
set--->8768 : 16
set--->8768 : 1
[2, 3, 4, 1]
'''
  • 獲取網頁源碼大小示例
from multiprocessing import Pool
import requests,os

def get_htm(url):
    print(f"進程:{os.getpid()}開始獲取:{url}網頁")
    response = requests.get(url)
    if response.status_code == 200:   # 若是是200,則獲取成功
        return {'url':url,'text':response.text}
    else:
        return {'url':url,'text':''}  # 有些網頁獲取不到,設置空

def parse_htm(htm_dic):
    print(f'進程:{os.getpid()}正在處理:{htm_dic["url"]}的text')
    parse_data = f"url:{htm_dic['url']} size:{len(htm_dic['text'])}"
    with open("./db.txt","a")as f:    # 將URL和對應網頁源碼大小保存到文件
        f.write(f"{parse_data}\n")

if __name__ == '__main__':
    urls=[
        'https://zhuanlan.zhihu.com',
        'https://www.cnblogs.com',
        'https://www.python.org',
        'https://blog.csdn.net',
        'http://www.china.com.cn',
    ]
    p = Pool(3)  # 設置進程池最大進程數爲3
    li = []
    for url in urls:
        # 異步調用並指定回調函數
        res = p.apply_async(get_htm,args=(url,),callback=parse_htm)  
        li.append(res)

    p.close()  # 關閉進程池
    p.join()   # 等待子進程結束
    print([i.get() for i in li])  # 使用get方法獲取結果
    
'''輸出
進程:11484開始獲取:https://zhuanlan.zhihu.com網頁
進程:17344開始獲取:https://www.cnblogs.com網頁
進程:2688開始獲取:https://www.python.org網頁
進程:11484開始獲取:https://blog.csdn.net網頁
進程:3928正在處理:https://zhuanlan.zhihu.com的text
進程:17344開始獲取:http://www.china.com.cn網頁
進程:3928正在處理:https://www.cnblogs.com的text
進程:3928正在處理:https://blog.csdn.net的text
進程:3928正在處理:http://www.china.com.cn的text
進程:3928正在處理:https://www.python.org的text
[{'url': 'https://zhuanlan.zhihu.com', 'text': ''},...一堆網頁源碼的bytes(省略)]
'''
  • 查看一下保存的 "db.txt" 文件

image-20210123220247697

4.爬取福布斯全球排行榜

image-20210123233054258

from multiprocessing import Pool
import re
import requests

def get_htm(url,format1):
    response = requests.get(url)
    if response.status_code == 200:
        return (response.text,format1)
    else:
        return ('',format1)

def parse_htm(res):
    text,format1 = res
    data_list = re.findall(format1,text)
    for data in data_list:
        with open("福布斯排行.txt","a",encoding="utf-8")as f:
            f.write(f"排名:{data[0]},名字:{data[1]},身價:{data[2]},公司:{data[3]},國家:{data[4]}\n")

if __name__ == '__main__':
    url1 = "https://www.phb123.com/renwu/fuhao/shishi.html"
    # 使用正則匹配關鍵字
    format1 = re.compile(r'<td.*?"xh".*?>(\d+)<.*?title="(.*?)".*?alt.*?<td>(.*?)</td>.*?<td>(.*?)<.*?title="(.*?)"', re.S)
    url_list = [url1]
    for i in range(2, 16):  # 總共15頁排行,將連接都加進列表裏
        url_list.append(f"https://www.phb123.com/renwu/fuhao/shishi_{i}.html")
    p = Pool()
    li = []
    for url in url_list:
        res = p.apply_async(get_htm,args=(url,format1),callback=parse_htm)
        li.append(res)

    p.close()
    p.join()
    print("保存完成")
  • 查看一下文件, 看看本身有沒有上榜

image-20210123232324652

image-20210123232352767

ps : 若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

相關文章
相關標籤/搜索