進程 multiprocessing Process join Lock Queue

多道技術

1.空間上的複用linux

多個程序公用一套計算機硬件算法

2.時間上的複用 cpu 切換程序+保存程序狀態數據庫

1.當一個程序遇到IO操做,操做系統會剝奪該程序的cpu執行權限(提升了cpu的利用率,而且不影響程序的執行效率編程

2.當一個程序長時間佔用cpu 操做系統也會剝奪該程序的cpu執行權限(下降了程序的執行效率)json

進程

進程是正在運行的程序windows

進程是操做系統中最基本、重要的概念。是多道程序系統出現後,爲了刻畫系統內部出現的動態狀況,描述系統內部各道程序的活動規律引進的一個概念,全部多道程序設計操做系統都創建在進程的基礎上。
ps: 同一個程序屢次執行,就會在操做系統中出現兩個進程,因此咱們能夠同時運行一個軟件,分別作不一樣的事情也不會混亂

進程調度

時間片轉輪法安全

基本思路是讓每一個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,須要將CPU的處理時間分紅固定大小的時間片,例如,幾十毫秒至幾百毫秒。若是一個進程在被調度選中以後用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放本身所佔有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。

並行與併發

並行: 多個程序同時運行,而且每一個程序有單獨的cpu運算(只能在多核計算機上實現)服務器

併發:多個程序看起來是同時運行,其實是由一個cpu來回切換執行的session

**並行**是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
**併發**是從宏觀上,在一個時間段上能夠看出是同時執行的,好比一個服務器同時處理多個session。

同步異步 / 阻塞非阻塞

在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞多線程

同步/異步

表示的是任務的提交方式

同步:任務提交後 原地等待任務的執行結果並拿到返回結果後才走,期間不會作任何事(程序層面表現的是卡住了)

異步:任務提交後 再也不原地等待,而是繼續執行下一行代碼(提交的任務結果仍是要的,可是是用騏達方式獲取)

import time
​
def func(name):
    print('%s runing'%name)
    time.sleep(3)
​
func('waller')  # 同步 任務提交後原地等待函數運行結束後再走下面的代碼
print('procedure over')

 

 

阻塞/非阻塞

表示的是進程的運行狀態

阻塞: 指進程的阻塞態

非阻塞: 指進程的就緒態 運行態

multiprocessing模塊

multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數)
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。

Process 建立進程的類

語法

from multiprocessing import Process
​
代碼塊
​
if __name__ == __main___:  # 
    p = Process(target=調用對象, args=調用對象的位置參數元組)  # 建立一個進程對象
    代碼塊
    p.start()  # 告訴操做系統幫你建立一個進程
    
​
注意: windows建立進程必定要在 if __name__ == __main___: 代碼塊內建立,不然會報錯

 



windows建立進程會將代碼已模塊的方式,從上往下執行一遍,linux會直接將代碼完完整整的拷貝一份
建立進程就是在內存中從新開闢了一塊內存空間,將運行產生的代碼丟進去,一個進程對應砸死內存就是一塊獨立的內存空間
進程進程之間的數據時隔離的,沒法直接交互,可是能夠經過某些技術實現間接交互

參數介紹

Process源碼:
    
class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        ...
# group參數未使用,值始終爲None
# target表示調用對象,即子進程要執行的任務
# args表示調用對象的位置參數元組,如:args=(1,2,'egon',)
# kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
# name爲子進程的名稱

 

方法介紹

p.start():建立啓動進程,並調用該子進程中的p.run() 
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖   
p.is_alive():若是p仍然運行,返回True    
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程      

 

建立進程的兩種方式

from multiprocessing import Process
import time
​
def func(name):
    print('%s running'%name)
    time.sleep(3)
    print('over')
print('w')
​
if __name__ == '__main__':
    # 建立進程對象 
    p = Process(target=func, args=('waller',)) # target=func 是在p進程內存空間中調用執行了 func 函數
    # 告訴操做系統建立一個進程 開闢p進程的內存空間
    p.start()  # 啓動p進程
    print('主進程')  # 異步
from multiprocessing import Process
import time
​
class MyProcess(Process):
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name
    def run(self):
        print('%s running'%self.name)
        time.sleep(3)
        print('over')
​
if __name__ == '__main__':
    p = MyProcess('waller')
    p.start()  # 自動運行 run 方法
    print('主進程')
​

 

join

主進程等待子進程運行結束後再運行

from multiprocessing import Process
import time
​
def func(name, i):
    print('%s 進程 running'%name)
    time.sleep(i)
    print('%s 進程 over'%name)
​
if __name__ == '__main__':
    p1 = Process(target=func, args=('A',1))
    p2 = Process(target=func, args=('B',2))
    p3 = Process(target=func, args=('C',3))
    start_time = time.time()
    p1.start()
    p2.start()
    p3.start()
    p2.join()
    p1.join()
    p3.join()
    print('主進程')
    print(time.time() - start_time)
    '''
    B 進程 running
    A 進程 running
    C 進程 running
    A 進程 over
    B 進程 over
    C 進程 over
    主進程
    3.1728098392486572
​
    '''

 

進程之間數據時隔離的

from multiprocessing import Process
​
x = 100
def func():
    global x
    x = 200
    print(x)
​
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    print(x)
    >>> 
    100
    200

 

進程對象及其餘方法

os.getpid() 當前進程的pid
os.getppid() 當前進程的父進程pid
p.terminate() 殺死p進程,告訴操做系統幫你殺死一個進程
p.is_alive() 判斷p進程是否存活,返回bool值
def func(name):
    print('%s 進程 running'%name, '%s 進程 %s'%(name, os.getpid()), '父進程%s'%os.getppid())
    time.sleep(3)
    print('%s 進程 over'%name)
​
if __name__ == '__main__':
    p = Process(target=func, args=('w',))
    p.start()
    print('父進程%s'%os.getpid(), '父父進程%s'%os.getppid())
    >>>
    父進程5224 父父進程10332
    w 進程 running w 進程 1912 父進程5224
    w 進程 over
from multiprocessing import Process
import os
import time
​
def func(name):
    print('%s 進程 running'%name, '%s 進程 %s'%(name, os.getpid()), '父進程%s'%os.getppid())
    time.sleep(3)
    print('%s 進程 over'%name)
​
if __name__ == '__main__':
    p = Process(target=func, args=('w',))
    p.start()
    print(p.is_alive())  # True
    p.terminate()  # 告訴操做系統殺死p進程
    time.sleep(1)  # 操做系統的速度沒有代碼運行的速度快,因此須要睡眠
    print(p.is_alive())  # False
    print('父進程%s'%os.getpid(), '父父進程%s'%os.getppid())

 

殭屍進程與孤兒進程

父進程回收子進程資源的兩種方式

1.join方法

2.父進程進程死亡

全部程序都將步入殭屍進程

孤兒進程: 子進程沒死,父進程意外死亡

守護進程

會隨着主進程的結束而結束

p.daemon=True
必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行

 

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time
​
def func(name):
    print('%s running'%name)
    time.sleep(3)
    print('over')
​
​
if __name__ == '__main__':
    p = Process(target=func, args=('waller',))
    p.daemon = True
    p.start()
    time.sleep(1)  # 因爲是守護進程,主進程運行結束,子進程就被清空,因此要等待操做系統建立子進程並運行
    print('主進程')

 

 

互斥鎖

互斥鎖
當多個進程操做同一份數據的時候 會形成數據的錯亂
這個時候必須加鎖處理
將併發變成串行
雖然下降了效率可是提升了數據的安全
注意:
1.鎖不要輕易使用 容易形成死鎖現象
2.只在處理數據的部分加鎖 不要在全局加鎖

鎖必須在主進程中產生 交給子進程去使用  

 

from multiprocessing import Process, Lock
import json
import time
​
# 查票
def search(name):
    with open('data', 'r', encoding='utf-8') as f:
        data = f.read()  # 讀出的是json模式的數據
        t = json.loads(data).get('ticket')
        print('用戶%s查詢餘票爲:%s'%(name, t))
# search('waller')
# 買票
def buy(name):
    with open('data', 'r', encoding='utf-8') as f:
        data_json = f.read()  # 讀出的是json模式的數據
        data = json.loads(data_json)
        t = data.get('ticket')
        # 模擬搶票時間
        time.sleep(2)
        if not t > 0 :
            print('已無票')
            return
        t -= 1
        data['ticket'] = t
        # 跟新數據
        with open('data', 'w', encoding='utf-8') as f:
            json.dump(data, f)  # 將更新後的數據序列化到數據庫中
        print('用戶%s,購票成功'%name)
​
# 在進程中調用 search 與 buy 函數
def run(name, mutex):
    search(name)
    mutex.acquire()  # 搶鎖
    buy(name)  # 被鎖的函數
    mutex.release()  # 釋放鎖
​
​
if __name__ == '__main__':
    # 生成一把鎖
    mutex = Lock()
    # 建立5個進程
    for i in range(5):
        p = Process(target=run, args=('waller',mutex))
        p.start()

 

 

進程間的通訊

Queue 模塊

# 建立隊列對象
q = Queue(5)  # 括號內能夠傳參數,表示是這個隊列的最大存儲,不傳參數默認最大存儲
q.put()  # 往隊列中添加數據  當隊列滿了以後,再放數據,不會報錯,會原地等待,知道隊列中有一個空位置出現
q.full()  # 判斷隊列是否滿了,返回bool值
q.get()  # 從隊列中取值  當隊列中的值取完後再取,不會報錯,程序會阻塞,直到隊列中再放入一個值
q.empty()  # 判斷隊列的值是否取完
q.get_nowait()  # 取值  當隊列中沒有值可取時,不等待直接報錯

 

from multiprocessing import Queue
​
​
# 建立隊列對象
q = Queue(5)  # 括號內能夠傳參數,表示是這個隊列的最大存儲,不傳參數默認最大存儲
# 往隊列中添加數據
q.put(1)
q.put(2)
# 判斷隊列是否滿了,返回bool值
print(q.full())  # >>> False
q.put(3)
q.put(4)
q.put(5)
print(q.full())  # >>> True
q.put(6)  # 當隊列滿了以後,再放數據,不會報錯,會原地等待,知道隊列中有一個空位置出現
# 從隊列中取值
print(q.get())
print(q.get())
# 判斷隊列的值是否取完
print(q.empty())  # >>> False
print(q.get())
print(q.get())
print(q.get())
print(q.get())  # 當隊列中的值取完後再取,不會報錯,程序會阻塞,直到隊列中再放入一個值
print(q.get_nowait())  # 取值 當隊列中沒有值可取時,不等待直接報錯

 

full()
get_nowait()
empty()
都不適合用於多進程的狀況判斷,
能夠把多進程之間的運行看作是異步,當在判斷的那一刻,拿到判斷結果的一瞬間,
隊列可能會又存入或取出一個值,不許確.

 

 

進程間通訊IPC機制

from multiprocessing import Process, Queue
​
# 狀況一: 子進程放數據,父進程取數據
def sub(q):
    q.put('hello')
​
if __name__ == '__main__':
    # 建立隊列對象
    q = Queue()
    p = Process(target=sub, args=(q,))
    p.start()
    print(q.get())  # >>> hello
# def sub(q):
#     print(q.get())  # >>> hello
#
# if __name__ == '__main__':
#     # 建立隊列對象
#     q = Queue()
#     p = Process(target=sub, args=(q,))
#     p.start()
#     q.put('hello')
​
​
# 狀況二: 兩個子進程間存取
def sub1(q):
    q.put('hello')
​
def sub2(q):
    print(q.get())  # >>> hello
if __name__ == '__main__':
    q = Queue()
    s1 = Process(target=sub1, args=(q,))
    s2 = Process(target=sub2, args=(q,))
    s1.start()
    s2.start()

 

生產者消費者模型

 1.生產者消費者模型
​
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
​
 2.爲何要使用生產者和消費者模式
​
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,
那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。 ​ 3.什麼是生產者消費者模式 ​ 生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,
直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。 ​ 基於隊列實現生產者消費者模型

 



生產者消費者模型總結#程序中有兩類角色
        一類負責生產數據(生產者)
        一類負責處理數據(消費者)
        
    #引入生產者消費者模型爲了解決的問題是:
        平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度
        
    #如何實現:
        生產者<-->隊列<——>消費者
    #生產者消費者模型實現類程序的解耦和

 

產生緣由及解決辦法 緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。 解決方式:
  普通方法:無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環
  
JoinableQueue 方法: 生產者生產的每一個數據上都作一個標記,消費者每 q.get() 取一個值,都用 q.task_done() 標記一次,q.join()感知隊列中的數據所有處理完畢,再最終結束
 

 

 
from multiprocessing import Process, Queue
import time
import random
​
# 生產者
def producer(name, food, q):
    for i in range(5):
        data = '%s生產%s%s'%(name, food, i+1)
        time.sleep(random.random())
        q.put(data)
        print(data)
​
# 消費者
def consumer(name, q):
    while True:
        data = q.get()  # 程序停在此處
        if data == None:break
        print('%s吃了%s'%(name, data))
        time.sleep(random.random())
​
​
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=('小明', '包子', q))
    p2 = Process(target=producer, args=('小劉', '饅頭', q))
    c1 = Process(target=consumer, args=('哈哈',q))
    c2 = Process(target=consumer, args=('嘻嘻',q))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    # 此時 生產者已經再也不生產數據,但消費者還在取數據,卡在data = q.get()
    # 解決辦法 在生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環
    p1.join()
    p2.join()  
    q.put(None)
    q.put(None)  # 有兩個消費者
代碼

 

 

調用 JoinableQueue 模塊

#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
#方法介紹:
    JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

 

 
from multiprocessing import Process, JoinableQueue
import time
import random
​
​
# 生產者
def producer(name, food, q):
    for i in range(5):
        data = '%s生產%s%s' % (name, food, i + 1)
        time.sleep(random.random())
        q.put(data)
        print(data)
​
​
# 消費者
def consumer(name, q):
    while True:
        data = q.get()  # 程序停在此處
        if data == None: break
        print('%s吃了%s' % (name, data))
        time.sleep(random.random())
        q.task_done()
​
​
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('小明', '包子', q))
    p2 = Process(target=producer, args=('小劉', '饅頭', q))
    c1 = Process(target=consumer, args=('哈哈', q))
    c2 = Process(target=consumer, args=('嘻嘻', q))
    p1.start()
    p2.start()
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    # 此時 生產者已經再也不生產數據,但消費者還在取數據,卡在data = q.get()
    # 解決辦法 在生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環
    p1.join()
    p2.join()
    q.join()  # 等到隊列中的數據全取出
代碼
相關文章
相關標籤/搜索