併發編程——多進程

一、什麼是進程?
      程序:堆代碼
      進程:正在運行的程序
      進程是一個實體,每一個進程都本身的獨立內存空間python

進程的三個狀態:json

3677678505

multiprocessing模塊介紹

 

二、同步和異步:針對的程序運行的狀態
      同步:提交任務後原地等待任務返回結果,期間不作任何事情
      異步:提交任務後,不等待任務返回結果,直接運行下一行代碼
三、阻塞和非阻塞:針對程序運行的狀態
     阻塞:遇到 IO 操做》》》阻塞狀態
     非阻塞:就緒或者運行狀態 >>>>就緒狀態安全

multiprocessing模塊介紹
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
1 group參數未使用,值始終爲None
2
3 target表示調用對象,即子進程要執行的任務
4
5 args表示調用對象的位置參數元組,args=(1,2,'egon',)
6
7 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
8
9 name爲子進程的名稱
方法介紹:
1 p.start():啓動進程,並調用該子進程中的p.run()
 2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
 3
 4 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
 5 p.is_alive():若是p仍然運行,返回True
 6
 7 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

四、開啓進程的兩種方式:
form multiprocessing import Pocess
import time

def task(name)
	print('%s is running '%name)
	time.sleep(3)
	print('%s is over' %name)

if __name__ == '__main__':
	p1=Process(target=task,args=())
	p1.strat()
	print('主程序')
方式1
from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    #必須寫run方法
    def run(self):
        print('%s is running'%self.name)
        time.sleep(3)
        print('%s is over'%self.name)

if __name__ == '__main__':
    obj = MyProcess('egon')
    obj.start()
    print('主')
方式2

五、進程對象的join方法:併發

# join的做用僅僅只是讓主進程等待子進程的結束,不會影響子進程的運行
from multiprocessing import Process
import time


def task(name, n):
    print('%s is running'%name)
    time.sleep(n)
    print('%s is over'%name)


if __name__ == '__main__':
    start_time = time.time()
    p_list = []
    for i in range(3):
        p = Process(target=task,args=('子進程%s'%i,i))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()

    print('主',time.time()-start_time)
join

六、進程之間空間獨立app

from multiprocessing import Process

x = 100


def task():
    global x
    x = 1
    print(x)


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print('主', x)

'''
F:\python36\python3.exe F:/python_s7/week08/day02/進程間數據是隔離的.py
1
主 100
'''
空間獨立

七、進程對象其餘相關方法dom

from multiprocessing import Process,current_process
import time
import os

def task():
    print('%s is running'%os.getpid())
    time.sleep(3)
    print('%s is over'%os.getppid())

if __name__ == '__main__':
    p1 = Process(target=task)
    p1.start()
    # p1.terminate()# 殺死子程序
    time.sleep(3)
    print(p1.is_alive())#判斷子程序
    print('主')
其餘方法

八、守護進程異步

主進程建立守護進程ide

  其一:守護進程會在主進程代碼執行結束後就終止函數

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have childrenui

from multiprocessing import Process
import time

def task(name):
    print('%s 正活着'%name)
    time.sleep(3)
    print('%s 正常死亡'%name)


if __name__ == '__main__':
    p = Process(target=task,args=('egon總管',))
    p.daemon = True  # 必須在p.start開啓進程命令以前聲明
    p.start()
    print('皇帝jason正在死亡')

join

九、互斥鎖:

犧牲了效率可是保證了數據的安全


鎖必定要在主進程中建立,給子進程去用
解決多個進程操做同一份數據,形成數據不安全的狀況
加鎖會將併發變成串行
鎖一般用在對數據操做的部分,並非對進程全程加鎖

mutex.acquire()  # 搶鎖   一把鎖不能同時被多我的使用,沒有搶到的人,就一直等待鎖釋放
buy(i)
mutex.release()  # 釋放鎖

import json
import time
import random
from multiprocessing import Process, Lock


# 查票
def search(i):
    with open('info', 'r', encoding='utf-8')as f:
        data = json.load(f)
    print('用戶查詢餘票%s' % data.get('ticket'))


def buy(i):
    # 購買前想查詢餘票
    with open('info', 'r', encoding='utf-8') as f:
        data = json.load(f)
    # 模擬延遲
    time.sleep(random.randint(1, 3))

    if data.get('ticket') > 0:
        data['ticket'] -= 1
        with open('info', 'w', encoding='utf-8')as f:
            json.dump(data, f)
        print('用戶搶票成功%s' % i)
    else:
        print('用戶%s搶票失敗' % i)


def run(i, mutex):
    search(i)
    mutex.acquire()
    buy(i)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=run, args=(i, mutex))
        p.start()
互斥鎖


10 、進程之間的通信(隊列)
隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
2 q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
3
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6
7 q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
8 q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

例子:

from multiprocessing import Queue

# 產生一個隊列能放5個值
q = Queue(5)

q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
print(q.full())

# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.empty())# 判斷隊列是否爲空,須要注意的是,在併發的狀況下,這個方法判斷不許確!

print(q.get_nowait())# 和get同樣取值,可是當隊列沒有值能夠取的時候回報錯
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
進程之間通信

十一、生產者與消費者模

"""
生產者消費者模型
    生產者:作包子的  生產數據的
    消費者:買包子的  處理數據的

    解決供需不平衡的問題
        定義一個隊列,用來存放固定數量的數據
        解決一個生產者和消費者不需直接打交道,二者都經過隊列實現數據傳輸

    Queue:管道+鎖
"""
from multiprocessing import Queue,Process,JoinableQueue
import time
import random


def producer(name,food,q):
    for i in range(5):
        data = '%s生產了%s%s'%(name,food,i)
        time.sleep(random.randint(1,3))
        print(data)
        q.put(data)  # 將生產的數據放入隊列中


def consumer(name,q):
    while True:
        data = q.get()
        if data is None:break
        time.sleep(random.randint(1, 3))
        print('%s吃了%s'%(name,data))
        q.task_done()  # 告訴你的隊列,你已經將數據取出而且處理完畢


if __name__ == '__main__':
    q = JoinableQueue()  # 生成一個隊列對象
    p1 = Process(target=producer,args=('大廚egon','包子',q))
    p2 = Process(target=producer,args=('靚仔tank','生蠔',q))
    c1 = Process(target=consumer,args=('吃貨owen',q))
    c2 = Process(target=consumer,args=('坑貨kevin',q))
    p1.start()
    p2.start()
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    # 等待生產者生產完全部的數據
    p1.join()
    p2.join()
    # 在生產者生產完數據以後,往隊列裏面放一個提示性的消息,告訴消費者已經沒有數據,你走吧,不要等了
    # q.put(None)
    # q.put(None)
    q.join()  # 等待隊列中數據所有取出
    print('主')
相關文章
相關標籤/搜索