python — 進程

目錄css

1. 進程

  • 1.進程就是一個運行中的程序(是對正在運行程序的一個抽象)。python

  • 2.程序和進程之間的區別:linux

    • 程序只是一個文件
    • 進程是這個文件被CPU運行起來了
    • 程序是永久的,進程是暫時的。
  • 3.進程—是計算機中最小的資源分配單位
    在操做系統中的惟一標識符 :pidios

    注意:同一個程序執行兩次,就會在操做系統中出現兩個進程,因此咱們能夠同時運行一個軟件,分別作不一樣的事情也不會混亂。web

  • 4.進程是數據隔離的,進程的數據隔離是由操做系統完成的。redis

2. 進程調度

要想多個進程交替運行,操做系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是須要遵循必定的法則,由此就有了進程的調度算法。算法

進程的調度是由操做系統完成的(咱們不能干預)。數據庫

操做系統調度進程的算法:json

  • 1.短做業優先算法
  • 2.先來先服務算法
  • 3.時間片輪轉
  • 4.多級反饋算法(融合了前三種算法)

3. 進程的並行與併發

  • 並行:windows

    兩個程序、兩個CPU,每一個程序分別佔用一個CPU本身執行本身的

    看起來是同時執行,實際在每個時間點上都在各自執行着

  • 併發:

    兩個程序、一個cpu,每一個程序交替的在一個cpu上執行

    看起來在同時執行,可是實際上仍然是串行

4. 同步異步阻塞非阻塞

  • 1.同步

    同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。

    簡單說:調用一個方法,要等待這個方法結束。

    如:燒水 和 吹頭髮
        正在燒水
        停下燒水這個動做去吹頭髮
        吹完頭髮以後繼續燒水
  • 2.異步

    異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列

    簡單說:調用一個方法,不等待這個方法結束,也不關心這個方法作了什麼。

    如:燒水 和 吹頭髮
        正在燒水
        開始吹頭髮,但燒水也在繼續進行
  • 3.阻塞

    cpu不工做

    阻塞影響了程序運行的效率。

  • 4.非阻塞

    cpu工做

  • 5.同步阻塞

    效率最低

    例1:conn.recv

    ​ socket 阻塞的tcp協議的時候

    例2:你專心排隊,什麼別的事都不作。

  • 6.同步非阻塞

    其實是效率低下的

    例1:func() 沒有io操做

    ​ socket 非阻塞的tcp協議的時候

    ​ 調用函數(這個函數內部不存在io操做)

    例2:你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的。

  • 7.異步非阻塞

    效率更高

    例:把func扔到其餘任務裏去執行了

    ​ 我自己的任務和func任務各自執行各自的,沒有io操做

  • 8.異步阻塞

    異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。

    若是在銀行等待辦理業務的人採用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間裏他不能離開銀行作其它的事情,那麼很顯然,這我的被阻塞在了這個等待的操做上面;

5. 進程的三狀態圖

在瞭解其餘概念以前,咱們首先要了解進程的幾個狀態。在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。

  • 1.就緒(Ready)狀態

    當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。

  • 2.執行/運行(Running)狀態當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。

  • 3.阻塞(Blocked)狀態正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。

6. multiprocessing模塊

multiprocessing—— multi:multiple 多元的 processing 進程

import os
import time
print('start')
time.sleep(20)
print(os.getpid(),os.getppid(),'end')

os.getpid() 獲取當前進程的pid

pid process id 子進程

ppid parent process id 父進程

父進程:在父進程中建立子進程

  • 在pycharm中啓動的全部py程序都是pycharm的子進程
import os
import time
from multiprocessing import Process

def func():
    print('start',os.getpid())
    time.sleep(1)
    print('end',os.getpid())

if __name__ == '__main__':
    p = Process(target=func)
    p.start()   # 異步 調用開啓進程的方法,可是並不等待這個進程真的開啓
    print('main :',os.getpid())
    
    
import os
import time
from multiprocessing import Process

def eat():
    print('start eating',os.getpid())
    time.sleep(1)
    print('end eating',os.getpid())

def sleep():
    print('start sleeping',os.getpid())
    time.sleep(1)
    print('end sleeping',os.getpid())

if __name__ == '__main__':
    p1 = Process(target=eat)    # 建立一個即將要執行eat函數的進程對象
    p1.start()                  # 開啓第一個子進程
    p2 = Process(target=sleep)  # 建立一個即將要執行sleep函數的進程對象
    p2.start()                  # 開啓第二個子進程
    print('main :',os.getpid())

1.小知識補充:

if __name__ == '__main__':

  • 控制當這個py文件被看成腳本直接執行的時候,就執行這裏面的代碼
  • 當這個py文件被看成模塊導入的時候,就不執行這裏面的代碼

__name__有兩種狀況:

  • __name__ == '__main__'

    執行的文件就是__name__所在的文件

  • `name == '文件名'``

    `__name__所在的文件被導入執行的時候

2.操做系統建立進程的方式不一樣:

  • windows操做系統執行開啓進程的代碼

    實際上新的子進程須要經過import父進程的代碼來完成數據的導入工做(再一次執行父進程文件中的代碼來獲取父進程中的數據),因此有一些內容咱們只但願在父進程中完成,就寫在if __name__ == '__main__':下面

  • ios linux操做系統建立進程 fork

    正常的寫就能夠,不須要寫if __name__ == '__main__'

3.主進程和子進程之間的關係:

  • 主進程沒結束 :等待子進程結束
  • 主進程負責回收子進程的資源
  • 若是子進程執行結束,父進程沒有回收資源,那麼這個子進程會變成一個殭屍進程

主進程的結束邏輯:

  • 主進程的代碼結束,等待全部的子進程結束,給子進程回收資源,主進程結束。

主進程怎麼知道子進程結束了的呢? —— 基於網絡、文件

4.join方法

join方法:同步阻塞,直到子進程結束就結束

把一個進程的結束事件封裝成一個join方法

執行join方法的效果:就是 阻塞,直到這個子進程執行結束就結束阻塞。

# 開起一個子進程
import time
from multiprocessing import Process
def send_mail():
    time.sleep(3)
    print('發送了一封郵件')
if __name__ == '__main__':
    p = Process(target=send_mail)
    p.start()   # 異步 非阻塞
    # time.sleep(5)
    print('join start')
    p.join()    # 同步 阻塞,直到p對應的進程結束以後才結束阻塞
    print('5000封郵件已發送完畢')
    
# 開起多個子進程
# 開啓10個進程,給公司的5000我的發郵件,發送完郵件以後,打印一個消息「5000封郵件已發送完畢」
import time
import random
from multiprocessing import Process
def send_mail(a):
    time.sleep(random.random())
    print('發送了一封郵件',a)

if __name__ == '__main__':
    l = []
    for i in range(10):
        p = Process(target=send_mail,args=(i,))
        p.start()
        l.append(p)
    print(l)
    for p in l:p.join()
    # 阻塞 直到上面的十個進程都結束
    print('5000封郵件已發送完畢') # 全部的子進程都結束以後要執行的代碼寫在這裏

7. 守護進程

1.有一個參數能夠把一個子進程設置爲一個守護進程

import time
from multiprocessing import Process

def son1(a,b):
    while True:
        print('is alive')
        time.sleep(0.5)

def son2():
    for i in range(5):
        print('in son2')
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=son1,args=(1,2))
    p.daemon = True
    p.start()      # 把p子進程設置成了一個守護進程
    p2 = Process(target=son2)
    p2.start()
    time.sleep(2)  # son1只執行了2S

守護進程是隨着主進程的代碼結束而結束的

應用場景:

  • 生產者消費者模型的時候
  • 和守護線程作對比的時候

全部的子進程都必須在主進程結束以前結束,由主進程來負責回收資源

p.is_alive() 判斷一個子進程是否還活着

p.terminate() 強制結束一個子進程

import time
from multiprocessing import Process

def son1():
    while True:
        print('is alive')
        time.sleep(0.5)

if __name__ == '__main__':
    p = Process(target=son1)
    p.start()      # 異步 非阻塞
    print(p.is_alive())
    time.sleep(1)
    p.terminate()   # 異步的 非阻塞
    print(p.is_alive())   # 進程還活着 由於操做系統還沒來得及關閉進程
    time.sleep(0.01)
    print(p.is_alive())   # 操做系統已經響應了咱們要關閉進程的需求,再去檢測的時候,獲得的結果是進程已經結束了

什麼是異步非阻塞?—— terminate

2.Process類使用面向對象的方式:

def start(self) ——> None :……開起了一個子進程,調用run。

def run(self) ——> None :……

import os
import time
from multiprocessing import Process

class MyProcecss1(Process):
    def __init__(self,x,y):  # 傳參數,不要忘記寫init方法
        self.x = x
        self.y = y
        super().__init__()
    def run(self):  # 子進程中要作的事情寫在run方法中
        print(self.x,self.y,os.getpid())
        while True:
            print('is alive')
            time.sleep(0.5)

class MyProcecss2(Process):
    def run(self):
        for i in range(5):
            print('in son2')
            time.sleep(1)

if __name__ == '__main__':
    mp = MyProcecss1(1,2)
    mp.daemon = True
    mp.start()
    print(mp.is_alive())
    mp.terminate()
    mp2 = MyProcecss2()
    mp2.start()
    print('main :',os.getpid())
    time.sleep(1)

Process類的總結:

1.開啓進程的方式

  • 1.面向函數:

    def 函數名:要在子進程中執行的代碼
    p = Process(target= 函數名,args=(參數1,))

  • 2.面向對象:

    class 類名(Process):
    def init(self,參數1,參數2): # 若是子進程不須要參數能夠不寫
    self.a = 參數1
    self.b = 參數2
    super().__init__()
    def run(self): # 要在子進程中執行的代碼
    p = 類名(參數1,參數2)

2.Process提供的操做進程的方法

  • p.start() 開啓進程 異步非阻塞
  • p.terminate() 結束進程 異步非阻塞
  • p.join() 同步阻塞
  • p.isalive() 獲取當前進程的狀態
  • daemon = True 設置爲守護進程,守護進程永遠在主進程的代碼結束以後自動結束

8. 鎖

併發 可以作的事兒,如:

[TOC]

  • 1.實現可以響應多個client端的server
  • 2.搶票系統

1.若是在一個併發的場景下,涉及到某部份內容,是須要修改一些全部進程共享的數據資源,須要加鎖來維護數據的安全。

2.在數據安全的基礎上,才考慮效率問題

3.同步存在的意義:數據的安全性

在主進程中實例化 lock = Lock(),把這把鎖傳遞給子進程,在子進程中,對須要加鎖的代碼 進行 with lock:

  • with lock至關於lock.acquire()和lock.release()
import time
import json
from multiprocessing import Process,Lock

def search_ticket(user):
    with open('ticket_count') as f:
        dic = json.load(f)
        print('%s查詢結果  : %s張餘票'%(user,dic['count']))

def buy_ticket(user,lock):
    # with lock:  # 至關於lock.acquire() + lock.release()
    # lock.acquire()   # 給這段代碼加上一把鎖
        time.sleep(0.02)
        with open('ticket_count') as f:
            dic = json.load(f)
        if dic['count'] > 0:
            print('%s買到票了'%(user))
            dic['count'] -= 1
        else:
            print('%s沒買到票' % (user))
        time.sleep(0.02)
        with open('ticket_count','w') as f:
            json.dump(dic,f)
    # lock.release()   # 給這段代碼解鎖

def task(user, lock):
    search_ticket(user)
    with lock:
        buy_ticket(user, lock)

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=task,args=('user%s'%i,lock))
        p.start()

在進程中須要加鎖的場景:

  • 共享的數據資源(文件、數據庫)
  • 對資源進行修改、刪除操做

加鎖以後可以保證數據的安全性 可是也下降了程序的執行效率

9. 進程之間的通訊

進程之間的數據是隔離的

from multiprocessing import Process
n = 100
def func():
    global n
    n -= 1

if __name__ == '__main__':
    p_l = []
    for i in range(10):
        p = Process(target=func)
        p.start()
        p_l.append(p)
    for p in p_l:p.join()
    print(n)
# 主進程數據中的變量的值 不會隨着 子進程數據中變量值的修改而改變

進程之間的通訊 - IPC(inter process communication)

Queue 隊列

  • Queue 天生就是數據安全的

  • Queue 基於文件家族的socket執行的,基於 pickle操做的,基於 lock實現的。

    from multiprocessing import Queue,Process
    # 先進先出
    def func(exp,q):
        ret = eval(exp)
        q.put({ret,2,3})
        q.put(ret*2)
        q.put(ret*4)
    
    if __name__ == '__main__':
        q = Queue()
        Process(target=func,args=('1+2+3',q)).start()
        print(q.get())
        print(q.get())
        print(q.get())

Pipe 管道

  • pipe 管道(不安全的) —— 基於文件家族的socket 、pickle

    from multiprocessing import Pipe
    pip = Pipe()
    pip.send()
    pip.recv()

隊列 = 管道 + 鎖

import queue
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)   # 當隊列爲滿的時候再向隊列中放數據 隊列會阻塞
print('5555555')
try:
    q.put_nowait(6)  # 當隊列爲滿的時候再向隊列中放數據,會報錯而且會丟失數據(不安全,不建議使用)
except queue.Full:
    pass
print('6666666')

print(q.get())
print(q.get())
print(q.get())   # 在隊列爲空的時候會發生阻塞
print(q.get())   # 在隊列爲空的時候會發生阻塞
print(q.get())   # 在隊列爲空的時候會發生阻塞
try:
    print(q.get_nowait())   # 在隊列爲空的時候 直接報錯
except queue.Empty:pass

IPC機制:

  • 內置模塊(基於文件):Queue隊列、Pipe管道
  • 第三方工具(軟件)提供給咱們的IPC機制(基於網絡):redis / memcache / kafka / rabbitmq 發揮的都是消息中間件的功能

第三方IPC的優勢:

  • 併發需求
  • 高可用
  • 斷電保存數據
  • 解耦

隊列:進程之間數據安全

管道:進程之間數據不安全

10. 解耦

解耦 :修改 複用 可讀性

程序的解耦:把寫在一塊兒的大的功能分開成多個小的功能處理(如:登錄 註冊)

生產者消費者模型:

什麼是生產者消費者模型?

  • 把一個產生數據而且處理數據的過程解耦,讓生產數據的過程和處理數據的過程達到一個工做效率上的平衡。

  • 中間的容器,在多進程中咱們使用隊列或者可被join的隊列,作到控制數據的量:

    • 當數據過剩的時候,隊列的大小會控制這生產者的行爲
    • 當數據嚴重不足的時候,隊列會控制消費者的行爲
    • 而且咱們還能夠經過按期檢查隊列中元素的個數來調節生產者消費者的個數
  • 好比說:一個爬蟲或者一個web程序的server端

    • 爬蟲:

      請求網頁的平均時間是0.3s

      處理網頁代碼的時候是0.003s

      100倍,每啓動100個線程生產數據

      啓動一個線程來完成處理數據

    • web程序的server端:

      每秒鐘有6w條請求

      一個服務每s中只能處理2000條

      先寫一個web程序,只負責一件事情,就是接收請求,而後把請求放到隊列中

      再寫不少個server端,從隊列中獲取請求,而後處理,而後返回結果

import time
import random
from multiprocessing import Process,Queue

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.random())
        fd = '%s%s'%(food,i)
        q.put(fd)
        print('%s生產了一個%s'%(name,food))

def consumer(q,name):
    while True:
        food = q.get()
        if not food:break
        time.sleep(random.randint(1,3))
        print('%s吃了%s'%(name,food))

def cp(c_count,p_count):
    q = Queue(10)
    for i in range(c_count):
        Process(target=consumer, args=(q, 'alex')).start()
    p_l = []
    for i in range(p_count):
        p1 = Process(target=producer, args=(q, 'wusir', '燒餅'))
        p1.start()
        p_l.append(p1)
    for p in p_l:p.join()
    for i in range(c_count):
        q.put(None)
if __name__ == '__main__':
    cp(2,3)
# 用生產者消費者模式實現爬蟲的例子
from multiprocessing import Process,Queue
import requests

import re
import json
def producer(q,url):
    response = requests.get(url)
    q.put(response.text)

def consumer(q):
    while True:
        s = q.get()
        if not s:break
        com = re.compile(
            '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>'
            '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)評價</span>', re.S)
        ret = com.finditer(s)
        for i in ret:
            print({
                "id": i.group("id"),
                "title": i.group("title"),
                "rating_num": i.group("rating_num"),
                "comment_num": i.group("comment_num")}
            )

if __name__ == '__main__':
    count = 0
    q = Queue(3)
    p_l = []
    for i in range(10):
        url = 'https://movie.douban.com/top250?start=%s&filter='%count
        count+=25
        p = Process(target=producer,args=(q,url,)).start()
        p_l.append(p)
    p = Process(target=consumer, args=(q,)).start()
    for p in p_l:p.join()
    q.put(None)

11. JoinableQueue類

JoinableQueue 與 Queue使用成本同樣,但JoinableQueue 更嚴緊一些

import time
import random
from  multiprocessing import JoinableQueue,Process

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.random())
        fd = '%s%s'%(food,i)
        q.put(fd)
        print('%s生產了一個%s'%(name,food))
    q.join()

def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.random())
        print('%s吃了%s'%(name,food))
        q.task_done()

if __name__ == '__main__':
    jq = JoinableQueue()
    p =Process(target=producer,args=(jq,'wusir','燒餅'))
    p.start()
    c = Process(target=consumer,args=(jq,'alex'))
    c.daemon = True
    c.start()
    p.join()

12. 進程之間的數據共享-Manager類

multiprocessing中有一個manager類,封裝了全部和進程相關的(包括:數據共享、數據傳遞)、與共享相關的數據類型,可是對於 字典 、列表這一類的數據操做的時候會產生數據不安全,須要加鎖解決問題,而且須要儘可能少的使用這種方式。

```python
from multiprocessing import Manager,Process,Lock
def func(dic,lock):
with lock:
dic['count'] -= 1

if name == 'main': # m = Manager() with Manager() as m: l = Lock() dic = m.dict({'count':100}) p_l = [] for i in range(100): p = Process(target=func,args=(dic,l)) p.start() p_l.append(p) for p in p_l:p.join() print(dic) ``

相關文章
相關標籤/搜索