15.1 multiprocessing
multiprocessing is the multi-process module: it provides task-level concurrency, makes full use of multi-core processors, and avoids the impact of the GIL (Global Interpreter Lock). Its commonly used classes are:
| Class | Description |
| --- | --- |
| Process(group=None, target=None, name=None, args=(), kwargs={}) | Spawns a process object; call its start() method to launch it |
| Pool(processes=None, initializer=None, initargs=()) | Returns a process pool object; processes is the number of worker processes in the pool |
| Pipe(duplex=True) | Returns a pair of connection objects joined by a pipe |
| Queue(maxsize=0) | Returns a queue object whose methods match those of Queue.Queue |
| multiprocessing.dummy | A module that replicates this API on top of threads (see the sketch after this table) |
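multiprocessing.dummy is handy for I/O-bound work where threads suffice; a minimal sketch of the thread-backed Pool (the fetch function and URLs are placeholders for illustration):

```python
from multiprocessing.dummy import Pool  # same Pool API, backed by threads

def fetch(url):
    # stand-in for any blocking I/O task
    return 'fetched %s' % url

pool = Pool(4)  # 4 worker threads, not processes
results = pool.map(fetch, ['www.baidu.com', 'www.jd.com'])
pool.close()
pool.join()
print results
```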
The Process() class has the following methods and attributes:

| Method/Attribute | Description |
| --- | --- |
| run() | The process's activity; invoked by start(), and the method to override in subclasses |
| start() | Start the process object |
| join([timeout]) | Block until the child process terminates before returning, with an optional timeout |
| name | The process name |
| is_alive() | Return whether the process is alive |
| daemon | The process's daemon flag, a boolean |
| pid | Return the process ID |
| exitcode | The child's exit status code |
| terminate() | Terminate the process, using the SIGTERM signal on Unix and TerminateProcess() on Windows (see the sketch after this table) |
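A quick sketch tying the lifecycle members together (is_alive(), terminate(), exitcode); the sleeping worker is just a stand-in for a long task:

```python
from multiprocessing import Process
import time

def worker():
    time.sleep(10)  # stand-in for a long-running task

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    print p.is_alive()  # True: the child is running
    p.terminate()       # SIGTERM on Unix
    p.join()            # reap the child so exitcode is set
    print p.is_alive()  # False
    print p.exitcode    # negative signal number, e.g. -15
```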
The Pool() class has the following methods:

| Method | Description |
| --- | --- |
| apply(func, args=(), kwds={}) | Equivalent of the built-in apply() |
| apply_async(func, args=(), kwds={}, callback=None) | Asynchronous version of apply() |
| map(func, iterable, chunksize=None) | Equivalent of the built-in map() |
| map_async(func, iterable, chunksize=None, callback=None) | Asynchronous version of map(); callback is shown in the sketch after this table |
| imap(func, iterable, chunksize=1) | Equivalent of itertools.imap() |
| imap_unordered(func, iterable, chunksize=1) | Like imap(), but the results come back in arbitrary order |
| close() | Close the pool to further task submissions |
| terminate() | Stop the worker processes immediately; the pool object is then garbage-collected |
| join() | Wait for the worker processes to exit; close() or terminate() must be called first |
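The callback parameter never comes up in the examples below, so here is a minimal sketch of map_async() with one; the callback runs in the parent once the whole result list is ready:

```python
from multiprocessing import Pool

def square(n):
    return n * n

def on_done(result):
    # invoked in the parent when all results are ready
    print 'got:', result

if __name__ == '__main__':
    p = Pool(2)
    r = p.map_async(square, range(5), callback=on_done)
    r.wait()  # block until the results (and the callback) are done
    p.close()
    p.join()
```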
Pool.apply_async() and Pool.map_async() return a result object that additionally provides:

| Method | Description |
| --- | --- |
| get([timeout]) | Fetch the value held in the result object; raises TimeoutError if it is not ready within the timeout |
| wait([timeout]) | Wait until the result is available, or until the timeout expires |
| ready() | Return whether the call has completed |
| successful() | Return whether the call completed without raising an exception |
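A sketch exercising all four of these methods on one result object (the inverse function is made up for illustration):

```python
from multiprocessing import Pool

def inverse(n):
    return 1.0 / n  # raises ZeroDivisionError when n == 0

if __name__ == '__main__':
    p = Pool(2)
    r = p.apply_async(inverse, (4,))
    r.wait(timeout=5)       # block until the result is available
    print r.ready()         # True: the call has completed
    print r.successful()    # True: it did not raise
    print r.get()           # 0.25
    bad = p.apply_async(inverse, (0,))
    bad.wait()
    print bad.successful()  # False: the call raised in the worker
    p.close()
    p.join()
```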
Examples:
1) A simple example: run a function in a child process
```python
from multiprocessing import Process
import os

def worker(name):
    print name
    print 'parent process id:', os.getppid()
    print 'process id:', os.getpid()

if __name__ == '__main__':
    p = Process(target=worker, args=('function worker.',))
    p.start()
    p.join()
    print p.name
```

```
# python test.py
function worker.
parent process id: 9079
process id: 9080
Process-1
```
The Process instance takes the worker function as the task for the spawned process to execute; the start() method launches the instance.
2) Illustrating the join() method
```python
from multiprocessing import Process
import os

def worker(n):
    print 'hello world', n

if __name__ == '__main__':
    print 'parent process id:', os.getppid()
    for n in range(5):
        p = Process(target=worker, args=(n,))
        p.start()
        p.join()
        print 'child process id:', p.pid
        print 'child process name:', p.name
```

```
# python test.py
parent process id: 9041
hello world 0
child process id: 9132
child process name: Process-1
hello world 1
child process id: 9133
child process name: Process-2
hello world 2
child process id: 9134
child process name: Process-3
hello world 3
child process id: 9135
child process name: Process-4
hello world 4
child process id: 9136
child process name: Process-5
```

Now comment out p.join() and run it again:

```
# python test.py
parent process id: 9041
child process id: 9125
child process name: Process-1
child process id: 9126
child process name: Process-2
child process id: 9127
child process name: Process-3
child process id: 9128
child process name: Process-4
hello world 0
hello world 1
hello world 3
hello world 2
child process id: 9129
child process name: Process-5
hello world 4
```
As you can see, with join() the output comes out strictly in order; without it, the output is interleaved. join() blocks the parent until the current child has finished, so the next child is only spawned afterwards; otherwise the parent keeps spawning children without waiting for any of them. When orderly output matters, join() guarantees each child's result is complete before the next one starts.
3) Naming child processes for easier management
```python
from multiprocessing import Process
import os

def worker1(n):
    print 'hello world', n

def worker2():
    print 'worker2...'

if __name__ == '__main__':
    print 'parent process id:', os.getppid()
    for n in range(3):
        p1 = Process(name='worker1', target=worker1, args=(n,))
        p1.start()
        p1.join()
        print 'child process id:', p1.pid
        print 'child process name:', p1.name
    p2 = Process(name='worker2', target=worker2)
    p2.start()
    p2.join()
    print 'child process id:', p2.pid
    print 'child process name:', p2.name
```

```
# python test.py
parent process id: 9041
hello world 0
child process id: 9248
child process name: worker1
hello world 1
child process id: 9249
child process name: worker1
hello world 2
child process id: 9250
child process name: worker1
worker2...
child process id: 9251
child process name: worker2
```
4) Setting the daemon flag: a daemonic child process is terminated automatically when the parent exits, while a non-daemonic child keeps the parent waiting until it finishes.
```python
from multiprocessing import Process

def worker1(n):
    print 'hello world', n

def worker2():
    print 'worker2...'

if __name__ == '__main__':
    for n in range(3):
        p1 = Process(name='worker1', target=worker1, args=(n,))
        p1.daemon = True   # killed with the parent; the join() below hides the effect
        p1.start()
        p1.join()
    p2 = Process(target=worker2)
    p2.daemon = False      # the default: the parent waits for this child at exit
    p2.start()
    p2.join()
```
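Because the example above joins every child, the daemon flag has no visible effect there. The sketch below shows the actual behavior: the parent exits without joining, and the daemonic child is killed before it gets to print:

```python
from multiprocessing import Process
import time

def slow_worker():
    time.sleep(2)
    print 'done'  # never printed when daemon is True

if __name__ == '__main__':
    p = Process(target=slow_worker)
    p.daemon = True  # the child is terminated when the parent exits
    p.start()
    print 'parent exiting'
    # no join(): the parent ends here and the daemonic child is killed
```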
5) Using a process pool
```python
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Pool, current_process
import time

def worker(n):
    print 'hello world', n
    print 'process name:', current_process().name  # name of the current worker process
    time.sleep(1)  # sleep so there is time to watch which worker is executing

if __name__ == '__main__':
    p = Pool(processes=3)
    for i in range(8):
        r = p.apply_async(worker, args=(i,))
        r.get(timeout=5)  # fetch the data from the result object
    p.close()
```

```
# python test.py
hello world 0
process name: PoolWorker-1
hello world 1
process name: PoolWorker-2
hello world 2
process name: PoolWorker-3
hello world 3
process name: PoolWorker-1
hello world 4
process name: PoolWorker-2
hello world 5
process name: PoolWorker-3
hello world 6
process name: PoolWorker-1
hello world 7
process name: PoolWorker-2
```
The pool spawns 3 worker processes, and the loop submits the worker function 8 times; the pool starts handing tasks to worker 1 and, once the pool size is reached, wraps around to worker 1 again (note that calling r.get() inside the loop blocks on each task; see the sketch after the ps output below). While this program is running, open another terminal and you can see the spawned children:
```
# ps -ef | grep python
root  40244   9041  4 16:43 pts/3  00:00:00 python test.py
root  40245  40244  0 16:43 pts/3  00:00:00 python test.py
root  40246  40244  0 16:43 pts/3  00:00:00 python test.py
root  40247  40244  0 16:43 pts/3  00:00:00 python test.py
```
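Because r.get() inside the submission loop waits for each task before submitting the next, the pool effectively runs one task at a time. To let the workers run concurrently, submit everything first and collect the results afterwards; a minimal sketch:

```python
from multiprocessing import Pool
import time

def worker(n):
    time.sleep(1)
    return n * n

if __name__ == '__main__':
    p = Pool(processes=3)
    results = [p.apply_async(worker, args=(i,)) for i in range(8)]  # submit all 8 tasks
    p.close()
    p.join()
    print [r.get() for r in results]  # the 8 tasks ran 3 at a time
```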
6) The pool's map() method
map() applies a function to every element of a sequence and returns a new list of the results.
```python
from multiprocessing import Pool

def worker(url):
    return 'http://%s' % url

urls = ['www.baidu.com', 'www.jd.com']
p = Pool(processes=2)
r = p.map(worker, urls)
p.close()
print r
```

```
# python test.py
['http://www.baidu.com', 'http://www.jd.com']
```
7) Inter-process communication with Queue
multiprocessing supports two kinds of inter-process communication: Queue and Pipe.
A Queue class is built into the multiprocessing module; its usage matches the Queue standard library covered in Chapter 10 (common Python standard libraries), so see that earlier post if needed.
Example: one child process writes data to the queue while another child reads it back.
```python
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue

# write data to the queue
def write(q):
    for n in range(5):
        q.put(n)
        print 'Put %s to queue.' % n

# read data from the queue
def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print 'Get %s from queue.' % value
        else:
            break

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pw.join()
    pr.start()
    pr.join()
```

```
# python test.py
Put 0 to queue.
Put 1 to queue.
Put 2 to queue.
Put 3 to queue.
Put 4 to queue.
Get 0 from queue.
Get 1 from queue.
Get 2 from queue.
Get 3 from queue.
Get 4 from queue.
```
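The reader above only works because the writer is joined before the reader starts; if both ran at the same time, an empty queue could simply mean the writer has not caught up yet. A common fix is a sentinel value marking the end of the data; a sketch:

```python
from multiprocessing import Process, Queue

def write(q):
    for n in range(5):
        q.put(n)
    q.put(None)          # sentinel: no more data is coming

def read(q):
    while True:
        value = q.get()  # blocks until data arrives
        if value is None:
            break
        print 'Get %s from queue.' % value

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()           # the reader runs concurrently with the writer
    pw.join()
    pr.join()
```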
8) Inter-process communication with Pipe
```python
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()
    p.join()
```

```
# python test.py
[42, None, 'hello']
```
Pipe() creates a pair of connection objects representing the two ends of the pipe; each connection object has send() and recv() methods.
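Since Pipe() is duplex by default, both ends can send and receive; a minimal sketch of a round trip:

```python
from multiprocessing import Process, Pipe

def child(conn):
    msg = conn.recv()       # receive the parent's message
    conn.send(msg.upper())  # reply over the same connection
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex=True by default
    p = Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send('hello')
    print parent_conn.recv()          # HELLO
    p.join()
```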
9) Sharing objects between processes
The Manager class returns a manager object that controls a server process. It provides several ways to share state: Value(), Array(), list(), dict(), Event(), and so on.
Create the resources on the Manager object; other processes then access them through the manager.
```python
from multiprocessing import Process, Manager

def f(v, a, l, d):
    v.value = 100
    a[0] = 123
    l.append('Hello')
    d['a'] = 1

mgr = Manager()
v = mgr.Value('v', 0)
a = mgr.Array('d', range(5))
l = mgr.list()
d = mgr.dict()
p = Process(target=f, args=(v, a, l, d))
p.start()
p.join()
print(v)
print(a)
print(l)
print(d)
```

```
# python test.py
Value('v', 100)
array('d', [123.0, 1.0, 2.0, 3.0, 4.0])
['Hello']
{'a': 1}
```
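Manager() also hands out Event() objects for simple signalling between processes; a minimal sketch:

```python
from multiprocessing import Process, Manager

def waiter(event):
    event.wait()  # block until the parent sets the event
    print 'event received'

if __name__ == '__main__':
    mgr = Manager()
    ev = mgr.Event()
    p = Process(target=waiter, args=(ev,))
    p.start()
    ev.set()      # wake up the waiting child
    p.join()
```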
10) A full multi-process example
For instance, monitoring in parallel whether a set of URLs responds normally:
```python
from multiprocessing import Pool, current_process
import urllib2

urls = [
    'http://www.baidu.com',
    'http://www.jd.com',
    'http://www.sina.com',
    'http://www.163.com',
]

def status_code(url):
    print 'process name:', current_process().name
    try:
        req = urllib2.urlopen(url, timeout=5)
        return req.getcode()
    except urllib2.URLError:
        return

p = Pool(processes=4)
for url in urls:
    r = p.apply_async(status_code, args=(url,))
    if r.get(timeout=5) == 200:
        print "%s OK" % url
    else:
        print "%s NO" % url
```

```
# python test.py
process name: PoolWorker-1
http://www.baidu.com OK
process name: PoolWorker-2
http://www.jd.com OK
process name: PoolWorker-3
http://www.sina.com OK
process name: PoolWorker-4
http://www.163.com OK
```
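The same check can also be written with Pool.map(), which submits every URL at once and returns the status codes in order; a sketch under the same assumptions:

```python
from multiprocessing import Pool
import urllib2

def status_code(url):
    try:
        return urllib2.urlopen(url, timeout=5).getcode()
    except urllib2.URLError:
        return None

if __name__ == '__main__':
    urls = ['http://www.baidu.com', 'http://www.jd.com']
    p = Pool(processes=2)
    codes = p.map(status_code, urls)  # all URLs are checked concurrently
    p.close()
    p.join()
    for url, code in zip(urls, codes):
        print url, 'OK' if code == 200 else 'NO'
```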
15.2 threading
The threading module is similar to the multiprocessing module and is used in much the same way. The threading library is a higher-level wrapper around the thread library; the class we mostly use is Thread, from which thread objects are derived.
1) Multithreading with the Thread class
```python
from threading import Thread, current_thread

def worker(n):
    print 'thread name:', current_thread().name
    print 'hello world', n

for n in range(5):
    t = Thread(target=worker, args=(n,))
    t.start()
    t.join()  # wait for this thread to finish
```

```
# python test.py
thread name: Thread-1
hello world 0
thread name: Thread-2
hello world 1
thread name: Thread-3
hello world 2
thread name: Thread-4
hello world 3
thread name: Thread-5
hello world 4
```
2) Another way is to subclass Thread and override __init__ and run() to implement the logic.
```python
#!/usr/bin/python
# -*- coding: utf-8 -*-
from threading import Thread, current_thread

class Test(Thread):
    def __init__(self, n):
        # overriding the constructor skips the parent's, so call it explicitly
        Thread.__init__(self)
        self.n = n

    def run(self):
        print 'thread name:', current_thread().name
        print 'hello world', self.n

if __name__ == '__main__':
    for n in range(5):
        t = Test(n)
        t.start()
        t.join()
```

```
# python test.py
thread name: Thread-1
hello world 0
thread name: Thread-2
hello world 1
thread name: Thread-3
hello world 2
thread name: Thread-4
hello world 3
thread name: Thread-5
hello world 4
```
3) Lock

```python
from threading import Thread, Lock, current_thread

lock = Lock()

class Test(Thread):
    def __init__(self, n):
        # overriding the constructor skips the parent's, so call it explicitly
        Thread.__init__(self)
        self.n = n

    def run(self):
        lock.acquire()  # acquire the lock
        print 'thread name:', current_thread().name
        print 'hello world', self.n
        lock.release()  # release the lock

if __name__ == '__main__':
    for n in range(5):
        t = Test(n)
        t.start()
        t.join()
```
As is well known, Python threads run under the GIL (the Global Interpreter Lock): a thread holds the lock while executing bytecode and releases it when it finishes or blocks, which means only one thread runs Python code at any given moment. Since all threads share the parent process's memory, variables, and resources, it is easy for several threads to modify the same data and leave it in a muddled state.
If your multithreaded program produces jumbled output, consider that without a lock, concurrently running threads may be modifying the same variables, so the results differ from run to run (see the sketch below).
It follows that Python threads cannot exploit multiple CPU cores to improve processing performance, though they still add useful concurrency for I/O-bound workloads. Nor is this a dead end: on multi-core machines, use multiple processes for multi-core work. Each Python process copies the parent's resources and has its own independent interpreter and GIL, so processes do not trample each other's data. When multiprocessing fits, use it!
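As a sketch of the race the Lock prevents: several threads bump a shared counter, and without the lock some increments are lost (the exact count varies from run to run; assumes CPython):

```python
from threading import Thread, Lock

lock = Lock()
counter = {'n': 0}  # shared state that every thread modifies

def add(locked):
    for _ in range(100000):
        if locked:
            with lock:             # serialize the read-modify-write
                counter['n'] += 1
        else:
            counter['n'] += 1      # racy: updates can be lost

def run(locked):
    counter['n'] = 0
    threads = [Thread(target=add, args=(locked,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter['n']

print 'without lock:', run(False)  # often less than 400000
print 'with lock:   ', run(True)   # always 400000
```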