第十六章-進程和線程

對於操做系統來講, 一個任務就是一個進程(Process)python

進程內的這些「子任務」稱爲線程(Thread)linux

真正的並行執行多任務只能在多核CPU上實現web

多任務的實現有3種方式:編程

  多進程模式;windows

  多線程模式;服務器

  多進程+多線程模式網絡

Python既支持多進程, 又支持多線程多線程

1 多進程

  Unix/Linux操做系統提供了一個fork()系統調用,它很是特殊。普通的函數調用,調用一次,返回一次,可是fork()調用一次,返回兩次, 由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。併發

  子進程永遠返回0, 而父進程返回子進程的ID, 進程只須要調用getppid()就能夠拿到父進程的IDapp

  在python中能夠經過導入os模塊來完成一些系統的調用

  os.getpid()能夠返回當前進程的pid

  os.fork()能夠調用fork系統調用, 只不過只是支持linux系列的系統

1.1  multiprocessing

  因爲在windows上沒法使用fork(), 因此在python中提供了模塊multiprocessing來造成子進程

  導入multiprocessing模塊的方法是使用from multiprocessing import導入

  利用process函數來建立一個子進程

  第一個參數能夠是用target用於傳遞一個函數, 用於生成進程以後調用該方法

  第二個參數是args傳遞的剩餘參數

  使用start()方法來啓動子進程

  join()方法表示父進程要等待子進程執行完畢以後才能繼續往下執行, 一般用於進程間的同步

  具體的使用實例以下

from multiprocessing import Process
import os

def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

1.2 Pool

  要建立大量的進程就須要使用進程池

  一樣是multiprocessing模塊下的, 可是使用的函數是Pool

  具體是Pool()能夠傳入一個值用於設定子進程同時執行的數量, 返回一個進程池

  Pool默認的大小是CPU的內核數量

  進程池能夠調用apply_async()函數來建立子進程, 一樣第一個參數能夠綁定一個方法, 第二個參數args

  對Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close(),調用close()以後就不能繼續添加新的Process

  具體建立代碼

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

1.3 子進程

  若是不只要建立執行子進程, 還須要控制進程的輸入和輸出, 那就須要使用subprocess模塊

  具體代碼以下

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

1.4 進程間的通訊

  進程之間還須要通訊, python經過Queue和Pipes來交換數據

  下面是建立兩個進程, 一個是往Queue裏寫入數據, 一個是從Queue裏讀數據    

  具體代碼以下

from multiprocessing import Process, Queue
import os, time, random

# 寫數據進程執行的代碼:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀數據進程執行的代碼:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父進程建立Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啓動子進程pw,寫入:
    pw.start()
    # 啓動子進程pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr進程裏是死循環,沒法等待其結束,只能強行終止:
    pr.terminate()

2 多線程

  一個進程至少有一個線程

  線程是操做系統直接支持的執行單元    

  在python中提供兩個模塊進程線程的操做, 一個是_thread, 一個是threading

  其中_thread是低級模塊, threading是高級模塊, 對_thread進程了封裝, 通常只使用threading就行

  啓動一個線程就是把一個函數傳入並建立Thread實例, 而後調用start()開始執行

  因爲任何進程默認就會啓動一個線程,咱們把該線程稱爲主線程, 主線程又能夠啓動新的線程

  Python的threading模塊有個current_thread()函數,它永遠返回當前線程的實例

  主線程實例的名字叫MainThread,子線程的名字在建立時指定,咱們用LoopThread命名子線程

  名字僅僅在打印時用來顯示,徹底沒有其餘意義,若是不起名字Python就自動給線程命名爲Thread-1,Thread-2……

  具體代碼以下

import time, threading

# 新線程執行的代碼:
def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

2.1 Lock

  多線程和多進程的區別

  多進程中, 同一個變量, 各自有一份拷貝, 互相不影響

  多線程中, 全部變量都是有全部線程共享, 任何一個變量均可以被任何一個線程修改, 因此必定要注意同時修改一個變量的狀況

  所以可使用鎖來實現對併發修改的控制

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了必定要釋放鎖:
            lock.release()

2.2 多核CPU

  通常地, 一個死循環線程會100%佔用一個CPU, 若是有兩個死循環線程的話, 就會監控到佔用200%的CPU

  可是在Python中, 因爲GIL的限制, 一個進行當前的線程只能有一個

3 ThreadLocal

  在多線程環境下, 每一個線程都有本身的數據, 且這些數據都是局部變量

  可是大多時候, 一個進程的多個線程可能須要共用有個數據, 這個時候若是不斷傳遞參數就顯得臃腫, 建立一個全局變量經過鍵值對來保存儘管能夠解決這一問題, 可是代碼不夠美觀

  所以能夠在多線程中, 使用threading.local()建立一個ThreadLocal對象來當作那個全局變量

import threading

# 建立全局ThreadLocal對象:
local_school = threading.local()


def process_thread(name):
    # 綁定ThreadLocal的student:
    local_school.student = name
    process_student()


def process_student():
    # 獲取當前線程關聯的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))


t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

4 進程和線程

  通常的多任務, 一般會設計 Master-Worker 模式來處理

  Master用於分配任務, Worker用於執行任務, 通常多任務環境下有一個Master多個Worker

  穩定性上:

  多進程: 穩定性好, 一個子進程崩潰了不會影響主進程, 通常Master進程很低可能崩潰, Apache就是使用的多線程

  多線程: 穩定性不如多進程, 一個子線程崩潰程序就會掛掉

  資源開銷上:

  多進程: 進程開銷大, 一個操做系統可以同時運行的進程是有限的

  多線程: 線程開銷小, 所以通常地處理速度也較快, IIS就是使用的多線程

  關於線程切換

  不管是多線程仍是多進程, 一旦數據過量, 效率就會下降

  所以進程或者線程的切換, 都是須要時間的, 若是數量過多, 切換花費的時間就更多了

  關於計算密集型和IO密集型

  計算密集型主要消耗CPU資源, 所以任務切換的越頻繁, 效率就越低, 通常計算密集型同時進行的數量至關於CPU核心數

  相對的IO密集型就有所不一樣, 因爲IO操做(網絡, 磁盤IO等)比較浪費時間, 此時python就頗有優點

  關於異步IO

  若是是同步IO的話, 那麼在IO沒有執行完畢以前程序是沒法繼續往下執行的

  異步IO可使得程序在不用等待IO操做完成程序能夠繼續往下執行

  現代操做系統對IO操做的支持已經作了巨大的改進, 利用異步IO可使得單線程模型執行多任務, 這也就是事件驅動模型

  常見的異步IO的web服務器是Nginx, 單核CPU採用單線程進行, 多核CPU通常運行與CPU核心相同數量的進程數

  在Python中, 單線程的異步編程模型就是協程

5 分佈式進程

  通常在Python中, 線程和進程通常會選擇進程來編寫代碼

  同時multiprocessing模塊不但支持多進程, 還支持多進程分佈到多臺機器當中

  共享消息隊列的多線程的使用方法以下

import random, queue
from multiprocessing.managers import BaseManager

# 建立兩個隊列, 發送任務的隊列 和 接受消息隊列
task_queue = queue.Queue()
result_queue = queue.Queue()


# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass


# 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啓動Queue:
manager.start()
# 得到經過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 關閉:
manager.shutdown()
print('master exit.')

  獲取消息隊列的任務並執行的多進程以下

import time, sys, queue
from multiprocessing.managers import BaseManager


# 建立相似的QueueManager:
class QueueManager(BaseManager):
    pass


# 因爲這個QueueManager只從網絡上獲取Queue,因此註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 鏈接到服務器,也就是運行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與task_master.py設置的徹底一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網絡鏈接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n * n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')
相關文章
相關標籤/搜索