對於操做系統來講, 一個任務就是一個進程(Process)python
進程內的這些「子任務」稱爲線程(Thread)linux
真正的並行執行多任務只能在多核CPU上實現web
多任務的實現有3種方式:編程
多進程模式;windows
多線程模式;服務器
多進程+多線程模式網絡
Python既支持多進程, 又支持多線程多線程
Unix/Linux操做系統提供了一個fork()系統調用,它很是特殊。普通的函數調用,調用一次,返回一次,可是fork()調用一次,返回兩次, 由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。併發
子進程永遠返回0, 而父進程返回子進程的ID, 進程只須要調用getppid()就能夠拿到父進程的IDapp
在python中能夠經過導入os模塊來完成一些系統的調用
os.getpid()能夠返回當前進程的pid
os.fork()能夠調用fork系統調用, 只不過只是支持linux系列的系統
因爲在windows上沒法使用fork(), 因此在python中提供了模塊multiprocessing來造成子進程
導入multiprocessing模塊的方法是使用from multiprocessing import導入
利用process函數來建立一個子進程
第一個參數能夠是用target用於傳遞一個函數, 用於生成進程以後調用該方法
第二個參數是args傳遞的剩餘參數
使用start()方法來啓動子進程
join()方法表示父進程要等待子進程執行完畢以後才能繼續往下執行, 一般用於進程間的同步
具體的使用實例以下
from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
要建立大量的進程就須要使用進程池
一樣是multiprocessing模塊下的, 可是使用的函數是Pool
具體是Pool()能夠傳入一個值用於設定子進程同時執行的數量, 返回一個進程池
Pool默認的大小是CPU的內核數量
進程池能夠調用apply_async()函數來建立子進程, 一樣第一個參數能夠綁定一個方法, 第二個參數args
對Pool
對象調用join()
方法會等待全部子進程執行完畢,調用join()
以前必須先調用close()
,調用close()
以後就不能繼續添加新的Process
了
具體建立代碼
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
若是不只要建立執行子進程, 還須要控制進程的輸入和輸出, 那就須要使用subprocess模塊
具體代碼以下
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
進程之間還須要通訊, python經過Queue和Pipes來交換數據
下面是建立兩個進程, 一個是往Queue裏寫入數據, 一個是從Queue裏讀數據
具體代碼以下
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父進程建立Queue,並傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啓動子進程pw,寫入:
pw.start()
# 啓動子進程pr,讀取:
pr.start()
# 等待pw結束:
pw.join()
# pr進程裏是死循環,沒法等待其結束,只能強行終止:
pr.terminate()
一個進程至少有一個線程
線程是操做系統直接支持的執行單元
在python中提供兩個模塊進程線程的操做, 一個是_thread, 一個是threading
其中_thread是低級模塊, threading是高級模塊, 對_thread進程了封裝, 通常只使用threading就行
啓動一個線程就是把一個函數傳入並建立Thread實例, 而後調用start()開始執行
因爲任何進程默認就會啓動一個線程,咱們把該線程稱爲主線程, 主線程又能夠啓動新的線程
Python的threading模塊有個current_thread()函數,它永遠返回當前線程的實例
主線程實例的名字叫MainThread,子線程的名字在建立時指定,咱們用LoopThread命名子線程
名字僅僅在打印時用來顯示,徹底沒有其餘意義,若是不起名字Python就自動給線程命名爲Thread-1,Thread-2……
具體代碼以下
import time, threading
# 新線程執行的代碼:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
多線程和多進程的區別
多進程中, 同一個變量, 各自有一份拷貝, 互相不影響
多線程中, 全部變量都是有全部線程共享, 任何一個變量均可以被任何一個線程修改, 因此必定要注意同時修改一個變量的狀況
所以可使用鎖來實現對併發修改的控制
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要獲取鎖:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了必定要釋放鎖:
lock.release()
通常地, 一個死循環線程會100%佔用一個CPU, 若是有兩個死循環線程的話, 就會監控到佔用200%的CPU
可是在Python中, 因爲GIL的限制, 一個進行當前的線程只能有一個
在多線程環境下, 每一個線程都有本身的數據, 且這些數據都是局部變量
可是大多時候, 一個進程的多個線程可能須要共用有個數據, 這個時候若是不斷傳遞參數就顯得臃腫, 建立一個全局變量經過鍵值對來保存儘管能夠解決這一問題, 可是代碼不夠美觀
所以能夠在多線程中, 使用threading.local()建立一個ThreadLocal對象來當作那個全局變量
import threading
# 建立全局ThreadLocal對象:
local_school = threading.local()
def process_thread(name):
# 綁定ThreadLocal的student:
local_school.student = name
process_student()
def process_student():
# 獲取當前線程關聯的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
通常的多任務, 一般會設計 Master-Worker 模式來處理
Master用於分配任務, Worker用於執行任務, 通常多任務環境下有一個Master多個Worker
穩定性上:
多進程: 穩定性好, 一個子進程崩潰了不會影響主進程, 通常Master進程很低可能崩潰, Apache就是使用的多線程
多線程: 穩定性不如多進程, 一個子線程崩潰程序就會掛掉
資源開銷上:
多進程: 進程開銷大, 一個操做系統可以同時運行的進程是有限的
多線程: 線程開銷小, 所以通常地處理速度也較快, IIS就是使用的多線程
關於線程切換
不管是多線程仍是多進程, 一旦數據過量, 效率就會下降
所以進程或者線程的切換, 都是須要時間的, 若是數量過多, 切換花費的時間就更多了
關於計算密集型和IO密集型
計算密集型主要消耗CPU資源, 所以任務切換的越頻繁, 效率就越低, 通常計算密集型同時進行的數量至關於CPU核心數
相對的IO密集型就有所不一樣, 因爲IO操做(網絡, 磁盤IO等)比較浪費時間, 此時python就頗有優點
關於異步IO
若是是同步IO的話, 那麼在IO沒有執行完畢以前程序是沒法繼續往下執行的
異步IO可使得程序在不用等待IO操做完成程序能夠繼續往下執行
現代操做系統對IO操做的支持已經作了巨大的改進, 利用異步IO可使得單線程模型執行多任務, 這也就是事件驅動模型
常見的異步IO的web服務器是Nginx, 單核CPU採用單線程進行, 多核CPU通常運行與CPU核心相同數量的進程數
在Python中, 單線程的異步編程模型就是協程
通常在Python中, 線程和進程通常會選擇進程來編寫代碼
同時multiprocessing模塊不但支持多進程, 還支持多進程分佈到多臺機器當中
共享消息隊列的多線程的使用方法以下
import random, queue
from multiprocessing.managers import BaseManager
# 建立兩個隊列, 發送任務的隊列 和 接受消息隊列
task_queue = queue.Queue()
result_queue = queue.Queue()
# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
pass
# 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啓動Queue:
manager.start()
# 得到經過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 關閉:
manager.shutdown()
print('master exit.')
獲取消息隊列的任務並執行的多進程以下
import time, sys, queue
from multiprocessing.managers import BaseManager
# 建立相似的QueueManager:
class QueueManager(BaseManager):
pass
# 因爲這個QueueManager只從網絡上獲取Queue,因此註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 鏈接到服務器,也就是運行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與task_master.py設置的徹底一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網絡鏈接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 處理結束:
print('worker exit.')