進程與multiprocessing模塊

一 進程 html

  進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。它是操做系統動態執行的基本單元,在傳統的操做系統中,進程既是基本的分配單元,也是基本的執行單元。  ————百度百科python

  PS   os模塊的getpid方法就是獲取當前進程的進程號(id)。json

  多道技術產生的背景:針對單核,實現併發。windows

  多路複用分爲時間上的複用和空間上的複用。緩存

  空間上的複用:將內存分爲幾個部分,互不干擾。安全

  時間上的複用:網絡

         1 遇到I/O阻塞時切換任務。併發

         2 任務執行固定時間後主動切換。app

 

壹:Process類,建立子進程dom

一 建立子進程

  建立子進程的方法一

import multiprocessing
import time import os def foo(): time.sleep(1) print('子進程 %s 父進程 %s' %(os.getpid(),os.getppid())) if __name__ == '__main__': #在windows下必須加上這一句代碼 p1=multiprocessing.Process(target=foo) p2=multiprocessing.Process(target=foo) p1.start() p2.start() p1.join() #主進程等待子進程完成,在執行主進程 p2.join() print('主進程 %s 主進程的父進程是 %s,這是pycharm的進程'%(os.getpid(),os.getppid()))

  輸出:

子進程 1260 父進程 12808
子進程 2256 父進程 12808 主進程 12808 主進程的父進程是 8804,這是pycharm的進程

  建立子進程的方法二

import multiprocessing
import os class Pro(multiprocessing.Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('進行姓名',self.name) print('子進程 %s 父進程 %s'%(os.getpid(),os.getppid())) if __name__ == '__main__': p=Pro('egon') p.start() print('主進程 %s' %os.getpid())

  輸出:

主進程 12632
進行姓名 egon
子進程 10300  父進程 12632

  

  建立子進程方法二的應用

import multiprocessing
import socket
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
class Myprocess(multiprocessing.Process):
    def __init__(self,conn):
        super().__init__()
        self.conn = conn
    def run(self):
        while True:
            try:
                while True:
                    date=self.conn.recv(1024)
                    if date==b'q':
                        break
                    self.conn.send(date.upper())
            except  Exception:
                break
if __name__ == '__main__':
    while True:
        conn,addr=server.accept()
        print('conn',conn,'addr',addr)
        s=Myprocess(conn)
        s.start()
server.close()

  在服務端,用multiprocessing模塊開啓多個子進程時,格式是這樣的:

  重要!

import multiprocessing
import socket
class Myprocess(multiprocessing.Process):
    def __init__(self,conn):
        self.conn=conn
    def run(self):
        pass           #核心代碼
if __name__ == '__main__':
    server=socket.socket()            #
    server.bind(('127.0.0.1',8080))   #
    server.listen()                   #這三行代碼固定不動
    while True:                      
        conn,addr=server.accept()     #其實一個子進程就是一個conn
        p=Myprocess(conn)             #服務端建立多個子進程本應該就是把conn當作參數傳給Myprocess
        p.start()                     #生成p對象,p.start()子進程開啓,conn有了一個屬於本身的子進程。

 

 

 

二 Process類的其餘方法

  1  join方法

  官方文檔的意思是:阻塞當前進程,直到調用join方法的那個進程執行完,再繼續執行當前進程。

import multiprocessing
class Myprocess(multiprocessing.Process):
    def __init__(self,x):
        super().__init__()
        self.x=x
    def run(self):
        print('子進程','----')
if __name__ == '__main__':
    p1=Myprocess(1)
    p1.start()
    print('主進程','====')

  輸出:

主進程 ====
子進程 ----

  在加入p1.join()代碼以後,p1子進程會先執行完,在執行主進程。

import multiprocessing
class Myprocess(multiprocessing.Process):
    def __init__(self,x):
        super().__init__()
        self.x=x
    def run(self):
        print('子進程','----')
if __name__ == '__main__':
    p1=Myprocess(1)
    p1.start()
    p1.join()
    print('主進程','====')

  輸出:

子進程 ----
主進程 ====

  2 daemon() 守護進程

  守護進程(daemon)是一類在後臺運行的特殊進程,用於執行特定的系統任務。不少守護進程在系統引導的時候啓動,而且一直運行直到系統關閉。

  守護進程會在主進程代碼執行完畢後終止。

  守護進程內沒法再開啓子進程,若是這樣作,會報錯。

  使用方法  

    p1.daemon=True,放在start()方法以前。

import multiprocessing
import time
class Myprocess(multiprocessing.Process):
    def __init__(self,x):
        super().__init__()
        self.x=x
    def run(self):
        print('子進程{}'.format(self.x),'----')
        time.sleep(2)
        print('子進程{}'.format(self.x),'=====')
if __name__ == '__main__':
    p1=Myprocess(1)
    p2=Myprocess(2)
    p1.daemon=True      #p1是守護進程,主進程代碼執行完畢後,立馬結束。
    p1.start()
    p2.start()
    time.sleep(1)       #一秒鐘,足骨歐p1,p2開啓子進程
    print('主進程','====')

  輸出:

子進程1 ----
子進程2 ----
主進程 ====
子進程2 =====           #由於p1是守護進程,主進程代碼執行完畢後,就立馬結束了。因此沒有打印‘子進程1 ===’

 

貳:  Lock類,建立互斥鎖。只能一次acquire,而後release才能使用。

   Rlock類,建立遞歸所。解決死鎖問題。遞歸全部個引用計數,能夠屢次acquire,release。

  同步可以保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖爲資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態爲「鎖定」,其餘線程不能更改;直到該釋放資源,將資源的狀態變成「非鎖定」,其餘的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個進行寫入操做,從而保證了多狀況下數據的正確性。

  與join方法相似,做用是併發變爲串行。

  應用:

  搶票的過程,分爲查票的餘量和買票。查票的餘量應該是併發,買票應該是串行。

import json
import time
import random
import multiprocessing
def search():
    date=json.load(open('db.txt','r'))
    print('票數:{}'.format(date['count']))
def get(i):
    date = json.load(open('db.txt', 'r'))
    if date['count']>0:
        date['count']-=1
        time.sleep(random.randint(1,3))      #模擬網絡延遲
        json.dump(date,open('db.txt','w'))
        print('{} 搶票成功!'.format(i))

def rob_ticket(i,lock):         #其實能夠沒有get,search函數。徹底能夠合併在一塊兒。可是,分爲兩個小函數,邏輯很是清晰。加鎖也變得更加容易,不出錯。
    search()
    lock.acquire()  #加鎖                       #with lock:  和文件操做同樣,也能夠簡化。
    get(i)                                      #   get(i)
    lock.release()  #釋放鎖,解鎖


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    print('lock',lock)         #主進程建立了一個互斥鎖,做爲參數傳給子進程。
    for i in range(1,21):      #建立20個子進程
        p=multiprocessing.Process(target=rob_ticket,args=(i,lock))
        p.start()

  輸出:

lock <Lock(owner=None)>
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
票數:2
1 搶票成功!
3 搶票成功!

  總結:互斥鎖限定了在同一時刻只有一個進程可以對共享資源進行修改。弊端是涉及到文件的修改,文件在硬盤上,效率不可避免的會很低。

     並且還須要本身進行加鎖解鎖處理。因此,若是能夠,儘可能尋求更好的方法。IPC機制即是答案。

 

 

叄:進程間通訊(IPC,Inter-Process Communication)指至少兩個進程或線程間傳送數據或信號的一些技術或方法。

python中提供了隊列(Queue)和管道(Pipe)兩種方法。

  1 隊列和管道都是將數據存放在內存中,比起硬盤,速度會快不少。

  2 隊列是基於 管道+鎖 實現的,能夠幫咱們從加鎖的繁瑣代碼中解脫出來。推薦使用

 

肆 Queue類。隊列。

  Queue([maxsize]), 建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。

  put方法  put_mowait()方法

  get方法  get_nowait()方法  

import multiprocessing

q=multiprocessing.Queue(3)
print(q,)
q.put('1')
q.put('2')
q.put('3')
print(q.get())
print(q.get())
print(q.get())

  輸出:

<multiprocessing.queues.Queue object at 0x000002132885A048>
1
2
3

 

伍  JoinableQueue類

  q=JoinableQueue()

  提供了Queue類兩個沒有的方法。

    join():阻塞,直到隊列q中沒有item。  

    task_done():必須跟在get()方法後面。

    

  from multiprocessing import JoinableQueue
  q = JoinableQueue()
  q.task_done() # Signal task completion
  q.join() # Wait for completion。
  
  

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        time.sleep(random.randint(1,2))
        res=q.get()
        if res is None:break
        print('%s 吃了 %s'%(name,res))
        q.task_done()           #一個get()跟着一個task_done()       q.task_done()是放在消費者模型這邊的。
def produce(name,q):
    for i in range(10):
        time.sleep(random.randint(1,2))
        res='包子%s'%i
        q.put(res)
        print('%s 生產了 %s '%(name,res))
    q.join()                   #若是註釋掉,最後顯示的時間是15秒左右,由於p1,p2代碼執行完後,無論隊列q中
if __name__ == '__main__':     #有沒有item,p1,p2的完成表明着主進程的完成,c又是守護進程。c儘管沒有消費完全部數據,也會終結。  q.join()是放在生產者模型這邊的。
    start_time=time.time()     #加上join,即是阻塞狀態,知道q隊列中的item被c進程所有完,這樣主進程代碼執行完畢。c做爲守護進程,也會隨之終結。用時大約30秒
    q=JoinableQueue()
    p1=Process(target=produce,args=('egon',q))
    p2=Process(target=produce,args=('wupeoqi',q))
    c=Process(target=consumer,args=('alex',q))
    c.daemon=True
    c.start()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(time.time()-start_time)

 

陸 Manager() 

  Python中進程間共享數據,處理基本的queue,pipe和value+array外,還提供了更高層次的封裝。使用multiprocessing.Manager能夠簡單地使用這些高級接口。 

  Manager()返回的manager對象控制了一個server進程,此進程包含的python對象能夠被其餘的進程經過proxies來訪問。從而達到多進程間數據通訊且安全。

  Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。 

  Manager()也是建立的內存中的空間。

import time
def foo(d,lock):
    with lock:
        temp=d['x']
        time.sleep(0.001)
        d['x']=temp-1

from multiprocessing import Manager,Process,Lock
if __name__ == '__main__':
    m=Manager()
    lock=Lock()
    d=m.dict({'x':10})
    l=[]
    for i in range(10):
        p=Process(target=foo,args=(d,lock))
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(d)

 

柒 Pool()         !!!!!http://www.cnblogs.com/Tour/p/4564710.html  !!!很好的博客地址

   在使用Python進行系統管理時,特別是同時操做多個文件目錄或者遠程控制多臺主機,並行操做能夠節約大量的時間。若是操做的對象數目不大時,還能夠直接使用Process類動態的生成多個進程,十幾個還好,可是若是上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。 
Pool類能夠提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,若是池尚未滿,就會建立一個新的進程來執行請求。若是池滿,請求就會告知先等待,直到池中有進程結束,纔會建立新的進程來執行這些請求。 
下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法。

  apply():

    該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束,不建議使用。同步調用

  apply_async():

    apply_async(func[, args=()[, kwds={}[, callback=None]]])

    非阻塞且支持結果返回進行回調

  

    首先來看apply_async方法,源碼以下:

複製代碼
def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result
func表示執行此任務的方法
args、kwds分別表func的位置參數和關鍵字參數
callback表示一個單參數的方法,當有結果返回時,callback方法會被調用,參數即爲任務執行後的結果
複製代碼

    每調用一次apply_result方法,實際上就向_taskqueue中添加了一條任務,注意這裏採用了非阻塞(異步)的調用方式,即apply_async方法中新建的任務只是被添加到任務隊列中,還並未執行,不須要等待,直接返回建立的ApplyResult對象,注意在建立ApplyResult對象時,將它放入進程池的緩存_cache中。

    任務隊列中有了新建立的任務,那麼根據上節分析的處理流程,進程池的_task_handler線程,將任務從taskqueue中獲取出來,放入_inqueue中,觸發worker進程根據args和kwds調用func,運行結束後,將結果放入_outqueue,再由進程池中的_handle_results線程,將運行結果從_outqueue中取出,並找到_cache緩存中的ApplyResult對象,_set其運行結果,等待調用端獲取。

  close():

    關閉進程池,使其再也不接收新的任務。

  join():

    主進程阻塞等待子進程的退出,join方法必須用在close()方法以後,二者搭配使用。

  PS 回調函數:

    回調函數就是一個經過函數指針調用的函數。若是你把函數的指針(地址)做爲參數傳遞給另外一個函數,當這個指針被用來調用其所指向的函數時,咱們就說這是回調函數。回調函數不是由該函數的實現方直接調用,而是在特定的事件或條件發生時由另外的一方調用的,用於對該事件或條件進行響應。                ——百度百科

    你到一個商店買東西,恰好你要的東西沒有貨,因而你在店員那裏留下了你的電話,過了幾天店裏有貨了,店員就打了你的電話,而後你接到電話後就到店裏去取了貨。在這個例子裏,你的電話號碼就叫回調函數,你把電話留給店員就叫登記回調函數,店裏後來有貨了叫作觸發了回調關聯的事件,店員給你打電話叫作調用回調函數,你到店裏去取貨叫作響應回調事件。  ——知乎回答
    

    callback函數是一個以參數形式傳遞給另外一個函數的函數,而且該函數(指callback函數)必須等另外一個函數執行完纔會被調用!(當被調用時,另外一個函數就是callback函數的父函數)。

    理解起來可能有點繞,通俗點的例子:

    函數a有一個參數,這個參數是個函數b,當函數a執行完之後執行函數b。那麼這個過程就叫回調。

    這裏必須強調的一點:函數b是你以參數形式傳給函數a的,那麼函數b被調用時就叫回調函數。


  PS 同步與異步   

    同步和異步關注的是消息通訊機制 (synchronous communication/ asynchronous communication)
  所謂同步,就是在發出一個*調用*時,在沒有獲得結果以前,該*調用*就不返回。可是一旦調用返回,就獲得返回值了。
  換句話說,就是由*調用者*主動等待這個*調用*的結果。

    而異步則是相反,*調用*在發出以後,這個調用就直接返回了,因此沒有返回結果。換句話說,當一個異步過程調用發出後,調用者不會馬上獲得結果。而是在*調用*發出後,*被調用者*經過狀態、通知來通知調用者,或經過回調函數處理這個調用。

    同步I/O操做:致使請求進程阻塞,直到I/O操做完成;

    異步I/O操做:不致使請求進程阻塞。

   
from multiprocessing import Process,Pool
import time,os
def foo(n):
    print(n)
    time.sleep(5)
    print('%s is working '%os.getpid())
    return  n**2

if __name__ == '__main__':
    p=Pool(5)
    objs=[]
    for i in range(10):
        obj=p.apply_async(foo,args=(i,))
        objs.append(obj)
    p.close()
    p.join()
    print(objs)
    for obj in objs:
        print(obj.get())

  輸出: 

0
1
2
3
4
14344 is working 
5
15624 is working 
6
5312 is working 
7
16000 is working 
8
11868 is working 
9
14344 is working 
15624 is working 
5312 is working 
16000 is working 
11868 is working 
[<multiprocessing.pool.ApplyResult object at 0x000001B9953AF860>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF908>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF9B0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFA90>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFB70>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFC50>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFD30>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFE10>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFEF0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFFD0>]
0
1
4
9
16
25
36
49
64
81

  注意:看obj是什麼,用get()方法取其值。

 

  Pool類的異步以及回調函數。回調函數能夠用在爬蟲上。

import requests,os
from multiprocessing import Pool,Process
def get(url):
    r=requests.get(url)
    print('進程%s get %s'%(os.getpid(),url))
    return {'url':url,'text':len(r.text)}
def search(dic):
    with open('db.txt','a')as f:             # a 模式 也能夠建立不存在的文件名
        date='url: %s lenth: %s\n'%(dic['url'],dic['text'])
        f.write(date)
if __name__ == '__main__':
    p=Pool(3)
    l=[]
    url_l=['http://cn.bing.com/','http://www.cnblogs.com/wupeiqi/','http://www.cnblogs.com/654321cc/',
           'https://www.cnblogs.com/','http://society.people.com.cn/n1/2017/1012/c1008-29581930.html',
           'http://www.xilu.com/news/shaonianxinzangyou5gedong.html',]
    for url in url_l:
        obj=p.apply_async(get,(url,),callback=search)   #在這裏,apply_async,建立了進程。search是回調函數,有且惟一參數是get函數的返回值,
        l.append(obj)                                   #obj一直是ApplyResult object
    p.close()
    p.join()
    print(l)
    for obj in l:
        print(obj.get())                                #obj.get()一直是get()函數的返回值,無論有沒有回調函數。

  輸出:

進程14044 get http://www.cnblogs.com/wupeiqi/
進程13000 get http://www.cnblogs.com/654321cc/
進程15244 get http://cn.bing.com/
進程15244 get http://www.xilu.com/news/shaonianxinzangyou5gedong.html
進程14044 get https://www.cnblogs.com/
進程13000 get http://society.people.com.cn/n1/2017/1012/c1008-29581930.html
[<multiprocessing.pool.ApplyResult object at 0x0000027D4C893BE0>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893C88>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893D30>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893DD8>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893E80>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893F60>]
{'url': 'http://cn.bing.com/', 'text': 127210}
{'url': 'http://www.cnblogs.com/wupeiqi/', 'text': 21292}
{'url': 'http://www.cnblogs.com/654321cc/', 'text': 13268}
{'url': 'https://www.cnblogs.com/', 'text': 40331}
{'url': 'http://society.people.com.cn/n1/2017/1012/c1008-29581930.html', 'text': 23641}
{'url': 'http://www.xilu.com/news/shaonianxinzangyou5gedong.html', 'text': 51247}
相關文章
相關標籤/搜索