多線程多進程
多進程
概念
進程是程序在計算機上的一次執行活動。當你運行一個程序,你就啓動了一個進程。顯然,程序是死的(靜態的),進程是活的(動態的)。進程能夠分爲系統進程和用戶進程。凡是用於完成操做系統的各類功能的進程就是系統進程,它們就是處於運行狀態下的操做系統自己;用戶進程就沒必要我多講了吧,全部由你啓動的進程都是用戶進程。進程是操做系統進行資源分配的單位。
它的思想簡單介紹以下:在操做系統的管理下,全部正在運行的進程輪流使用CPU,每一個進程容許佔用CPU的時間很是短(好比10毫秒),這樣用戶根本感受不出來CPU是在輪流爲多個進程服務,就好象全部的進程都在不間斷地運行同樣。但實際上在任何一個時間內有且僅有一個進程佔有CPU。python
多進程和多線程的區別: 多線程使用的是cpu的一個核,適合io密集型 多進程使用的是cpu的多個核,適合運算密集型 組件 Python提供了很是好用的多進程包,multiprocessing,咱們在使用的時候,只須要導入該模塊就能夠了。 Multiprocessing支持子進程,通訊,共享數據,執行不一樣形式的同步,提供了Process,Pipe, Lock等組件bootstrap
Process 1. 建立一個Process對象 p = multiprocessing.Process(target=worker_1, args=(2, )) target = 函數名字 args = 函數須要的參數,以tuple的形式傳入 注意: 單個元素的tuple的表現形式 multprocessing用到的兩個方法 cpu_count() 統計cpu總數 active_children() 得到全部子進程數組
Process 1. Process的經常使用方法 is_alive() 判斷進程是否存活 run() 啓動進程 start() 啓動進程,會自動調用run方法,這個經常使用 join(timeout) 等待進程結束或者直到超時 2. Process的經常使用屬性 name 進程名字 pid 進程的pid多線程
代碼例子:app
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # @Time : 2017/10/20 21:32# 4 # @File : demon1.py 5 import multiprocessing 6 import time 7 def work(interval, method): 8 print("start work_" + method) 9 time.sleep(interval) 10 print("end work_" + method) 11 if __name__ == "__main__": 12 p1 = multiprocessing.Process(target=work, args=(1, "1")) 13 p2 = multiprocessing.Process(target=work, args=(2, "2")) 14 p3 = multiprocessing.Process(target=work, args=(3, "3")) 15 p1.start() 16 p2.start() 17 p3.start() 18 print("The number of CPU is:") + str(multiprocessing.cpu_count()) 19 for p in multiprocessing.active_children(): 20 print("The name of active child is: " + p.name + ", pid is: " + str(p.pid) + "is alive: " + str(p.is_alive())) 21 print("MAIN IS END!")
結果:dom
C:\Python27\python.exe E:/python/12process/demon1.py
The number of CPU is:4
The name of active child is: Process-3, pid is9788is alive: True
The name of active child is: Process-1, pid is6936is alive: True
The name of active child is: Process-2, pid is3052is alive: True
MAIN IS END!
start work_2
start work_1
start work_3
end work_1
end work_2
end work_3async
Lock組件函數
當咱們用多進程來讀寫文件的時候,若是一個進程是寫文件,一個進程是讀文件,若是兩個文件同時進行,確定是不行的,必須是文件寫結束之後,才能夠進行讀操做。或者是多個進程在共享一些資源的時候,同時只能有一個進程進行訪問,那就要有一個鎖機制進行控制。學習
需求:ui
一個進程寫入一個文件,一個進程追加文件,一個進程讀文件,同時啓動起來 咱們能夠經過進程的join()方法來實現,可是爲了學習Lock,
用Lock來實現。 先看不加鎖程序,在看加鎖程序,最後比較兩個程序的區別
代碼例子:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import multiprocessing 4 import time 5 def add1(lock, value, number): 6 with lock: 7 print("start add1 number = {0}".format(number)) 8 for i in xrange(1, 5): 9 number += value 10 time.sleep(0.3) 11 print("number = {0}".format(number)) 12 def add3(lock, value, number): 13 lock.acquire() 14 print("start add3 number = {0}".format(number)) 15 try: 16 for i in xrange(1, 5): 17 number += value 18 time.sleep(0.3) 19 print("number = {0}".format(number)) 20 finally: 21 lock.release() 22 if __name__ == "__main__": 23 lock = multiprocessing.Lock() 24 number = 0 25 pw = multiprocessing.Process(target=add1, args=(lock, 1, number)) 26 pa = multiprocessing.Process(target=add3, args=(lock, 3, number)) 27 pw.start() 28 pa.start() 29 print("main process end.")
結果:
main process end.
start add1 number = 0
number = 1
number = 2
number = 3
number = 4
start add3 number = 0
number = 3
number = 6
number = 9
number = 12
共享內存
python的multiprocessing模塊也給咱們提供了共享內存的操做
通常的變量在進程之間是無法進行通信的,multiprocessing給咱們提供了Value和Array模塊,他們能夠在不通的進程中共同使用
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Value, Array 4 def f(n, a,m): 5 n.value = 3.1415927 6 m = 20 7 for i in range(len(a)): 8 a[i] = -a[i] 9 print(m) 10 if __name__ == '__main__': 11 num = Value('d', 0.0) 12 arr = Array('i', range(10)) 13 m = 10 14 p = Process(target=f, args=(num, arr, m)) 15 p.start() 16 p.join() 17 18 print(num.value) 19 print(arr[:]) 20 print(m)
結果:
20
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
10
強大的Manage
以上實現的數據共享的方式只有兩種結構Value和Array。Python中提供了強大的Manage專門用來作數據共享的,其支持的類型很是多,包括,Value, Array,list,dict, Queue, Lock等。
下面看個例子:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 from multiprocessing import Process, Manager 5 def func(dt, lt): 6 for i in range(10): 7 key = 'arg' + str(i) 8 dt[key] = i * i 9 lt += range(11, 16) 10 if __name__ == "__main__": 11 manager = Manager() 12 dt = manager.dict() 13 lt = manager.list() 14 15 p = Process(target=func, args=(dt, lt)) 16 p.start() 17 p.join() 18 print(dt) 19 print(lt)
結果:
{'arg8': 64, 'arg9': 81, 'arg0': 0, 'arg1': 1, 'arg2': 4, 'arg3': 9, 'arg4': 16, 'arg5': 25, 'arg6': 36, 'arg7': 49}
[11, 12, 13, 14, 15]
進程池
Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,纔會建立新的進程。
下面咱們先來看一個進程池非阻塞的例子:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import multiprocessing 4 import time 5 def fun(msg): 6 print("######start###### {0}".format(msg)) 7 time.sleep(3) 8 print("######end###### {0}".format(msg)) 9 if __name__ == "__main__": 10 pool = multiprocessing.Pool(processes=3) 11 for i in xrange(1, 6): 12 msg = "hello {0}".format(i) 13 pool.apply_async(fun, (msg,)) 14 15 print("##########start main#########") 16 pool.close() 17 pool.join() # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 18 19 print("##########end main#########")
結果:
##########start main#########
######start###### hello 1
######start###### hello 2
######start###### hello 3
######end###### hello 2
######end###### hello 1
######start###### hello 4
######start###### hello 5
######end###### hello 3
######end###### hello 5
######end###### hello 4
##########end main#########
進程池
阻塞和非阻塞的區別:
Pool.apply_async 非阻塞,定義的進程池進程最大數能夠同時執行。
Pool.apply 一個進程結束,釋放回進程池,下一個進程才能夠開始
Python中提供了threading模塊來對多線程的操做,
1. 多線程實例
線程是應用程序中工做的最小單元。
多線程是現實有兩種方式:
方法一:將要執行的方法做爲參數傳給Thread的構造方法(和多進程相似)
t = threading.Thread(target=action, args=(i,))
方法二:從Thread繼承,並重寫run()
看源碼:
P = threading.Thread
p.start() _start_new_thread(self.__bootstrap, ()) self.__bootstrap_inner()
self.run()
try:
if self.__target:
self.__target(*self.__args, **self.__kwargs)
因此若是重寫了run,就直接調用run的函數了,若是run沒有從新,就調用target函數。
2. 線程鎖
經過threading.Lock()來建立鎖,函數在執行的只有先要得到鎖,左後執行完之後要釋放鎖:
要釋放鎖:
with lock:
lock.acquire()
lock.release()
實例:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 def worker(name, lock): 6 with lock: 7 print("start {0}".format(name)) 8 time.sleep(5) 9 print("end {0}".format(name)) 10 11 if __name__ == "__main__": 12 lock = threading.Lock() 13 t1 = threading.Thread(target=worker, args=("worker1", lock)) 14 t2 = threading.Thread(target=worker, args=("worker2", lock)) 15 t1.start() 16 t2.start()
結果:
start worker1
end worker1
start worker2
end worker2
3. 線程共享變量
線程和多進程不一樣之處在於多線程自己就是能夠和父進程共享內存的,這也是爲何其中一個線程掛掉之後,爲何其餘線程也會死掉的道理。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import threading 5 6 l = list() 7 l += range(1, 10) 8 9 def worker(): 10 l.append("ling") 11 l.append("shang") 12 l.append("hello") 13 if __name__ == "__main__": 14 t = threading.Thread(target=worker) 15 t.start() 16 print(l)
結果:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 'ling', 'shang', 'hello']
4. 線程池
經過傳入一個參數組來實現多線程,而且它的多線程是有序的,順序與參數組中的參數順序保持一致。
安裝包:
pip install threadpool
調用格式:
from threadpool import *
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # @File : test14.py 4 import threadpool 5 def hello(m, n, o): 6 """""" 7 print "m = %s, n = %s, o = %s" % (m, n, o) 8 9 if __name__ == '__main__': 10 # 方法1 11 lst_vars_1 = ['1', '2', '3'] 12 lst_vars_2 = ['4', '5', '6'] 13 func_var = [(lst_vars_1, None), (lst_vars_2, None)] 14 # 方法2 15 dict_vars_1 = {'m': '1', 'n': '2', 'o': '3'} 16 dict_vars_2 = {'m': '4', 'n': '5', 'o': '6'} 17 func_var = [(None, dict_vars_1), (None, dict_vars_2)] 18 19 pool = threadpool.ThreadPool(2) 20 requests = threadpool.makeRequests(hello, func_var) 21 [pool.putRequest(req) for req in requests] 22 pool.wait()
1 #!/usr/bin/env python# 2 # -*- coding:utf-8 -*- 3 # # @File : demon6.py#!/usr/bin/env python 4 from multiprocessing import Process, Queue 5 import os, time, random 6 # 寫數據進程執行的代碼: 7 def write(q): 8 print('Process to write: %s' % os.getpid()) 9 for value in ['A', 'B', 'C']: 10 print('Put %s to queue...' % value) 11 q.put(value) 12 time.sleep(random.random()) 13 #讀數據進程執行的代碼: 14 def read(q): 15 print('Process to read: %s' % os.getpid()) 16 while True: 17 value = q.get(True) 18 print('Get %s from queue.' % value) 19 if __name__=='__main__': 20 # 父進程建立Queue,並傳給各個子進程: 21 q = Queue() 22 pw = Process(target=write, args=(q,)) 23 pr = Process(target=read, args=(q,)) 24 # 啓動子進程pw,寫入: 25 pw.start() 26 # 啓動子進程pr,讀取: 27 pr.start() 28 #等待pw結束: 29 pw.join() 30 # pr進程裏是死循環,沒法等待其結束,只能強行終止: 31 pr.terminate()
消息隊列
Python提供了Queue模塊來專門實現消息隊列
Queue對象
Queue對象實現一個fifo隊列(其餘的還有lifo、priority隊列,這裏再也不介紹)。queue只有maxsize一個構造參數,用來指定隊列容量,指定爲0的時候表明容量無限。主要有如下成員函數:
Queue.qsize():返回消息隊列的當前空間。返回的值不必定可靠。
Queue.empty():判斷消息隊列是否爲空,返回True或False。一樣不可靠。
Queue.full():相似上邊,判斷消息隊列是否滿
Queue.put(item, block=True, timeout=None):往消息隊列中存放消息。block能夠控制是否阻塞,timeout指定阻塞時候的等待時間。若是不阻塞或者超時,會引發一個full exception。
Queue.put_nowait(item):至關於put(item, False).
Queue.get(block=True, timeout=None):獲取一個消息,其餘同put。
如下兩個函數用來判斷消息對應的任務是否完成。
Queue.task_done():接受消息的線程經過調用這個函數來講明消息對應的任務已完成。
Queue.join(): 實際上意味着等到隊列爲空,再執行別的操做