一、基本概念html
二、多線程內容方法python
三、多進程內容方法編程
線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,安全
一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。A thread is an execution context, which is all the information a CPU needs to多線程
execute a stream of instructions.(一個線程是一個execution context(執行上下文),即一個cpu執行時所須要的一串指令。)併發
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped.One way toapp
achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.異步
If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back,socket
and resume it from where you were.async
Threads work in the same way. A CPU is giving you the illusion(幻覺) that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can
do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.
一個程序的執行實例就是一個進程。每個進程提供執行程序所需的全部資源。(進程本質上是資源的集合)
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique
process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread,
often called the primary thread, but can create additional threads from any of its threads.
Python的標準庫提供了兩個模塊:_thread和threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數狀況下,咱們只須要使用threading這個高級模塊。
Python中使用線程有兩種方式:函數或者用類來包裝線程對象。
啓動一個線程就是把一個函數傳入並建立Thread
實例,語法以下:
t = threading.Thread(target=function, args=(kwargs,))
參數說明:
- t - 賦值變量
- function - 線程函數。
- args - 傳遞給線程函數的參數,他必須是個tuple類型。
- kwargs - 可選參數。
而後調用start()開始執行:
import threading import time def task(n): print('task---%s' % n) time.sleep(2) print('End') t1 = threading.Thread(target=task, args=('t1',)) t2 = threading.Thread(target=task, args=('t2',)) t1.start() t2.start()
執行結果以下:
task---t1 task---t2 End End
threading 模塊提供的一些方法:
除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法:
方法 | 註釋 |
run() | 線程被cpu調度後自動執行線程對象的run方法,若是想自定義線程類,直接重寫run方法就好了 |
start() | 啓動線程活動 |
join() | 逐個執行每一個線程,執行完畢後繼續往下執行 |
isAlive() | 返回線程是否活動的 |
getName() | 返回線程名 |
setName() | 設置線程名 |
setDaemon(True) | 設置爲守護線程 |
使用Threading模塊建立線程,直接從threading.Thread繼承,而後重寫__init__方法和run方法:
import threading import time class MyThreading(threading.Thread): #繼承父類threading.Thread def __init__(self, n): super(MyThreading, self).__init__() self.n = n def run(self): #重構run()方法 print('running task', self.n) time.sleep(1) print('End') if __name__ == '__main__': t1 = MyThreading('t1') t2 = MyThreading('t2') t1.start() #啓動線程後,會自動執行run()方法 t2.start() print('The %s has finished' % threading.current_thread) #打印當前線程變量
執行結果以下:
running task t1 running task t2 The <_MainThread(MainThread, started 13204)> has finished End End
補充:
import threading import time class MyThreading(threading.Thread): def __init__(self, n): super(MyThreading, self).__init__() self.n = n def run(self): print('running task', self.n) time.sleep(1) print('End') if __name__ == '__main__': t1 = MyThreading('t1') t2 = MyThreading('t2') t1.start() t2.start() t1.join() #主線程等待t1的線程完成再往下執行 t2.join() print('The %s has finished' % threading.current_thread()) # 執行結果以下(可對比上一個程序的結果,join()後主線程等待子線程完成再打印): running task t1 running task t2 End End The <_MainThread(MainThread, started 11868)> has finished
下面經過一個記錄線程的例子,進一步瞭解join()方法和threading.current_thread()、threading.active_count()方法:
import threading import time def task(n): print('task---%s' % n) # 打印子線程 time.sleep(1) print('task %s done,the number of active threads is:%s' % (threading.current_thread(), threading.active_count())) # 打印當前線程變量和活躍線程個數 start_time = time.time() # 記錄開始時刻 t_obj = [] # 定義列表存放線程實例 for i in range(5): # 設置5個線程實例 t = threading.Thread(target=task, args=('%s' % i,)) t.start() t_obj.append(t) # 將全部線程存放在列表中 for t in t_obj: # 爲全部子線程添加join方法,使主線程等待全部線程完成 t.join() stop_time = time.time() # 記錄結束時刻 print('------all threads has finished------', threading.current_thread(), threading.active_count()) # 打印主線程和線程個數 print('const:', stop_time - start_time) # 打印總耗時
執行結果以下:
task---0 task---1 task---2 task---3 task---4 task <Thread(Thread-4, started daemon 11868)> done,the number of active threads is:6 task <Thread(Thread-2, started daemon 7596)> done,the number of active threads is:5 task <Thread(Thread-3, started daemon 8488)> done,the number of active threads is:4 task <Thread(Thread-5, started daemon 9648)> done,the number of active threads is:3 task <Thread(Thread-1, started daemon 15396)> done,the number of active threads is:2 ------all threads has finished------ <_MainThread(MainThread, started 15196)> 1 const: 1.0155258178710938
注意到這裏的threading.active_count()方法返回的:線程數量=子線程的數量 + 主線程數量(1)
來看一個簡單的守護線程的例子,這裏使用setDaemon(True)把全部的子線程都變成了主線程的守護線程,所以當主線程結束後,子線程也會隨之結束。因此當主線程結束後,整個程序就退出了。
import threading import time def task(n): print('task-%s starts' % n) time.sleep(2) print('task-%s end' % n) for i in range(3): t = threading.Thread(target=task, args=('%s' % i,)) t.setDaemon(True) # 應在線程啓動前設置守護線程 t.start() print('The main thread end')
執行結果以下:
task-0 starts task-1 starts task-2 starts The main thread end
對比不設置守護進程:
import threading import time def task(n): print('task-%s starts' % n) time.sleep(2) print('task-%s end' % n) for i in range(3): t = threading.Thread(target=task, args=('%s' % i,)) t.start() print('The main thread end') # 執行結果以下: task-0 starts task-1 starts task-2 starts The main thread end task-0 end task-1 end task-2 end
因此,將子線程設置守護線程後,主線程結束,殺死未執行完的子線程,程序退出;未設置守護線程,主線程結束,可是並無殺死子線程,子線程依然能夠繼續執行,直到子線程所有結束,程序退出。
關於更多join()和守護線程可移步:python多線程中join()的理解
在非python環境中,單核狀況下,同時只能有一個任務執行。多核時能夠支持多個線程同時執行。可是在python中,不管有多少核,同時只能執行一個線程。
究其緣由,這就是因爲GIL的存在致使的。
GIL的全稱是Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,爲了數據安全所作的決定。某個線程想要執行,必須先拿到GIL,咱們
能夠把GIL看做是「通行證」,而且在一個python進程中,GIL只有一個。拿不到通行證的線程,就不容許進入CPU執行。GIL只在cpython中才有,由於cpython
調用的是c語言的原生線程,因此他不能直接操做cpu,只能利用GIL保證同一時間只能有一個線程拿到數據。而在pypy和jpython中是沒有GIL的。
Python多線程的工做過程:
python在使用多線程的時候,調用的是c語言的原生線程。
多線程和多進程最大的不一樣在於,多進程中,同一個變量,各自有一份拷貝存在於每一個進程中,互不影響,而多線程中,全部變量都由全部線程共享,因此,任何
一個變量均可以被任何一個線程修改,所以,線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。
舉個小例子看下:
import threading # 你開始在座標原點 position = 0 def move(x): # 定義你的移動 # 向右爲正,向左爲負 global position position = position + x position = position - x # 回到原點 def run_thread(x): for i in range(500000): move(x) t1 = threading.Thread(target=run_thread, args=(3,)) t2 = threading.Thread(target=run_thread, args=(2,)) t1.start() t2.start() t1.join() t2.join() print('your position:', position)
這裏定義了一個position,在move()方法中讓它左右移動,理論上不管調用多少次move()方法,position的值最終都是0,可是,因爲線程的調度是由操做系統決定的,
當t一、t2交替執行時,只要循環次數足夠多,position的結果就不必定是0了。
究竟爲何會出現結果不爲0的狀況呢?由於cpu在執行move()方法時,須要執行若干條語句已達到修改的目的,而此過程當中,線程可能隨時中斷(線程交替執行),
好比,position = position + 2,執行時會分步執行:
能夠寫爲:
y = position + x
position = y
因爲y是局部變量,兩個線程各自都有本身的y,t1和t2是交替運行的,若是操做系統如下面的順序執行t一、t2:
# 初始值 position = 0 t1: y1 = position + 3 # y1 = 0 + 3 = 3 t2: y2 = position + 2 # y2 = 0 + 2 = 2 t2: position = y2 # position = 2 t1: position = y1 # position = 3 t1: y1 = position - 3 # y1 = 3 - 3 = 0 t1: position = y1 # position = 0 t2: y2 = position - 8 # y2 = 0 - 2 = -2 t2: position = y2 # position = -2 # 結果 position = -2
(注:雖然解釋器有GIL,同一時間只能有一個線程執行,但數據會被copy成不少份,線程切換過程當中,仍是會有數據出錯)
若是要確保position計算正確,就要給move()上一把鎖,當某個線程開始執行move()時,就稱該線程由於得到了鎖,所以其餘線程不能同時執行move(),只能等待,
直到鎖被釋放後,得到該鎖之後才能改。因爲鎖只有一個,不管多少線程,同一時刻最多隻有一個線程持有該鎖,因此,不會形成修改的衝突。建立一個鎖就是
經過threading.Lock()來實現:
import threading position = 0 lock = threading.Lock() # 實例化一把鎖 def move(x): lock.acquire() # 獲取鎖 global position position = position + x position = position - x lock.release() # 釋放鎖 def run_thread(x): for i in range(500000): move(x) t1 = threading.Thread(target=run_thread, args=(3,)) t2 = threading.Thread(target=run_thread, args=(2,)) t1.start() t2.start() t1.join() t2.join() print('your position:', position)
當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,而後繼續執行代碼,其餘線程就繼續等待直到得到鎖爲止。
鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭至尾完整地執行,壞處就是阻止了多線程併發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就
大大地降低了。
RLcok類的用法和Lock類如出一轍,但它支持嵌套,在多個鎖沒有釋放的時候通常會使用使用RLcok類。
import threading lock = threading.RLock() # 實例化一個遞歸鎖 num = 0 def task(): print('task is working') lock.acquire() global num num += 1 lock.release() return num def foo(): print('foo is working') lock.acquire() res = task() print('acquire num from task:',res) lock.release() for i in range(3): t = threading.Thread(target=foo) t.start()
有鎖的嵌套,必定要使用遞歸鎖,若是還使用互斥鎖,會出現鎖死的狀況。
線程(互斥)鎖同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏
面有人出來了才能再進去。
import threading, time def run(n): semaphore.acquire() # 獲取信號量 time.sleep(1) print("run the thread: %s\n" % n) semaphore.release() # 釋放信號量 if __name__ == '__main__': semaphore = threading.BoundedSemaphore(5) # 最多容許5個線程同時運行 for i in range(20): t = threading.Thread(target=run, args=(i,)) t.start()
This class represents an action that should be run only after a certain amount of time has passed
Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling the cancel() method.
The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.
from threading import Timer def wait(): print('I have wait 2 seconds') t = Timer(2, wait) t.start() # 2秒以後執行wait()
python線程的事件用於主線程控制其餘線程的執行,事件是一個簡單的線程同步對象,對象包含一個可由線程設置的信號標誌(flag),它容許線程等待某些事件的發生。
在初始狀況下,Event對象中的信號標誌(flag)被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌(flag)爲假,那麼這個線程將會被一直阻塞直至該
標誌(flag)爲真。一個線程若是將一個Event對象的信號標誌(flag)設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對
象,那麼它將忽略這個事件, 繼續執行。
Event的主要有如下幾個方法:
方法 | 註釋 |
set | 將flag設置爲「True」(設置標誌) |
clear | 將flag設置爲「False」(清除標誌) |
isSet | 判斷是否設置了flag("True" or "False") |
wait | 會一直監聽flag,若是沒有檢測到flag(即:flag爲"Flase")就一直處於阻塞狀態 |
經過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程作交通指揮燈,生成幾個線程作車輛,車輛行駛按紅燈停,綠燈行的規則:
import threading import time # 定義紅綠信號燈 def lighter(): count = 1 event.set() # 初始狀態設置flag,wait()不阻塞,即爲綠燈 while True: if 5 < count <= 10: event.clear() # 將flag清除,即變紅燈 print('---red light on---') elif count > 10: event.set() # 設置flag,即變綠燈 count = 0 else: print('---green light on---') count += 1 time.sleep(1) # 定義行駛車輛 def car(): while True: if event.is_set(): # 檢測flag設置,返回"True"即綠燈 print('car is running go go go...') time.sleep(2) else: print('car is waiting light...') event.wait() # flag被清除,等待從新設置,即紅燈 print('The green light is on, start going...') if __name__ == '__main__': event = threading.Event() # 建立一個event實例 t = threading.Thread(target=lighter,) c = threading.Thread(target=car,) t.start() c.start()
Queue的種類:
Queue.Queue(maxsize=0)
FIFO即First in First Out,先進先出。Queue提供了一個基本的FIFO容器,使用方法很簡單,maxsize是個整數,指明瞭隊列中能存放的數據個數的上限。一旦達到上
限,插入會致使阻塞,直到隊列中的數據被消費掉。若是maxsize小於或者等於0,隊列大小沒有限制。
Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,後進先出。與棧的相似,使用也很簡單,maxsize用法同上
class Queue.PriorityQueue(maxsize=0)
構造一個優先隊列。maxsize用法同上。
經常使用方法:
方法 | 註釋 |
Queue.qsize() | 返回隊列的大小 |
Queue.empty() | 若是隊列爲空,返回True,反之False |
Queue.full() | 若是隊列滿了,返回True,反之False |
Queue.put(item,block=True, timeout=None) | 寫入隊列,timeout等待時間 |
Queue.get(block=True, timeout=None) | 獲取隊列,timeout:等待時間 |
Queue.put_nowait(item) | 至關Queue.put(item, False) |
Queue.get_nowait() | 至關Queue.get(False) |
Queue.task_done() | 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號 |
Queue.join() | 實際上意味着等到隊列爲空,再執行別的操 |
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼
生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問
題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者
生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了
生產者和消費者的處理能力。
來看一個模型例子:
import threading,queue,time count = 0 # 定義生產者 def producer(name): global count while q.qsize() < 20: # 隊列的<20時 count += 1 q.put(count) # 將數據放入隊列 print('[%s] has produced %s bones...' % (name, count)) time.sleep(0.5) else: #隊列達到20時,暫停生產25秒 time.sleep(25) return producer(name) # 定義消費者 def consumer(name): while True: data = q.get() # 獲取隊列賦值給data print('[%s] has eaten the number of %s bone...' % (name, data)) time.sleep(3) if __name__ == '__main__': q = queue.Queue() # 實例化一個隊列 t = threading.Thread(target=producer, args=('Eric',)) c1 = threading.Thread(target=consumer, args=('yuanyuan',)) c2 = threading.Thread(target=consumer, args=('fangfang',)) t.start() c1.start() c2.start()
multiprocessing模塊就是跨平臺版本的多進程模塊(Unix/Linux操做系統提供了一個fork()系統調用),multiprocessing模塊提供了一個Process類來表明一個進程對象,
用法和threading相似:
from multiprocessing import Process import os, time def run_pro(name): print('Run child Process: %s[%s]' % (name, os.getpid())) # 獲取進程ID if __name__ == '__main__': p = Process(target=run_pro, args=('num1',)) print('Process will start...') time.sleep(1) print('Main Process is:',os.getpid()) p.start() p.join()
Process之間確定是須要通訊的,Python的multiprocess模塊提供了Queue、Pipes等多種方式來交換數據。
這裏以Queue爲例,在父進程中建立兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據(使用方法跟threading裏的queue差很少)
#!/usr/bin/python # -*- coding: UTF-8 -*- from multiprocessing import Queue,Process import os # 定義寫入方法 def write(n): print('Process to write %s' % os.getpid()) for i in ['X', 'Y', 'Z']: n.put(i) # 將數據寫入隊列 print('Put %s to queue' % i) # 定義讀取方法 def read(n): print('Process to read %s' % os.getpid()) while True: print('Get %s to queue' % n.get()) # 從隊列讀取數據 if __name__ == '__main__': q = Queue() p1 = Process(target=write, args=(q,)) p2 = Process(target=read, args=(q,)) p1.start() p2.start() p1.join() p2.terminate() #讀取是死循環,強制結束
執行結果以下:
Process to write 8492 Process to read 9860 Put X to queue Get X to queue Put Y to queue Get Y to queue Put Z to queue Get Z to queue
Pipe的本質是進程之間的數據傳遞,而不是數據共享,這和socket有點像。pipe()返回兩個鏈接對象分別表示管道的兩端,每端都有send()
和recv()方法。若是兩個進程試圖在同一時間的同一端進行讀取和寫入那麼,這可能會損壞管道中的數據
from multiprocessing import Process, Pipe import time # 發送數據 def parent(conn): for i in range(5): conn.send(i) print('\033[41;1m send [%s] from parent\033[0m' % i) time.sleep(0.1) # 爲了更清晰看出發送和接收的狀態 # 接收數據 def child(conn): while True: print('\033[46;1m child has received [%s] \033[0m' % conn.recv()) if __name__ == '__main__': child_pipe, parent_pipe = Pipe() # 實例化一個管道(一個管道有兩端) p = Process(target=parent, args=(parent_pipe,)) c = Process(target=child, args=(child_pipe,)) p.start() c.start() p.join() c.terminate() # 強制結束
執行結果以下:
send [0] from parent child has received [0] send [1] from parent child has received [1] send [2] from parent child has received [2] send [3] from parent child has received [3] send [4] from parent child has received [4]
(Queue and Pipe objects should only be shared between processes through inheritance.)
經過Manager可實現進程間數據的共享。Manager()返回的manager對象會經過一個服務進程,來使其餘進程經過代理的方式操做python對象。
manager對象支持list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array.
from multiprocessing import Manager,Process import os def fanc(d): d[os.getpid()]= os.getpid() # 在字典中添加鍵值均爲進程ID print(d) if __name__ == '__main__': with Manager() as manager: # 等同於manager = Manager() d = manager.dict() # 建立一個字典,能夠在多個進程之間共享和傳遞 p_list = [] # 建立一個保存進程的列表,方便後面join for i in range(10): p = Process(target=fanc, args=(d,)) p.start() p_list.append(p) for res in p_list: res.join()
經過上面的設置,代碼中的10個進程都可以對字典d作修改,達到數據共享的目的,執行結果以下:
{13704: 13704} {13704: 13704, 15824: 15824} {13704: 13704, 15824: 15824, 4460: 4460} {13704: 13704, 15824: 15824, 4460: 4460, 1292: 1292} {13704: 13704, 15824: 15824, 4460: 4460, 1292: 1292, 14252: 14252} {13704: 13704, 15824: 15824, 4460: 4460, 1292: 1292, 14252: 14252, 2716: 2716} {13704: 13704, 15824: 15824, 4460: 4460, 1292: 1292, 14252: 14252, 2716: 2716, 1752: 1752} {13704: 13704, 15824: 15824, 4460: 4460, 1292: 1292, 14252: 14252, 2716: 2716, 1752: 1752, 11956: 11956} {13704: 13704, 15824: 15824, 4460: 4460, 1292: 1292, 14252: 14252, 2716: 2716, 1752: 1752, 11956: 11956, 15048: 15048} {13704: 13704, 15824: 15824, 4460: 4460, 1292: 1292, 14252: 14252, 2716: 2716, 1752: 1752, 11956: 11956, 15048: 15048, 7028: 7028}
數據輸出的時候保證不一樣進程的輸出內容在同一塊屏幕正常顯示,防止數據亂序的狀況。
Without using the lock output from the different processes is liable to get all mixed up.
因爲進程啓動的開銷比較大,使用多進程的時候會致使大量內存空間被消耗。爲了防止這種狀況發生可使用進程池,(因爲啓動線程的
開銷比較小,因此不須要線程池這種概念,多線程只會頻繁得切換cpu致使系統變慢,並不會佔用過多的內存空間)
進程池主要有兩個方法:
apply:同步執行(串行)
apply_acync:異步執行(並行)
from multiprocessing import Process,Pool import time,os def foo(i): time.sleep(1) print('in the process:' % os.getpid()) return i+100 def bar(arg): time.sleep(0.1) print('-->exec done:', arg, os.getpid()) if __name__ == '__main__': pool = Pool(5) # 定義一個容量爲5的進程池 for i in range(10): # 寫入十個進程,多出的會自動掛起 # pool.apply(func=foo, args=(i,)) # 串行方式 pool.apply_async(func=foo, args=(i,), callback=bar) # 並行方式,func子進程執行完後,纔會執行callback(回調函數),不然callback不執行(並且callback是由父進程來執行了) pool.close() # 注意這裏結束時必定要先close,後join。 pool.join() print('end')
對Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close()調用close()以後就不能繼續添加新的Procsee了。
進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到
進程池中有可用進程爲止。在上面的程序中產生了10個進程,可是隻能有5同時被放入進程池,剩下的都被暫時掛起,並不佔用內存空間,
等前面的五個進程執行完後,再執行剩下5個進程。