python基礎-線程和進程

1、多進程html

  一、multiprocessing:提供跨平臺的多進程支持python

  二、Pool:進程池.編程

  三、進程間通訊:multiprocessing.Queue;multiprocessing.Pipeswindows

2、多線程服務器

  一、Lock:線程鎖.網絡

  二、多核CPU多線程

  三、ThreadLocal:每一個線程使用本身的局部變量.提升性能.併發

3、線程 vs 進程app

  一、線程切換:效率,性能問題.
dom

  二、計算密集型 vs. IO密集型

  三、異步IO:協程.

4、分佈式進程

------------------

1、多進程

  線程是最小的執行單元,而進程由至少一個線程組成。

  python中os模塊中的fork()能夠複製當前進程,而且子進程返回0,父進程返回子進程的PID。那麼子進程能夠用getppid()獲得父進程的PID :

# multiprocessing.py
import os

print 'Process (%s) start...' % os.getpid()
pid = os.fork()      #windows系統下沒有fork()調用,so,只能在posix的系統下執行(Unix,Linux,Mac,BSD...) if pid==0:
    print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

  常見的Apache服務就是由父進程監聽端口,而後fork()出子進程來處理http請求.

  一、multiprocessing:提供跨平臺的多進程支持

    Python中有multiprocessing模塊提供跨平臺的多進程支持:

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()    #啓動進程
    p.join()     #等待子進程結束後再繼續往下運行,一般用於進程間的同步。 print 'Process end.'

  二、Pool :進程池.

    要啓動大量的子進程,能夠用進程池的方式批量建立子進程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print 'Run task %s (%s)...' % (name,os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)  #隨機暫停
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name,(end-start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()      #建立一個進程池. for i in range(5):
        p.apply_async(long_time_task,args=(i,))  #將要執行的函數加入進程池 print 'Waiting for all subprocesses done....'
    p.close()
    p.join()    #等待全部子進程執行完畢,以前必須先調用close(),而後不能添加新的Process.
    print 'All subprocessesdone.'
Parent process 1356.
Waiting for all subprocesses done....
Run task 3 (6296)...
Task 3 runs 0.52 seconds.
Run task 2 (4244)...
Task 2 runs 1.58 seconds.
Run task 0 (5720)...
Task 0 runs 1.48 seconds.
Run task 1 (7692)...
Task 1 runs 2.55 seconds.
Run task 4 (1860)...
Task 4 runs 2.96 seconds.
All subprocessesdone.
[Finished in 3.3s]

  Pool的默認大小是CPU的核數,固然,你也能夠改變:

p = Pool(5)

  三、進程間通訊:

    multiprocessing模塊包裝了底層的機制,提供了QueuePipes等多種方式來交換數據,以Queue爲例:參考:這裏

from multiprocessing import Process,Queue
import os, time, random

#寫數據進程執行的代碼:
def write(q):
    for value in ['A','B','C']:
        print 'Put %s to queue...'  % value
        q.put(value)
    time.sleep(random.random())

#讀數據進程執行的代碼:
def read(q):
    while True:
        if q.empty():
            return
        else:
            value = q.get()
            print value,'from Queue'

if __name__=='__main__':
    #父進程建立Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    #啓動子進程pw,寫入:
    pw.start()
    #啓動子進程pr,讀取:
    pr.start()
    #等待pw,pr結束:    
    pw.join()
    pr.terminate()

2、多線程:

   任務能夠由多進程完成,也能夠由一個進程內的多線程完成.

   python 提供threadtheading兩個線程模塊,thread較底層,threading較方便.大多數狀況下使用theading就足夠了.

#theading_test
import time, threading

# 新線程執行的代碼:
def loop():
    print 'thread %s is running...' % threading.current_thread().name #返回當前線程的實例
    n = 0
    while n < 5:
        n = n + 1
        print 'thread %s >>> %s' % (threading.current_thread().name, n)
        time.sleep(1)
    print 'thread %s ended.' % threading.current_thread().name

print 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')  #在建立線程時指定名字LoopThread,不然默認爲Thread-1,Thread-2....
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name

  一、Lock

    多進程中,每一個進程都有變量的拷貝,而多線程中,共享進程中的變量,因此須要lock來限定變量的修改.

     Threading.Lock()能夠實現同一時間只能有一個線程擁有鎖,而後能夠修改變量,由於鎖只有一把,其餘線程則須要等待該線程釋放鎖後才能進行操做:

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()   #當多個線程同時執行時,只有一個線程能成功地獲取鎖,而後繼續執行代碼,其餘線程就繼續等待直到得到鎖爲止 try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了必定要釋放鎖:
            lock.release()lock.acquire()

    鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭至尾完整地執行.

    可是同時也阻止了多線程併發執行,

    其次,因爲能夠存在多個鎖,操做不當可能致使多個線程所有掛起(pause),只能又操做系統強制終止.

  二、多核CPU:

    若是你不幸擁有一個多核CPU,你確定在想,多核應該能夠同時執行多個線程。

    若是寫一個死循環的話,會出現什麼狀況呢?

    打開Mac OS X的Activity Monitor,或者Windows的Task Manager,均可以監控某個進程的CPU使用率。

    咱們能夠監控到一個死循環線程會100%佔用一個CPU。

    若是有兩個死循環線程,在多核CPU中,能夠監控到會佔用200%的CPU,也就是佔用兩個CPU核心。

    要想把N核CPU的核心所有跑滿,就必須啓動N個死循環線程。

    試試用Python寫個死循環:

import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

    啓動與CPU核心數量相同的N個線程,在4核CPU上能夠監控到CPU佔用率僅有160%,也就是使用不到兩核。

    即便啓動100個線程,使用率也就170%左右,仍然不到兩核。

    可是用C、C++或Java來改寫相同的死循環,直接能夠把所有核心跑滿,4核就跑到400%,8核就跑到800%,爲何Python不行呢?

    由於Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先得到GIL鎖,而後,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把全部線程的執行代碼都給上了鎖,因此,多線程在Python中只能交替執行,即便100個線程跑在100核CPU上,也只能用到1個核。

    GIL是Python解釋器設計的歷史遺留問題,一般咱們用的解釋器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。

    因此,在Python中,可使用多線程,但不要期望能有效利用多核。若是必定要經過多線程利用多核,那隻能經過C擴展來實現,不過這樣就失去了Python簡單易用的特色。

    不過,也不用過於擔憂,Python雖然不能利用多線程實現多核任務,但能夠經過多進程實現多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。

  三、ThreadLocal:

    每一個線程使用本身的局部變量.提升性能.

import threading

# 建立全局ThreadLocal對象:
local_school = threading.local()

def process_student():
    print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name)

def process_thread(name):
    # 綁定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
[Finished in 0.1s]

    全局變量local_school就是一個ThreadLocal對象,每一個Thread對它均可以讀寫student屬性,但互不影響。能夠把local_school當作全局變量,但每一個屬性如local_school.student都是線程的局部變量,能夠任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。

 3、進程 vs. 線程:

    多進程和多線程,是實現多任務最經常使用的兩種方式。

    首先,要實現多任務,一般會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,所以,多任務環境下,一般是一個Master,多個Worker

    若是用多進程實現Master-Worker,主進程就是Master,其餘進程就是Worker

    若是用多線程實現Master-Worker,主線程就是Master,其餘線程就是Worker

    多進程模式最大的優勢就是穩定性高,由於一個子進程崩潰了,不會影響主進程和其餘子進程。(固然主進程掛了全部進程就全掛了,可是Master進程只負責分配任務,掛掉的機率低)著名的Apache最先就是採用多進程模式。

    多進程模式的缺點是建立進程的代價大,在Unix/Linux系統下,用fork調用還行,在Windows下建立進程開銷巨大。另外,操做系統能同時運行的進程數也是有限的,在內存和CPU的限制下,若是有幾千個進程同時運行,操做系統連調度都會成問題。

    多線程模式一般比多進程快一點,可是也快不到哪去,並且,多線程模式致命的缺點就是任何一個線程掛掉均可能直接形成整個進程崩潰,由於全部線程共享進程的內存。在Windows上,若是一個線程執行的代碼出了問題,常常能夠看到這樣的提示:「該程序執行了非法操做,即將關閉」,其實每每是某個線程出了問題,可是操做系統會強制結束整個進程。

    在Windows下,多線程的效率比多進程要高,因此微軟的IIS服務器默認採用多線程模式。因爲多線程存在穩定性的問題,IIS的穩定性就不如Apache。爲了緩解這個問題,IISApache如今又有多進程+多線程的混合模式。

  一、線程切換:

    不管是多進程仍是多線程,只要數量一多,效率確定上不去,爲何?

    打個比方,假設你不幸正在準備中考,天天晚上須要作語文、數學、英語、物理、化學這5科的做業,每項做業耗時1小時。

    若是你先花1小時作語文做業,作完了,再花1小時作數學做業,這樣,依次所有作完,一共花5小時,這種方式稱爲單任務模型,或者批處理任務模型。

    假設你打算切換到多任務模型,能夠先作1分鐘語文,再切換到數學做業,作1分鐘,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多任務是同樣的了,以幼兒園小朋友的眼光來看,你就正在同時寫5科做業。

    可是,切換做業是有代價的,好比從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫保存現場),而後,打開數學課本、找出圓規直尺(這叫準備新環境),才能開始作數學做業。操做系統在切換進程或者線程時也是同樣的,它須要先保存當前執行的現場環境(CPU寄存器狀態、內存頁等),而後,把新任務的執行環境準備好(恢復上次的寄存器狀態,切換內存頁等),才能開始執行。這個切換過程雖然很快,可是也須要耗費時間。若是有幾千個任務同時進行,操做系統可能就主要忙着切換任務,根本沒有多少時間去執行任務了,這種狀況最多見的就是硬盤狂響,點窗口無反應,系統處於假死狀態。

    因此,多任務一旦多到一個限度,就會消耗掉系統全部的資源,結果效率急劇降低,全部任務都作很差。

  二、計算密集型 vs. IO密集型:

    是否採用多任務的第二個考慮是任務的類型。能夠把任務分爲計算密集型和IO密集型。

    計算密集型任務的特色是要進行大量的計算,消耗CPU資源,好比計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也能夠用多任務完成,可是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,因此,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。

    計算密集型任務因爲主要消耗CPU資源,所以,代碼運行效率相當重要。Python這樣的腳本語言運行效率很低,徹底不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。

    第二種任務的類型是IO密集型,涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特色是CPU消耗不多,任務的大部分時間都在等待IO操做完成(由於IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,好比Web應用。

    IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間不多,所以,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,徹底沒法提高運行效率。對於IO密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。

  三、異步IO:

    考慮到CPUIO之間巨大的速度差別,一個任務在執行的過程當中大部分時間都在等待IO操做,單進程單線程模型會致使別的任務沒法並行執行,所以,咱們才須要多進程模型或者多線程模型來支持多任務併發執行。

    現代操做系統對IO操做已經作了巨大的改進,最大的特色就是支持異步IO。若是充分利用操做系統提供的異步IO支持,就能夠用單進程單線程模型來執行多任務,這種全新的模型稱爲事件驅動模型,Nginx就是支持異步IO的Web服務器,它在單核CPU上採用單進程模型就能夠高效地支持多任務。在多核CPU上,能夠運行多個進程(數量與CPU核心數相同),充分利用多核CPU。因爲系統總的進程數量十分有限,所以操做系統調度很是高效。用異步IO編程模型來實現多任務是一個主要的趨勢。

    對應到Python語言,單進程的異步編程模型稱爲協程,有了協程的支持,就能夠基於事件驅動編寫高效的多任務程序

4、分佈式進程:

  在ThreadProcess中,應當優選Process,由於Process更穩定,並且,Process能夠分佈到多臺機器上,而Thread最多隻能分佈到同一臺機器的多個CPU上。

  Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分佈到多臺機器上。一個服務進程能夠做爲調度者,將任務分佈到其餘多個進程中,依靠網絡通訊。因爲managers模塊封裝很好,沒必要了解網絡通訊的細節,就能夠很容易地編寫分佈式多進程程序。

  舉個例子:若是已經有一個經過Queue通訊的多進程程序在同一臺機器上運行,如今,因爲處理任務的進程任務繁重,但願把發送任務的進程和處理任務的進程分佈到兩臺機器上。怎麼用分佈式進程實現?

  原有的Queue能夠繼續使用,可是,經過managers模塊把Queue經過網絡暴露出去,就可讓其餘機器的進程訪問Queue了。

  先看服務進程,服務進程負責啓動Queue,把Queue註冊到網絡上,而後往Queue裏面寫入任務:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 發送任務的隊列:
task_queue = Queue.Queue()
# 接收結果的隊列:
result_queue = Queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 啓動Queue:
manager.start()
# 得到經過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 關閉:
manager.shutdown()

    請注意,當在一臺機器上寫多進程程序時,建立的Queue能夠直接拿來用,可是,在分佈式多進程環境下,添加任務到Queue不能夠直接對原始的task_queue進行操做,那樣就繞過了QueueManager的封裝,必須經過manager.get_task_queue()得到的Queue接口添加。

    而後,在另外一臺機器上啓動任務進程(本機上啓動也能夠):

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 建立相似的QueueManager:
class QueueManager(BaseManager):
    pass

# 因爲這個QueueManager只從網絡上獲取Queue,因此註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 鏈接到服務器,也就是運行taskmanager.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與taskmanager.py設置的徹底一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 從網絡鏈接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')

    任務進程要經過網絡鏈接到服務進程,因此要指定服務進程的IP。

    如今,能夠試試分佈式進程的工做效果了。先啓動taskmanager.py服務進程:

$ python taskmanager.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

    taskmanager進程發送完任務後,開始等待result隊列的結果。如今啓動taskworker.py進程:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

     taskworker進程結束,在taskmanager進程中會繼續打印出結果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

    Queue對象存儲在哪?注意到taskworker.py中根本沒有建立Queue的代碼,因此,Queue對象存儲在taskmanager.py進程中。

    而Queue之因此能經過網絡訪問,就是經過QueueManager實現的。因爲QueueManager管理的不止一個Queue,因此,要給每一個Queue的網絡調用接口起個名字,好比get_task_queue

    authkey有什麼用?這是爲了保證兩臺機器正常通訊,不被其餘機器惡意干擾。若是taskworker.pyauthkeytaskmanager.pyauthkey不一致,確定鏈接不上。

    分佈式進程接口簡單,封裝良好,適合須要把繁重任務分佈到多臺機器的環境下。

    注意Queue的做用是用來傳遞任務和接收結果,每一個任務的描述數據量要儘可能小。好比發送一個處理日誌文件的任務,就不要發送幾百兆的日誌文件自己,而是發送日誌文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。  

相關文章
相關標籤/搜索