多任務處理方式之一:多進程

進程的理解:

  • 一、系統進行資源分配調度的基本單位,一個具備必定獨立功能程序關於某個數據集合的一次運行活動python

  • 二、它是一個動態的概念,一個活動的實體算法

    • 狹義定義:an instance of a computer program that is being executed正在運行的程序的實例化對象
    • 廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動,是操做系統進行資源分配和調度的基本單位,是操做系統動態執行的基本單元
  • 注:其概念的關鍵點在於安全

1)、進程是一個實體(動態的),具備本身獨立的地址空間,包括:
 	文本區域(text region):存儲處理器執行的代碼;
 	數據區域(data region):存儲變量與進程執行期間使用的動態分配的內存;
 	堆棧(stack region):存儲的是程序執行過程當中調用的指令與本地變量;
 	
注:正是因爲每一個進程是一個獨立的實體,其中以上所述的三個區域,即每一個進程的數據區域以及堆棧是獨立的,相互隔離的,因此在多進程中能夠保證數據的安全性
2)、編寫完的代碼,沒有運行時,稱爲程序,
    正在運行的代碼,稱爲進程
    程序是死的(靜態的),進程是活的(動態的)
  • 三、進程的三大狀態bash

    • (1) 就緒(Ready)狀態
      進程建立完成即其餘全部資源都已分配完畢,等待cpu調度執行時,稱爲就緒狀態。
    • (2) 執行(Running)狀態
      cpu開始執行該進程時稱爲執行狀態。
    • (3) 阻塞(Blocked)狀態
      因爲等待某個事件發生而沒法執行時,即是阻塞狀態,cpu執行其餘進程.例如,等待I/O完成input、申請緩衝區不能知足等等。

    如圖所示
    服務器


CPU 調度進程的方式

  • 先來先服務fcfs(first come first server):先來的先執行
  • 短做業優先算法:分配的cpu多,先把短的算完
  • 時間片輪轉算法:每個任務就執行一個時間片的時間.而後就執行其餘的
  • 多級反饋隊列算法
    • 越是時間長的,cpu分配的資源越少,優先級靠後
    • 越是時間短的,cpu分配的資源越多

建立進程

導入multiprocessing模塊中的Process類以供後續建立類的時候直接調用併發

p = Process(target = func, name = process01, args=(5,)) 實例化進程對象app

Process 類參數介紹
  • target = func 表示調用對象,即子進程要執行的任務 func
  • args 表示任務 func 的位置參數元組,args=(5, )
  • name = process01 爲子進程的名稱
Process 類常⽤⽅法
  • p.start( ): 啓動進程,並調用該子進程中的p.run( )
  • p.run( ): 進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要寫入該方法
  • p.terminate( ): 強制終止進程p,不會進行任何清理操做
  • p.is_alive( ): 若是p仍然運行,返回True。用來判斷進程是否還在運行
  • p.join([timeout]): 主進程等待子進程p終止,timeout是可選的等待時間
# 主進程速度快於子進程,join方法可使得子進程執行結束後,再繼續執行主進程中的代碼,能夠用來同步代碼的一致性

import multiprocessing


def func():
    print("發送第一份郵件")
    
    
if __name__ == "__main__":
    p = multiprocessing.Process(target=func)
    p.start()
    p.join()
    
    print("發送第二份郵件")
    
# 發送第一份郵件
# 發送第二份郵件
# 多個子進程配合 join 方法實現異步併發

import multiprocessing


def func(index):
    print(f"發送第{index}封郵件")
   

if __name__ == "__main__":
    process_list = []
    for i in range(10):
        p = multiprocessing.Process(target=func, args=(i, ))
        p.start()
        process_list.append(p)
        # p.join() 程序會變成同步阻塞
	
    for i in process_list:
        i.join()  # 異步併發
        
    print("主進程發最後一封郵件!")
Process類常⽤屬性
  • name: 當前進程實例別名, 默認爲Process-N, N爲從1開始遞增的整
  • pid: 當前進程實例的ID值
建立進程的兩種方法
# 建立進程的方法一:
# 利用multiprocessing模塊提供一個Process類來建立一個進程對象

from multiprocessing import Process
import time


def func(n):
	while n > 0:
		print(n)
		time.sleep(3)
		n -= 1
if __name__ == "__main__":
	p = Process(target = func, args=(5,))
	p.start()
	p.join()
# 建立進程的方法二:
# 建立新的進程能夠自定義一個類去繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象

import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
	def run(self):
		n = 5
		while n > 0:
			print(n)
			time.sleep(3)
			n -= 1
if __name__ == "__main__":
	p = ClockProcess()
	p.start()
	p.join()

守護進程

  • 守護 主進程 時,若是主進程執行結束了,意味着守護進程的壽命馬上終止.馬上殺死
  • 語法:
    • 進程對象.daemon = True 設置當前進程爲守護進程
  • 必須寫在start( )調用進程以前進行設置
  • 默認狀況下,主進程會等待全部子進程執行完畢以後,關閉程序,釋放資源。若不等待,子進程並不方便管理,容易形成殭屍進程,在後臺不停的佔用系統的資源(cpu和內存),不清楚進程的來源。
  • 守護主進程即在主進程代碼執行結束以後,無需等待子進程執行,當即殺死程序
import multiprocessing


def func():
    print("start 當前子進程")
    print("end   當前子進程")

    
if __name__ == "__main__":
    p = multiprocessing.Process(target=func)
    p.daemon = True
    p.start()
    print("主進程執行結束 ... ")
    
# 主進程執行結束 ...

多個子進程下,未守護主進程,主進程仍會等待子進程執行結束dom

  • 守護進程的實際用途:監控報活
import time


# 監控報活
def alive():
    while True:
        print("給監控服務器發消息, 當前5號服務器功能正常 i am ok ~")
        time.sleep(1)


# 當前服務器正常完成的功能
def func():
    time.sleep(5)
    print("當前5號服務器功能,統計財務報表~")


if __name__ == "__main__":
    p1 = Process(target=func)
    p2 = Process(target=alive)
    
    # 守護p2進程
    p2.daemon = True

    p1.start()
    p2.start()

    # 等待p1子進程執行結束以後,下面的主程序的代碼纔會放行;
    p1.join()
	
	# 未守護主進程,主進程會默認等待
    print("當前服務器狀態:統計財務報表功能異常.....")
    
    
# 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~
# 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~
# 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~
# 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~
# 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~
# 當前5號服務器功能,統計財務報表~
# 給監控服務器發消息, 當前5號服務器功能正常 i am ok ~
# 當前服務器狀態:統計財務報表功能異常.....

多任務處理方式一:多進程

建立多進程的兩種方式:
# 手動建立

from multiprocessing import Process
 
num = 1


def run1():
    global num
    num += 5
    print("子進程1運行中,num = %d" % (num))


def run2():
    global num
    num += 10
    print("子進程2運行中,num = %d" % (num))


if __name__ == "__main__":
    print("父進程啓動")
    p1 = Process(target=run1)
    p2 = Process(target=run2)
    print("子進程將要執行")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("子進程結束")
# 藉助舊版進程池建立多進程

from multiprocessing import Pool
import random
import time


def work(num):
    print(random.random() * num)
    time.sleep(3)


if __name__ == "__main__":
    # 實例化進程池對象,設置同一時間內最多能夠執行的進程數爲3個
    # 題中的10個任務都由進程池中的這三個進程輪詢執行,不會建立額外		的進程數
    # 若不指定則同一時間內能夠執行的進程個數默認爲cpu邏輯核心數
    p = Pool(3)
    for i in range(10):
        
        # apply_async 選擇要調用的任務,每次循環出來的任務會用閒下來的子進程去執行
        # 使⽤⾮阻塞⽅式調⽤func(並⾏執⾏,阻塞⽅式必須爲等待上⼀個進程退出後才能執⾏下⼀個進程), args爲傳遞給func的參數列表,kwargs爲傳遞給func的關鍵字參數列表;

        p.apply_async(work, (i,))

    # 進程池關閉以後不會再接受新的請求
    p.close()
    # 等待進程池中的全部子進程都結束
    p.join()

# 多進程中,主進程通常用來等待子進程執行完畢,真正的任務都由子進程中執行
# 藉助新版進程池建立多進程

from concurrent.futures import ProcessPoolExecutor
import os
import time


def func(i):
    print("任務執行中... start", os.getpid())
    time.sleep(10)
    print("任務結束... end", i)
    return i


# ProcessPoolExecutor 進程池基本使用

"""
默認若是一個進程短期內能夠完成更多的任務,就不會建立額外的新的進程,以節省資源
"""
if __name__ == "__main__":
    lst = []
    print(os.cpu_count())  # cpu邏輯核心數
    # 建立進程池對象
    """進程池中默認最多建立cpu這麼多個進程,全部任務全由這幾個進程完成,不會額外建立進程"""
    p = ProcessPoolExecutor()

    # 異步提交任務
    for i in range(10):
        res = p.submit(func, i)
        lst.append(res)

    # 獲取當前進程池返回值
    for i in lst:
        print(i.result())

    # 等待全部子進程執行結束
    p.shutdown()  # join

    print("主程序執行結束....")

進程間通訊

進程間數據不共享,他們之間進行數據傳遞即爲通訊異步

from multiprocessing import Queueasync

藉助進程隊列Queue完成進程間的通訊

Queue 的基本使用
  • 消息隊列遵循 先進先出 的原則
初始化
  • 初始化Queue()對象時(q=Queue()),若括號中沒有指定最⼤可接收
    的消息數量
    , 或數量爲負值, 那麼就表明可接受的消息數量沒有上限
入隊操做(存數據)
  • q = Queue()
  • q.put(item, [block[, timeout]])將item消息寫⼊隊列
  • block 默認值爲True
    • 若是block 使⽤默認值,且沒有設置timeout(單位秒)時,若消息列隊已經沒有空間可寫⼊,此時程序將被阻塞(停在寫⼊狀態) ,直到從消息列隊騰出空間爲⽌,若是設置了True和timeout,則會等待timeout秒,若還沒空間,則拋 出"q.Full"的異常信息
  • 若是block值爲False, 消息列隊若是出現沒有空間可寫⼊的狀況, 則會⽴刻拋出"q.Full"滿了異常
  • q.put_nowait(item): 至關q.put(item, False)
出隊操做(取數據)
  • q.get([block[, timeout]]):獲取隊列中的⼀條消息, 而後將其從列隊中移除

  • block默認值爲True

  • 若是block使⽤默認值,且沒有設置timeout(單位秒),消息列隊若是爲空, 此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息爲⽌,

  • 若是設置了timeout, 則會等待timeout秒, 若還沒讀取到任何消息, 則拋
    出"q.Empty"異常

  • 若是block值爲False,消息列隊若是爲空,則會⽴刻拋出「q.Empty」空的異常

  • q.get_nowait():至關q.get(False)

其餘操做
  • q = Queue()
  • q.qsize(): 返回當前隊列包含的消息數量
  • q.empty(): 若是隊列爲空, 返回True, 反之False
  • q.full(): 若是隊列滿了, 返回True,反之False
python代碼實現
from multiprocessing import Queue, Process
import time


def write(q):
    for value in ["a", "b", "c"]:
        print("開始寫入:", value)
        q.put(value)
        time.sleep(2)


def read(q):
    while True:
        if not q.empty():
            print("讀取到的是", q.get())
            time.sleep(2)
        else:
            break


if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pw.join()  #等待接收完畢

    pr.start()
    pr.join()
    print("接受完畢!")
# 三個進程間通訊

from multiprocessing import Process
from multiprocessing import Queue


def func1(q1):
    q1.put("你好!")
    print(f"子進程p1往隊列q1中放入的數據爲:你好!")


def func2(q1, q2):
    msg = q1.get()
    print(f"子進程p2從隊列q1中取出的數據爲:{msg}")
    q2.put(msg)
    print(f"子進程p2往隊列q2中放入的數據爲:{msg}")


def func3(q2):
    msg = q2.get()
    print(f"子進程p3從隊列q2中取出的數據爲:{msg}")


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()

    p1 = Process(target=func1, args=(q1,))
    p2 = Process(target=func2, args=(q1, q2))
    p3 = Process(target=func3, args=(q2,))

    p1.start()
    p2.start()
    p3.start()
JoinableQueue 的用法
# put 存儲
# get 獲取
# task_done 隊列計數減1
# join 阻塞

# task_done 配合 join 一塊兒使用
# [1,2,3,4,5]
# 隊列計數5 
# put 一次 每存放一個值,隊列計數器加1
# get 一次 經過task_done讓隊列計數器減1
# join 函數,會根據隊列中的計數器來斷定是阻塞仍是放行
# 若是計數器變量是0,意味着放行,其餘狀況阻塞;


from multiprocessing import Process,JoinableQueue


jq = JoinableQueue()

# put 會讓隊列計數器加1
jq.put("a")
print(jq.get())

# 經過task_done,讓隊列計數器減1
jq.task_done()

# 只有隊列計數器是0的時,纔會放行
jq.join() # 隊列.join
print("finish")
生產者——消費者模型
Queue下的生產者——消費者模型:
# 消費者模型
def consumer(q, name):
	while True:
		food = q.get()
		if food is None:
			break
		time.sleep(random.uniform(0.1, 1))
		print("%s 吃了一個%s" % (name, food))


# 生產者模型
def producer(q, name, food):
	for i in range(5):
		time.sleep(random.uniform(0.1, 1))
		print("%s 生產了 %s%s" % (name, food, i))
		q.put(food + str(i))


if __name__ == "__main__":
	q = Queue()
	# 消費者1
	p1 = Process(target=consumer, args=(q, "張三"))
	p1.start()
	# 消費者2
	a2 = Process(target=consumer, args=(q, "李四"))
	a2.start()

	# 生產者1
	p2 = Process(target=producer, args=(q, "王五", "黃金"))
	p2.start()

	# 生產者2
	b2 = Process(target=producer, args=(q, "小明", "鑽石"))
	b2.start()

	# 在生產完全部的數據以後,在隊列的末尾塞入一個None
	p2.join()
	b2.join()
	# 消費者模型若是獲取的是None,表明中止消費
	q.put(None)
	q.put(None)
JoinableQueue 下的生產者——消費者模型:
from multiprocessing import Process,JoinableQueue


# 消費者模型
def consumer(q, name):
    while True:
        food = q.get()
        time.sleep(random.uniform(0.1, 1))
        print("%s 吃了一個%s" % (name, food))
        q.task_done()
        

# 生產者模型
def producer(q, name, food):
    for i in range(5):
        time.sleep(random.uniform(0.1, 1))
        print("%s 生產了 %s%s" % (name, food, i))
        q.put(food + str(i))


if __name__ == "__main__":
    q = JoinableQueue()
    # 消費者1
    p1 = Process(target=consumer, args=(q, "張三"))
    p1.daemon = True
    p1.start()

    # 生產者1
    p2 = Process(target=producer, args=(q, "李四", "黃金"))
    p2.start()

    # 把生產者全部的數據都裝載到隊列中
    p2.join()

    # 當隊列計數器減到0的時候,會馬上放行
    # 必須等待消費者模型中全部的數據都task_done以後,變成0了就表明消費結束.
    q.join()

    print("程序結束....")
進程池中的進程之間的通訊
from multiprocessing import Manager, Pool
import time


def write(q):
    for i in "welcome":
        print("開始寫入", i)
        q.put(i)


def read(q):
    time.sleep(2)
    for i in range(q.qsize()):  # q.qsize()獲取到當前隊列的消息數量!
        print("獲得消息", q.get())


if __name__ == "__main__":
    print("主進程啓動!")
    q = Manager().Queue()
    po = Pool()
    po.apply_async(write, (q,))
    po.apply_async(read, (q,))
    po.close()
    po.join()
相關文章
相關標籤/搜索