Python:進程

因爲GIL的存在,python一個進程同時只能執行一個線程。所以在python開發時,計算密集型的程序經常使用多進程,IO密集型的使用多線程。

1.多進程建立:

#建立方法1:將要執行的方法做爲參數傳給Process

from multiprocessing import Process

def f(name):
    """Child-process entry point: print a greeting for *name*."""
    print('hello %s' % name)

if __name__ == '__main__':      # child processes must be created under the main guard (required with the spawn start method, e.g. on Windows)
    p = Process(target=f, args=('pingy',))  # target: function to run; args: its positional arguments as a tuple
    p.start()   # launch the child process
    p.join()    # block the parent until the child has finished

 

#建立方法2:從Process繼承,並重寫run()
from multiprocessing import Process

class MyProcess(Process):
    """Process subclass; overriding run() defines the work done in the child."""

    def run(self):
        print("MyProcess extended from Process")


if __name__ == '__main__':    # child processes may only be spawned from the main guard
    worker = MyProcess()
    worker.start()

 實例方法:

run():  #默認的run()函數調用target的函數,你也能夠在子類中覆蓋該函數
start() : #啓動該進程
daemon: #守護進程標誌(是屬性,不是方法)。設爲True時,父進程結束會自動終止全部daemon子進程;必須在start()以前設置
join([timeout]) : #父進程被中止,直到子進程被執行完畢。當timeout爲None時沒有超時,不然有超時
is_alive(): #返回進程是否在運行。正在運行指啓動後、終止前
terminate(): #結束進程

 例:

from multiprocessing import Process
from threading import Thread
import time
import os

def foo(n):
    """Sleep briefly, then print *n* together with the child and parent process ids."""
    time.sleep(2)
    print('Number: %s' % n)
    # BUG FIX: the original printed os.getpid() twice; the parent id is os.getppid()
    print('子進程ID: %s 父進程ID: %s' % (os.getpid(), os.getppid()))

def main1():
    """Run foo() twice sequentially, in the current process."""
    for i in range(2):
        foo(i)

def main2():
    """Run foo() twice, each call in its own child process."""
    for i in range(2):
        p = Process(target=foo, args=(i,))
        print('%s 準備執行...' % p.name)    # p.name is the process name
        p.start()
        print('%s 開始執行...' % p.pid)     # p.pid is None until start() is called
        p.join(1)    # wait up to 1 second for the child to finish


if __name__ == '__main__':
    print('主進程ID: %s' % os.getpid())
    print('++++++++++++++++++++++++++++++++++++++++++')
    main1()
    print('------------------------------------------')
    main2()

輸出結果:

主進程ID: 84792
++++++++++++++++++++++++++++++++++++++++++
Number: 0
子進程ID: 84792 父進程ID: 84792
Number: 1
子進程ID: 84792 父進程ID: 84792
------------------------------------------
Process-1 準備執行...
123316 開始執行...
Process-2 準備執行...
85716 開始執行...
Number: 0
子進程ID: 123316 父進程ID: 123316
Number: 1
子進程ID: 85716 父進程ID: 85716

 

 

設置daemon屬性:

#不加daemon:

import multiprocessing
import time

def worker(interval):
    """Print a start stamp, sleep *interval* seconds, then print an end stamp."""
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    # no join(): the parent prints "end!" immediately, before the child finishes
    print("end!")

 

執行結果:

end!
work start:Thu Oct 20 16:46:12 2016
work end:Thu Oct 20 16:46:15 2016
#加上daemon後:

import multiprocessing
import time

def worker(interval):
    """Print a start stamp, sleep *interval* seconds, then print an end stamp."""
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True   # a daemon child is terminated as soon as the parent exits
    p.start()
    # parent exits right away, so the daemon child never gets to print anything
    print("end!")

輸出結果:

end!

 

設置daemon後仍等待子進程執行完的方法:

import multiprocessing
import time

def worker(interval):
    """Print a start stamp, sleep *interval* seconds, then print an end stamp."""
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()   # join() waits for the daemon child, so its output appears before "end!"
    print("end!")

輸出結果:

work start:Thu Oct 20 16:49:34 2016
work end:Thu Oct 20 16:49:37 2016 end!

 

將進程定義爲類:

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    """Prints the current time five times, pausing *interval* seconds between prints."""

    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval   # seconds to sleep between prints

    def run(self):
        # Entry point of the child process once start() is called.
        for _ in range(5):
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)

if __name__ == '__main__':
    clock = ClockProcess(3)
    clock.start()

輸出結果:

the time is Thu Oct 20 16:42:21 2016
the time is Thu Oct 20 16:42:24 2016
the time is Thu Oct 20 16:42:27 2016
the time is Thu Oct 20 16:42:30 2016
the time is Thu Oct 20 16:42:33 2016

 

多進程與多線程的區別:

from multiprocessing import Process
import threading
import time

li = []

def run(li1, n):
    """Append *n* to *li1* and print the list."""
    li1.append(n)
    print(li1)

if __name__ == '__main__':
    # Each child process gets its own copy of li, so every child prints a
    # one-element list.
    for i in range(10):
        p = Process(target=run, args=[li, i])
        p.start()
    time.sleep(1)
    print('我是分割線'.center(50, '*'))
    # Threads share li, so the list grows and the prints interleave.
    for i in range(10):
        t = threading.Thread(target=run, args=[li, i])
        t.start()

執行結果:

[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
*****************我是分割線******************
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
 [0, 1, 2[, 03, , 14, , 25, ]3
, 4, 5]
 [0, 1, 2[, 03, , 14, , 25, , 36, , 47, ][5
0, , 61, , 72, , 83], 
4, 5, 6, [70, , 81, , 92], 
3, 4, 5, 6, 7, 8, 9]

 

2.多進程之間資源共享方法

(1)鎖:做用是當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突。它又分爲Lock和RLock,RLock中除了狀態locked和unlocked外還記錄了當前lock的owner和遞歸層數,使得RLock能夠被同一個線程屢次acquire()。若是使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的鎖。

# How to use Lock / RLock:
import multiprocessing

lock = multiprocessing.Lock()    # Lock object
lock.acquire()                   # acquire; acquire(timeout=...) returns False when the
                                 # lock could not be obtained before the timeout expired
lock.release()                   # release

rLock = multiprocessing.RLock()  # RLock (reentrant lock) object
rLock.acquire()                  # an RLock may be acquired repeatedly by the same owner
rLock.release()                  # ...but every acquire() needs a matching release()

 例:

import multiprocessing
import sys

def worker_with(lock, f):
    """Append 9 lines to file *f* while holding *lock* (context-manager style)."""
    with lock:
        # 'with open' guarantees the file is closed even if a write fails
        with open(f, 'a+') as fs:
            n = 10
            while n > 1:        # writes 9 lines (n counts 10 down to 2), matching the sample output
                fs.write("Lockd acquired via with\n")
                n -= 1
        
def worker_no_with(lock, f):
    """Append 9 lines to file *f*, acquiring and releasing *lock* explicitly."""
    lock.acquire()
    try:
        with open(f, 'a+') as fs:   # 'with open' guarantees the file is closed
            n = 10
            while n > 1:            # writes 9 lines (n counts 10 down to 2)
                fs.write("Lock acquired directly\n")
                n -= 1
    finally:
        lock.release()              # always release, even if writing fails
    
if __name__ == "__main__":
    # Both workers append to the same file; the shared lock keeps their
    # 9-line bursts from interleaving.
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

輸出結果:

Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

 (2)multiprocess.Queue:實現進/線程間的同步

  • FIFO(先進先出)隊列
  • LIFO(後進先出)隊列
  • PriorityQueue(優先級)隊列

注意:Queue.Queue是單個進程內(線程之間)使用的隊列,multiprocess.Queue是跨進程通訊隊列。前者爲各進程私有,後者爲各子進程共有。

實例方法:

Queue.qsize() 返回隊列的大小
Queue.empty() 若是隊列爲空,返回True,反之False
Queue.full() 若是隊列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取隊列,timeout等待時間
Queue.get_nowait() 至關Queue.get(False)
Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 至關Queue.put(item, False)
Queue.task_done() 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
Queue.join() 實際上意味着等到隊列爲空,再執行別的操做

 

FIFO(先進先出)隊列:

import Queue            # Python 2 module name (renamed to 'queue' in Python 3)

q = Queue.Queue()       # FIFO: items come out in insertion order

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get())

輸出結果:
0
1
2
3
4

LIFO後進先出隊列:

import Queue            # Python 2 module name (renamed to 'queue' in Python 3)

q = Queue.LifoQueue()   # LIFO: the last item put is the first item got

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get())
輸出結果:
4
3
2
1
0

多線程配合隊列的例子(注意:此例使用的是FIFO的Queue.Queue,並不是PriorityQueue):

import Queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    """Worker thread: repeatedly pulls items from the shared work queue."""
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name    # overrides the default Thread name
        self.q = q          # the queue this worker consumes from
    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)   # loops until the global exitFlag is set
        print("Exiting " + self.name)

def process_data(threadName, q):
    """Drain items from *q*, printing each one, until the global exitFlag is set."""
    while not exitFlag:
        queueLock.acquire()
        # NOTE(review): emptiness is tested on the global workQueue while items are
        # taken from the parameter q — callers pass the same object, but confirm.
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print("%s processing %s" % (threadName, data))
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()    # guards access to workQueue
workQueue = Queue.Queue(10)     # shared work queue (max size 10)
threads = []
threadID = 1

# Spawn the worker threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# Fill the queue while holding the lock so workers don't consume mid-fill
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Busy-wait until the queue has been drained
while not workQueue.empty():
    pass

# Signal the workers to leave their polling loop
exitFlag = 1

# Wait for all worker threads to finish
for t in threads:
    t.join()
print("Exiting Main Thread")

輸出結果:

Starting Thread-1
Starting Thread-2
Starting Thread-3
Thread-1 processing One
Thread-2 processing Two
Thread-3 processing Three
Thread-1 processing Four
Thread-2 processing Five
Exiting Thread-3
Exiting Thread-1
Exiting Thread-2
Exiting Main Thread

 

 multiprocess.Queue:實現進程間的同步:

例1:

from multiprocessing import Process,Queue

def foo(q,n):
    """Put *n* onto the shared queue *q* (runs in a child process)."""
    q.put(n)

if __name__ == '__main__':
    que = Queue()
    for idx in range(5):
        worker = Process(target=foo, args=(que, idx))
        worker.start()
        worker.join()   # wait for each child before launching the next

    print(que.qsize())  # all five children have put one item each

輸出結果:

5    

 例2:

import multiprocessing

def writer_proc(q):
    """Try to put one item on *q* without blocking; a full queue is ignored."""
    try:
        q.put(1, block = False)
    except Exception:   # was a bare except:, which would also swallow SystemExit
        pass

def reader_proc(q):
    """Try to print one item from *q* without blocking; an empty queue is ignored."""
    try:
        print(q.get(block = False))
    except Exception:
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()

    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()

輸出結果:

1   

(3)multiprocessing.Value與multiprocessing.Array:進行數據共享

from multiprocessing import Process,Value,Array

def foo1(n,a):
    """Child-process entry: set the shared value to 3 and negate every array element."""
    n.value = 3
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d',0.0)         # 'd' = double (float), initial value 0.0
    arr = Array('i',range(10))   # 'i' = signed int, initialised to 0..9
    p = Process(target=foo1,args=(num,arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

輸出結果:

3.0
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

 

(4)multiprocessing.Manager:數據共享

from multiprocessing import Manager,Process

def f(d,l):
    """Child-process entry: populate the shared dict and reverse the shared list."""
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manage = Manager()
    d = manage.dict()            # dict proxy shared between processes
    l = manage.list(range(10))   # list proxy shared between processes
    p = Process(target=f,args=(d,l))
    p.start()
    p.join()
    print(d)
    print(l)

輸出結果:

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

 

3.pool:若是須要多個子進程時能夠考慮使用進程池(pool)來管理

Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,纔會建立新的進程來它。

(1)使用進程池(非阻塞)

#使用進程池(非阻塞)
import time
import multiprocessing

def fun(msg):
    """Pool worker: print the message, sleep 3 seconds, then print 'end'."""
    print('MSG: %s' % msg)
    time.sleep(3)
    print('end')

if __name__ == '__main__':
    pool =  multiprocessing.Pool(processes=3)   # at most 3 workers run at once
    for i in range(4):                          # range() replaces Python 2-only xrange()
        msg = 'hello,%d' %(i)
        # apply_async does not block: the 4th task waits until a worker frees up
        pool.apply_async(fun,(msg,))
    print('-----------------------')
    pool.close()    # no further tasks may be submitted
    pool.join()     # wait for every submitted task to finish
    print('Sub-processes done')

輸出結果:

-----------------------
MSG: hello,0
MSG: hello,1
MSG: hello,2
end
MSG: hello,3
end
end
end
Sub-processes done

 

(2)使用進程池(阻塞)

import time,multiprocessing

def fun(msg):
    """Pool worker: print the message, sleep 3 seconds, then print 'end'."""
    print('MSG: %s' % msg)
    time.sleep(3)
    print('end')

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):                    # range() replaces Python 2-only xrange()
        msg = 'hello,%d' %i
        # apply() blocks until the task finishes, so the tasks run one after another
        pool.apply(fun,(msg,))
    print('-------------------------------')
    pool.close()
    pool.join()
    print('Sub-processes done')

輸出結果:

MSG: hello,0
end
MSG: hello,1
end
MSG: hello,2
end
MSG: hello,3
end
-------------------------------
Sub-processes done

 

4.Pipe:multiprocessing.Pipe()用於兩個進程間的雙向通訊,它返回一對Connection對象,分別表明管道的兩端,每端都能send()和recv();

實現機制:

管道是由內核管理的一個緩衝區,至關於咱們放入內存中的一個紙條。管道的一端鏈接一個進程的輸出。這個進程會向管道中放入信息。管道的另外一端鏈接一個進程的輸入,這個進程取出被放入管道的信息。一個緩衝區不須要很大,它被設計成爲環形的數據結構,以便管道能夠被循環利用。當管道中沒有信息的話,從管道中讀取的進程會等待,直到另外一端的進程放入信息。當管道被放滿信息的時候,嘗試放入信息的進程會等待,直到另外一端的進程取出信息。當兩個進程都終結的時候,管道也自動消失。

import multiprocessing
import time

def proc1(pipe):
    """Sender: push integers 0..9999 into the pipe, one per second, forever."""
    while True:
        for i in range(10000):
            print('send:%s' % i)
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    """Receiver: print whatever arrives on the pipe, once per second, forever."""
    while True:
        print('proc2 recv: %s' % pipe.recv())
        time.sleep(1)

def proc3(pipe):
    """Alternate receiver (disabled below): would compete with proc2 for messages."""
    while True:
        print('proc3 recv: %s' % pipe.recv())
        time.sleep(1)

if __name__ == '__main__':
    pipe = multiprocessing.Pipe()   # returns a pair of Connection objects (the two ends)
    p1 = multiprocessing.Process(target=proc1,args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
    p1.start()
    p2.start()
    #p3.start()
    p1.join()
    p2.join()
    #p3.join()

輸出結果:

send:0
proc2 recv: 0
send:1
proc2 recv: 1
send:2
proc2 recv: 2
send:3
proc2 recv: 3
send:4
proc2 recv: 4
send:5
proc2 recv: 5
send:6
proc2 recv: 6
相關文章
相關標籤/搜索