day20-多併發編程基礎(一)

從新寫一下吧,系統奔潰了,之前寫的徹底沒了,悲催,今日主要寫進程python

1. 進程的理論知識linux

2. python中的進程操做nginx

開始今日份整理,加油,你是最胖的!!!web

 

1. 進程的理論知識redis

1.1 操做系統的背景知識算法

  顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。數據庫

  進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。操做系統的其餘全部內容都是圍繞進程的概念展開的。macos

PS:即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。編程

必備的理論基礎:json

#一 操做系統的做用:
    1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口
    2:管理、調度進程,而且將多個進程對硬件的競爭變得有序

#二 多道技術:
    1.產生背景:針對單核,實現併發
    ps:
    如今的主機通常是多核,那麼每一個核都會利用多道技術
    有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個
    cpu中的任意一個,具體由操做系統調度算法決定。
    
    2.空間上的複用:如內存中同時有多道程序
    3.時間上的複用:複用一個cpu的時間片
       強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣
            才能保證下次切換回來時,能基於上次切走的位置繼續運行

#三 I/O操做
    #i = input 輸入
    從任何其餘地方到--->內存
    讀文件,json.load,從網絡上接收信息
    #o = output 輸出
    內存--->放到任何其餘地方
    寫文件,json.dump,向網絡上發送數據 send sendto

1.2 進程的概念以及特性

  進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。

  • 狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。
  • 廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動。它是操做系統動態執行的基本單元,在傳統的操做系統中,進程既是基本的分配單元,也是基本的執行單元。

進程的概念

第一,進程是一個實體。每個進程都有它本身的地址空間,通常狀況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲着活動過程調用的指令和本地變量。
第二,進程是一個「執行中的程序」。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操做系統執行之),它才能成爲一個活動的實體,咱們稱其爲進程。[3] 
進程是操做系統中最基本、重要的概念。是多道程序系統出現後,爲了刻畫系統內部出現的動態狀況,描述系統內部各道程序的活動規律引進的一個概念,全部多道程序設計操做系統都創建在進程的基礎上。

操做系統引入進程的概念的緣由

從理論角度看,是對正在運行的程序過程的抽象;
從實現角度看,是一種數據結構,目的在於清晰地刻畫動態系統的內在規律,有效管理和調度進入計算機系統主存儲器運行的程序。

進程的特徵

動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。
併發性:任何進程均可以同其餘進程一塊兒併發執行
獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;
異步性:因爲進程間的相互制約,使進程具備執行的間斷性,即進程按各自獨立的、不可預知的速度向前推動
結構特徵:進程由程序、數據和進程控制塊三部分組成。
多個不一樣的進程能夠包含相同的程序:一個程序在不一樣的數據集裏就構成不一樣的進程,能獲得不一樣的結果;可是執行過程當中,程序不能發生改變。

進程與程序的區別

程序是指令和數據的有序集合,其自己沒有任何運行的含義,是一個靜態的概念。
而進程是程序在處理機上的一次執行過程,它是一個動態的概念。
程序能夠做爲一種軟件資料長期存在,而進程是有必定生命期的。
程序是永久的,進程是暫時的。

注意:同一個程序執行兩次,就會在操做系統中出現兩個進程,因此咱們能夠同時運行一個軟件,分別作不一樣的事情也不會混亂。

1.3 進程的調用方法

要想多個進程交替進行,操做系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是須要遵循必定的法則,由此就有了進程的調度算法。主要有

  • 先來先服務調度方法
  • 短做業優先調度方法
  • 時間片輪轉法,現代操做系統經常使用
  • 多級反饋隊列,多種調度方法的結合體

1.4 進程的併發與並行

並行 : 並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )

併發 : 併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。

區別:

並行是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
併發是從宏觀上,在一個時間段上能夠看出是同時執行的,好比一個服務器同時處理多個session。

1.5 同步\異步\阻塞\非阻塞

  1.5.1 進程的狀態

在瞭解其餘概念以前,咱們首先要了解進程的幾個狀態。在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。

  (1)就緒(Ready)狀態

  當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。

  (2)執行/運行(Running)狀態當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。

  (3)阻塞(Blocked)狀態正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。

下面這些就是我最蒙圈的一部份內容

  1.5.2 同步與異步

  • 所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。
  • 所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列

  1.5.3 阻塞與非阻塞

  阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的

  1.5.4 同步/異步與阻塞/非阻塞

  1. 同步阻塞形式

  效率最低。拿上面的例子來講,就是你專心排隊,什麼別的事都不作。

  1. 異步阻塞形式

  若是在銀行等待辦理業務的人採用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間裏他不能離開銀行作其它的事情,那麼很顯然,這我的被阻塞在了這個等待的操做上面;

  異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。

  1. 同步非阻塞形式

  其實是效率低下的。

  想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的。

  1. 異步非阻塞形式

  效率更高,

  由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換

  好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。

  不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來,一樣的,不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞

1.6 進程的建立與結束

  1.6.1 進程的建立

  但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。

  而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程:

  1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)

  2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)

  3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)

  4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)

  不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的。

  1.6.2 進程的結束

       1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出錯退出(自願,python a.py中a.py不存在)

  3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)

  4. 被其餘進程殺死(非自願,如kill -9)

2. python中的進程操做

  2.1 進程建立的倆種方式

進程的實例化

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:
1 group參數未使用,值始終爲None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name爲子進程的名稱

方式一:調用直接模塊實現

import time
from multiprocessing import Process

def funk(name):
    print('%s is running!'%name)
    time.sleep(2)
    print('%s is done!'%name)


if __name__ == '__main__':
    p1 = Process(target= funk,args=('子進程1',))
    p1.start()
    print('')

方式二:自建類,

import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):#這裏必須是RUN函數,不然沒法調用
        print('%s is running!'%self.name)
        time.sleep(2)
        print('%s is done!'%self.name)

if __name__ == '__main__':
    p1 = MyProcess('子進程1')

    p1.start()
    print('主進程!')

  2.2 進程id的查詢

第一種方式

from multiprocessing import Process

def func1(name):
    print('%s is running'%name)
    print('%s is done'%name)

if __name__ =='__main__':
    p1 = Process(target= func1,args=('子進程1',))
    p1.start()
    print(p1.pid)
    print('')

第二種方式

from multiprocessing import Process
import os

def func2(name):
    print('%s is running'%name)
    print('子進程爲%s,父進程爲%s'%(os.getpid(),os.getppid()))

if __name__ == '__main__':
    p2 = Process(target= func2,args=('子進程1',))
    p2.start()
    print('')
    print('主進程爲',os.getpid())

注意:補充內容,暫時沒想到調整放在哪裏

1.幾個概念:
    父進程:父進程執行的過程當中建立了子進程
    子進程:由父進程建立
    主進程:通常咱們直接執行的那個程序就是主進程,在pycharm中主進程的父進程就是pycharm的進程pid

2.windows爲何要有 if __name__ == '__main__':
    windows在開子進程的時候,會從父進程中引入文件,並執行一遍,會變成嵌套
    linux macos中只是複製一分內存空間,並無執行

3.如何 開啓多個子進程

4.如何給子進程傳參

from multiprocessing import Process
import os

def func(name):
    print('%s is start,id is %s,father id is %s'%(name,os.getpid(),os.getppid()))

if __name__ == '__main__':
    for i in range(10):
        p = Process(target= func,args= ('子進程%s'%i,))#這個時候傳參裏面必須是集合
        p.start()#非阻塞,只要告訴系統開啓就好,開啓並非真正的開啓

5.子進程有沒有返回值
不能有返回值
由於子進程函數中的函數返回值不能傳遞給父進程,由於內存隔離的問題

  2.3 孤兒進程與殭屍進程

殭屍進程

  • 有害
  • 父進程對子進程收屍,而子進程一直未關閉,父進程一直在等待

孤兒進程

  • 無害
  • 父進程掛了,子進程未掛,子進程會被__init__接管,一段時候後將子進程關閉

  2.4 進程對象的其餘屬性

p.join():主進程等待子進程結束,這個時候其餘子進程也是能夠運行,是併發操做的
主進程會默認等待子進程結束後才結束,父進程會回收子進程佔用的資源,join是單阻塞,n個進程用join控制
from multiprocessing import Process

def func(n):
print('發送郵件%s'%n)

if __name__ == '__main__':
p_list =[]
for i in range(10):
p = Process(target= func,args=(i,))
p_list.append(p)
p.start()
for p in p_list:
p.join()
print('所有郵件已經發送成功!')

p.pid():查看進程的id號

p.is_alive():查看進程是否活着

p.terminate(): 進程回收,強制結束一個進程,也是非阻塞的

p.name :進程名,在開進程的時候也能夠加name參數,對子進程重命名

  2.5 守護進程

主進程建立進程,有三個特色

  • 守護進程會在主進程代碼結束後終止
  • 守護進程沒法再開啓子進程
  • 守護進程只守護主進程的代碼結束,不守護子進程,若是真要守護子進程,則對子進程join

守護進程須要在進程開啓前設置

#守護進程小測試,設定守護進程對主函數以及子進程都守護!
from multiprocessing import Process
import time

def eye():
    while True:
        print('SERVER,我很好!')
        time.sleep(1)

def func2():
    print('進程2要作的事情!')
    time.sleep(8)
    print('進程2已經結束!')

def main():
    print('作我主要的事情')
    time.sleep(5)
    print('done')

if __name__ == '__main__':
    p = Process(target= eye,)
    p2 = Process(target=func2)
    p.daemon = True
    p.start()
    p2.start()
    main()
    p2.join()

  2.6 進程的同步控制--互斥鎖

將本來共享的內容進行加鎖,保證這個資源一次性只有一個在運行。加鎖後將本來併發的進程變爲串行,父進程創建鎖,子進程調用鎖。

首先看未加鎖的狀況。

from multiprocessing import Process,Lock
import time

def func(name):
    print('1,%s is running'%name)
    time.sleep(1)
    print('2,%s is runing!'%name)
    time.sleep(1)
    print('3,%s is running'%name)


if __name__ =='__main__':
    for i in range(3):
        ret = Process(target= func,args=('%s子進程'%i,))
        ret.start()

加鎖後

from multiprocessing import Process,Lock
import time

def func(name,mutex):
    mutex.acquire()#進程加鎖
    print('1,%s is running'%name)
    time.sleep(1)
    print('2,%s is runing!'%name)
    time.sleep(1)
    print('3,%s is running'%name)
    mutex.release()#進程解鎖


if __name__ =='__main__':
    mutex = Lock()
    for i in range(3):
        ret = Process(target= func,args=('%s子進程'%i,mutex))
        ret.start()

join與互斥鎖的區別

  • join是主進程總體堵塞,
  • 互斥鎖是對須要修改的地方作修改,保證數據的安全

模擬一次搶票的過程

import json
import time
from multiprocessing import Process,Lock

def show_ticket(name):
    with open('test.txt','r')as f1:
        file =json.load(f1)
    if file['number'] >=0:
        print('%s 查詢餘票數量,餘票數量爲%s'%(name,file['number']))
    else:
        print('%s 查詢餘票數量,餘票爲0')

def get_ticket(name):
    with open('test.txt','r')as f1:
        file = json.load(f1)
    time.sleep(0.2)
    if file['number'] >0:
        file['number'] -= 1
        print('%s 購票成功!'%name)
    else:
        print('餘票不足,%s 購票失敗'%name)
    time.sleep(0.2)
    with open('test.txt', 'w')as f2:
        json.dump(file, f2)

def func(name,lock):
    show_ticket(name)
    lock.acquire()
    get_ticket(name)
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    p_list = []
    for i in range(10):
        p = Process(target=func,args= ('購票者%s'%i,lock))
        p.start()
  • 多個進程搶佔同一個數據資源會形成數據不安全
  • 咱們必須犧牲效率來保證數據的安全性

  2.7 進程間的通行--隊列與管道

   2.7.1 隊列就是基於(管道+鎖)實現的,隊列中內存通訊

q =Queue():實例化一個管道

q.full():判斷隊列中是否滿,隊列是先進先進的體現

q.empty():清空隊列

q.put():添加到隊列中

q.get():從隊列中得到一個對象

隊列維護了一個秩序,先進先出(FIFO)

進程之間的通訊

IPC:Iter Process Communication

Pipe:管道,沒有鎖是進程之間數據不安全的機制,而管道+鎖==隊列,

隊列:是一個黑箱模塊,而隊列就是進程之間數據安全機制

第三方工具(消息中間件):memcache、redis、kafka、rabbitm

   2.7.2 隊列的模型---生產者消費者模型

   2.7.2.1 爲何要使用生產者消費者模型

生產者指的是生產數據的任務,消費者指的是處理數據的任務,在併發編程中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

   2.7.2.2 什麼是生產者消費者模型

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這個阻塞隊列就是用來給生產者和消費者解耦的

生產者:是進程

消費者:是進程

生產者和消費者之間,傳遞數據,是須要一個盤子(IPC)

以爬蟲爲例,使用生產者消費者模型是爲了平衡供需之間的關係

TIM圖片20190210095950

以吃包子爲例,看如下代碼

from multiprocessing import Queue,Process
import time

def consumer(name,q):
    while True:
        msg = q.get()#get 會阻塞,直到隊列中有一個數據
        if not msg :break
        time.sleep(0.7)
        print('%s 搶着吃了%s'%(name,msg))


def producer(name,produce,q):
    for i in range(20):
        msg = '%s %s'%(produce,i)
        q.put(msg)#因爲整個隊列大小隻有5,在最大數量時,阻塞
        print('%s 生產了%s'%(name,msg))
        time.sleep(0.2)


if __name__ == '__main__':
    q = Queue(5)
    p1 = Process(target=consumer,args=('飯桶1',q))
    p2 = Process(target=consumer,args=('飯桶2',q))
    p3 = Process(target=producer,args=('生產者','包子',q))
    p3.start()
    p1.start()
    p2.start()
    p3.join()#確認全部的生產者已經所有生產所有數據
    q.put(None)#發送最終標誌位,確認消費者收到明確的標誌
    q.put(None)

在隊列中還有一個JoinaLequeue的使用

這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

一些經常使用方法

q =JoinableQueue()#實例化一個對象

q.join()#生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

q.task_done() #使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

針對這一個能夠能夠對上面的吃包子進行一個多生產者進行改進

#2.0版本,使用JoinableQueue()
from multiprocessing import JoinableQueue,Process
import time

def consumer(name,q):
    while True:
        msg = q.get()#get 會阻塞,直到隊列中有一個數據
        if not msg :break
        time.sleep(0.7)
        print('%s 搶着吃了%s'%(name,msg))
        q.task_done()#消費者告訴生產者他已經吃了一個


def producer(name,produce,q):
    for i in range(20):
        msg = '%s %s'%(produce,i)
        q.put(msg)#因爲整個隊列大小隻有5,在最大數量時,阻塞
        print('%s 生產了%s'%(name,msg))
        time.sleep(0.2)
    q.join()#確保生產者生產的東西所有被吃完


if __name__ == '__main__':
    q = JoinableQueue(5)
    c1 = Process(target=consumer,args=('飯桶1',q))
    c1.daemon = True#生產者生產完以後,消費者也沒有存在的必要
    c2 = Process(target=consumer,args=('飯桶2',q))
    c2.daemon = True#生產者生產完以後,消費者也沒有存在的必要
    p1 = Process(target=producer,args=('生產者','包子',q))
    p2 = Process(target=producer,args=('生產者','叉燒包',q))
    p_list =[c1,c2,p1,p2]
    for p in p_list:
        p.start()
    p1.join()#確認全部的生產者已經所有生產所有數據
    p2.join()#確認全部的生產者已經所有生產所有數據

  2.8 進程間的數據共享(瞭解)

進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。

之後咱們會嘗試使用數據庫來解決如今進程之間的數據共享問題。

進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此

下面代碼示例圖

TIM圖片20190210113833

#目的:開啓100個進程,分別對共享數據減一操做
#方式一(失敗):
from multiprocessing import Manager,Process

def work(dict):
    dict['count']-=1

if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'count':100})
    p_list =[]
    for i in range(100):
        p = Process(target=work,args=(dic,))
        p_list.append(p)
        p.start()
    for p in p_list:
        p.join()

    print(dic['count'])
#結果會發現數據不是咱們預期的0,pc的內核越多,結果會越大


#方式二
from multiprocessing import Manager,Process,Lock
#Manager會提供不少數據共享的機制,可是對於一些基礎數據類型,他是數據不安全的
#須要咱們本身提供加鎖操做

def work(dict,lock):
    # lock.acquire()#加鎖操做
    # dict['count']-=1
    # lock.release()#解鎖操做
    with lock:#也能夠用這個,上下文管理機制
        dict['count']-=1#上文就是lock.acquire(),下文就是lock.release()

if __name__ == '__main__':
    lock = Lock()
    m = Manager()
    dic = m.dict({'count':100})
    p_list =[]
    for i in range(100):
        p = Process(target=work,args=(dic,lock))
        p_list.append(p)
        p.start()
    for p in p_list:
        p.join()

    print(dic['count'])

雖然說有manager這個管理機制,可是咱們平常中仍是不會使用這個,通常是使用第三方工具庫,來操做數據共享,畢竟不加鎖的數據共享是數據不安全的。

最後的結論就是平常工做中就不要使用manager這個機制,純粹就是給本身找麻煩。

注意:在多進程中或者是多線程中,數據共享,若是沒有加鎖,+=,-=,*=,/=都是數據不安全的,必須加鎖!

  2.9 進程池的概念以及使用

   2.9.1 進程池

進程池的概念:

  在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?

  在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。

multiprocess.Pool模塊

#1.實例化進程池的方法
Pool([numprocess  [,initializer [, initargs]]]):建立進程池

#2.參數介紹
1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
3 initargs:是要傳給initializer的參數組

#3.主要方法
1 p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
2 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
3 
4 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
5 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
6    
7 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
8 
9 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

注意:開啓進程池的時候,若是在開啓時沒有填寫數字,默認都是cpu的核心數量!

比較經常使用的就是p.map(funcation,iterable)、p.apply_async(funcation,args=()),不一樣的是p.apply_async須要用close和join來保證進程池的結束!

實際開啓一個進程池,而且看一下每一個進程的id

from multiprocessing import Pool
import os
import time

def func(i):
    print(i,os.getpid())
    time.sleep(0.2)

if __name__ == '__main__':
    p =Pool(4)#設定池子最大進程數,通常爲cpu+1/cpu數量
    for i in range(20):
        p.apply_async(func,args=(i,))#異步的提交任務
    p.close()#關閉池子,不是阻止進程池中已經有的進程,而是阻止再向池中異步的提交任務
    p.join()#阻塞池子

   2.9.2 進程池以及多進程的性能測試

  以上面的例子在加上開啓多個進程咱們能夠明顯看出多進程與進程池之間有明顯的效率

#進程池以及多進程的效率測試
from multiprocessing import Pool,Process
import os
import time

def func(i):
    print(i,os.getpid())


if __name__ == '__main__':
    start_time = time.time()
    p_list =[]
    for i in range(100):
        p = Process(target= func,args=(i,))
        p_list .append(p)
        p.start()
    for p in p_list:
        p.join()
    end_time = time.time()
    process_time = end_time-start_time

    start_time = time.time()
    p =Pool(4)#設定池子最大進程數,通常爲cpu+1/cpu數量
    for i in range(20):
        p.apply_async(func,args=(i,))#異步的提交任務
    p.close()#關閉池子,不是阻止進程池中已經有的進程,而是阻止再向池中異步的提交任務
    p.join()#阻塞池子
    end_time = time.time()
    pool_time = end_time-start_time

    print(process_time,pool_time)

#結果

7.379263877868652 0.4368298053741455,咱們能夠發現效率是有明顯的差異

若是咱們有多個任務,就開啓多少個進程,實際上對於咱們來講,是很是不划算的,因爲計算機的cpu數量時有限的,因此咱們起的進程數量是徹底和cpu的個數成比例的。

起多進程的意義

  • 爲了更好的利用cpu,因此若是咱們的程序中都是網絡i/o以及文件i/o,就不適合起多進程
  • 爲了數據隔離,若是咱們的程序中老是要用到數據共享,那麼就不適用使用多進程
  • 超過了cpu個數的任務數,都應該用進程池來解決問題,不能無限的開啓子進程

    2.9.3 進程池的其餘機制

對於開啓進程池還有其餘方法,方法以下

#直接用map函數,倆步搞定,哈哈哈
from multiprocessing import Pool
import os
import time

def func(i):
    print(i,os.getpid())
    time.sleep(0.2)

if __name__ == '__main__':
    p =Pool(4)#設定池子最大進程數,通常爲cpu+1/cpu數量
    p.map(func,range(20))
    # for i in range(20):
    #     p.apply_async(func,args=(i,))#異步的提交任務
    # p.close()#關閉池子,不是阻止進程池中已經有的進程,而是阻止再向池中異步的提交任務
    # p.join()#阻塞池子

2.9.4 進程池的返回值

2.9.4.1 普通函數實現進程的返回值

#普通函數方法
from multiprocessing import Pool

def func(i):
    print('爬取的網頁內容%s'%i)
    return len('網頁大小!')*i

if __name__ == '__main__':
    p = Pool(5)
    ret_l =[]
    for i in range(10):
        ret =p.apply_async(func,args =(i,))
        ret_l.append(ret)
    for ret in ret_l:
        print(ret.get())
    p.close()
    p.join()

2.9.4.2 map函數實現進程的返回值

#map函數方法
from multiprocessing import Pool

def func(i):
    print('爬取網頁的大小%s'%i)
    return len('網頁大小!')*i

if __name__ == '__main__':
    p = Pool(5)
    ret =p.map(func,range(10))
    for i in ret:
        print(i)

2.9.4.3 回調函數

模擬一個爬蟲爬網頁的狀況,

正常獲取網頁,一個一個獲取後進行統計

#正常版
from multiprocessing import Pool
import time
import random


def func(i):
    print('爬取的網頁的內容%s'%i)
    time.sleep(random.random())
    return len('網頁大小!')*i

if __name__ == '__main__':
    p = Pool(5)
    ret_l = []
    for i in range(10):
        ret =p.apply_async(func,args =(i,))
        ret_l.append(ret)
    for ret in ret_l:
        print(ret.get())
    p.close()
    p.join()

第二個版本

#採用回調函數的版本
from multiprocessing import Pool
import time
import random


def func(i):#子進程調用
    time.sleep(random.random())
    print('爬取的網頁的內容%s'%i)
    return len('網頁大小!')*i

def call_back(contact):#主進程調用
    print(contact)

if __name__ == '__main__':
    p = Pool(5)
    for i in range(20):
        ret =p.apply_async(func,args =(i,),callback=call_back)
    p.close()
    p.join()
#結果
爬取的網頁的內容2
10
爬取的網頁的內容3
15
爬取的網頁的內容1
5
爬取的網頁的內容0
0
爬取的網頁的內容6
30
爬取的網頁的內容4
20
爬取的網頁的內容5
25
爬取的網頁的內容7
35
爬取的網頁的內容9
45
爬取的網頁的內容8
40

將n個任務交給n個進程去執行

每個進程在執行完畢後會返回一個值,這個返回值會直接交給callback參數指定的那個函數去進行處理,這樣的話,全部的進程,哪個執行的最快,哪個就能夠最早進行統計或者其餘工做,能夠在最短期內獲取到結果!

相關文章
相關標籤/搜索