python數據結構和GIL及多進程

一 數據結構和GIL

1 queue

標準庫queue模塊,提供FIFO的queue、LIFO的隊列,優先隊列
Queue 類是線程安全的,適用於多線程間安全的交換數據,內部使用了Lock和Condition python


爲何說容器的大小不許確,其緣由是若是不加鎖,是不可能獲取到準確的大小的,由於你剛讀取了一個大小,還沒取走,有可能被就被其餘線程修改了,queue類的size雖然加了鎖,可是依然不能保證當即get,put就能成功,由於讀取大小和get,put方法是分來的。nginx

2 GIL

1 簡介

全局解釋器鎖,進程級別的鎖GIL
Cpython在解釋器進程中有一把鎖,叫作GIL全局解釋器鎖。編程

GIL 保證Cpython進程中,當前時刻只有一個線程執行代碼,甚至在多核狀況下,也是如此。安全

2 IO 密集型和CPU密集型

Cpython中
IO 密集型,因爲線程阻塞,就會調度其餘線程
CPU密集型,當前線程可能連續獲取GIL,致使其餘線程幾乎沒法使用CPU,若要喚醒其餘線程,則須要準備數據,其代價是高昂的。服務器


IO 密集型,多線程解決,CPU密集型,多進程解決,繞開GIL。markdown

python中絕大多數內置數據結構的讀寫操做都是原子操做網絡


因爲GIL 的存在,python的內置數據類型在多線程編程的時候就變得安全了,可是實際上他們自己不是線程安全類型的數據結構

3 保留GIL 緣由

Guido堅持的簡單哲學,對於初學者門檻低,不須要高深的系統知識也能安全,簡單的使用python。
而移除GIL。會下降Cpython單線程的執行效率。多線程

4 驗證其是不是單線程

相關實例併發

import  logging
import datetime
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc():
    sum=0
    for _ in range(1000000000):
        sum+=1

calc()
calc()
calc()
calc()
calc()
delta=(datetime.datetime.now()-start).total_seconds()
logging.info(delta)

python數據結構和GIL及多進程

多線程模式下的計算結果

import  logging
import datetime
import threading
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc():
    sum=0
    for _ in range(1000000000):
        sum+=1
lst=[]
for _ in range(5):
    t=threading.Thread(target=calc)
    t.start()
    lst.append(t)

for t in lst:
    t.join()

delta=(datetime.datetime.now()-start).total_seconds()

print (delta)

結果以下

python數據結構和GIL及多進程

從這兩個程序來看,Cpython中多線程根本沒有優點,和一個線程執行的時間至關,由於存在GIL

二 多進程

1 概念

1 多進程描述

因爲python中的GIL ,多線程不是CPU密集型程序的最好選擇

多進程能夠在徹底獨立的進程中運行程序,能夠充分利用多處理器

可是進程自己的隔離帶來數據不共享也是一個問題,且線程比進程輕量的多

多進程也是解決併發的一種手段

2 進程和線程的異同

相同點:

進程是能夠終止的,線程是不能經過命令終止的,線程的終止要麼拋出異常,要麼程序自己執行完成。

進程間同步提供了和線程同步同樣的類,使用方式也是同樣的,使用效果也是相似,不過,進程間同步的代價要高於線程,並且底層實現不一樣。

multiprocessing 還提供了共享內存,服務器進程來共享數據,還提供了queue隊列,匹配管道用於進程間通訊


不一樣點

通訊方式不一樣
1 多進程就是啓用多個解釋器進程,進程間通訊必須序列化,反序列化
2 數據的安全性問題

多進程最好是在main中執行
多線程已經將數據進行處理了,其不須要再次進行序列化了

多進程傳遞必須序列化和反序列化。

3 進程應用

遠程調用,RPC,跨網絡

2 參數介紹

multiprocessing中的process類

process 類遵循了Thread類的API,減小了學習難度
不一樣進程能夠徹底調度到不一樣的CPU上執行

IO 密集型最好使用多線程
CPU 密集型最好使用多進程

進程提供的相關屬性

名稱 含義
pid 進程ID
exitcode 進程退出的狀態碼
terminate() 終止指定進程

3 實例

import  logging
import datetime
import multiprocessing
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc(i):
    sum=0
    for _ in range(1000000000):
        sum+=1
lst=[]
for  i in range(5):
    p=multiprocessing.Process(target=calc,args=(i,),name="P-{}".format(i))
    p.start()
    lst.append(p)
for p in  lst:
    p.join()

delta=(datetime.datetime.now()-start).total_seconds()
print (delta)

結果以下

python數據結構和GIL及多進程

多進程自己避開了進程和進程之間調度須要的時間,多核心都使用了,此處存在CPU的調度問題
多進程對CPU的提高是顯而易見的。
單線程,多線程都跑了很長時間,而多進程只是用了1分半,是真正的並行

4 進程池相關

import  logging
import datetime
import multiprocessing
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc(i):
    sum=0
    for _ in range(1000000000):
        sum+=1
    print (i,sum)
if  __name__=='__main__':
    start=datetime.datetime.now()
    p=multiprocessing.Pool(5)  # 此處用於初始化進程池,其池中的資源是能夠複用的
    for i in range(5):
        p.apply_async(calc,args=(i,))
    p.close()  # 下面要執行join,上面必須先close
    p.join()
    delta=(datetime.datetime.now()-start).total_seconds()
    print (delta)

結果以下

python數據結構和GIL及多進程

進程建立的多,使用進程池進行處理仍是一種比較好的處理方式

5 多進程和多線程的選擇

1 選擇

1 CPU 密集型
Cpython 中使用了GIL,多線程的時候互相競爭,且多核優點不能發揮,python使用多進程效率更高

2 IO密集型

適合使用多線程,減小IO序列化開銷,且在IO等待時,切換到其餘線程繼續執行,效率不錯,固然多進程也適用於IO密集型

2 應用

請求/應答模型: WEB應用中常見的處理模型

master啓動多個worker工做進程,通常和CPU數目相同
worker工做進程中啓動多個線程,提升併發處理能力,worker處理用戶的請求,每每須要等待數據
這就是nginx的工做模式

工做進程通常都和CPU核數相同,CPU的親原性,進程在CPU的遷移成本比較高。

三 concurrent包

1 概念

concurrent.futures
3.2 版本引入的模塊
異步並行任務編程模塊,提供一個高級的異步可執行的便利接口

提供了2個池執行器

ThreadPoolExecutor 異步調用的線程池的Executor
ProcessPoolExecutor 異步調用進程池的Executor

2 參數詳解

方法 含義
ThreadPoolExecutor(max_workers=1) 池中至多建立max_workers個線程的池來同時異步執行,返回Executor實例
submit(fn,*args,**kwagrs) 提交執行的函數及參數,返回Future實例
shutdown(wait=True) 清理池

Future 類

方法 含義
result() 能夠查看調用的返回結果
done() 若是調用被成功的取消或者執行完成,則返回爲True
cancelled() 若是調用被成功取消,返回True
running() 若是正在運行且不能被取消,則返回True
cancel() 嘗試取消調用,若是已經執行且不能取消則返回False,不然返回True
result(timeout=None) 取返回的結果,超時時爲None,一直等待返回,超時設置到期,拋出concurrent.futures.TimeoutError異常
execption(timeout=None) 取返回的異常,超時爲None,一直等待返回,超時設置到期,拋出concurrent.futures.TimeoutError異常

3 線程池相關實例

import  logging
import threading
from   concurrent  import  futures
import logging
import  time

logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")

def worker(n):  # 定義將來執行的任務
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished{}".format(n))
# 建立一個線程池,池容量爲3
executor=futures.ThreadPoolExecutor(max_workers=3)

fs=[]
for i in range(3):
    f=executor.submit(worker,i)  # 傳入參數,返回Future對象
    fs.append(f)

for  i in range(3,6):
    f=executor.submit(worker,i)  # 傳入參數,返回Future對象
    fs.append(f)
while True:
    time.sleep(2)
    logging.info(threading.enumerate())  #返回存活線程列表
    flag=True
    for  f  in fs:
        logging.info(f.done()) # 若是被成功調用或取消完成,此處返回爲True
        flag=flag  and f.done()  # 若都調用成功,則返回爲True,不然則返回爲False
    if flag:
        executor.shutdown()  # 若是所有調用成功,則須要清理池
        logging.info(threading.enumerate())
        break

結果以下

python數據結構和GIL及多進程

其線程池中的線程是持續使用的,一旦建立好的線程,其不會變化,惟一很差的就是線程名未發生變化,但其最多影響了打印效果

4 進程池相關實例

import  logging
import threading
from   concurrent  import  futures
import logging
import  time

logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")

def worker(n):  # 定義將來執行的任務
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished{}".format(n))
# 建立一個進程池,池容量爲3
executor=futures.ProcessPoolExecutor(max_workers=3)

fs=[]
for i in range(3):
    f=executor.submit(worker,i)  # 傳入參數,返回Future對象
    fs.append(f)

for  i in range(3,6):
    f=executor.submit(worker,i)  # 傳入參數,返回Future對象
    fs.append(f)
while True:
    time.sleep(2)
    flag=True
    for  f  in fs:
        logging.info(f.done()) # 若是被成功調用或取消完成,此處返回爲True
        flag=flag  and f.done()  # 若都調用成功,則返回爲True,不然則返回爲False
    if flag:
        executor.shutdown()  # 若是所有調用成功,則須要清理池
        break

結果以下

python數據結構和GIL及多進程

5 支持上下文管理

concurrent.futures.ProcessPoolExecutor 繼承自concurrent.futures.base.Executor,而父類有enter,_exit方法,其是支持上下文管理的,可使用with語句

import  logging
import threading
from   concurrent  import  futures
import logging
import  time

logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")

def worker(n):  # 定義將來執行的任務
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished{}".format(n))
fs=[]
with   futures.ProcessPoolExecutor(max_workers=3) as executor:
    for  i in range(6):
        futures=executor.submit(worker,i)
        fs.append(futures)
while True:
    time.sleep(2)
    flag=True
    for  f  in fs:
        logging.info(f.done()) # 若是被成功調用或取消完成,此處返回爲True
        flag=flag  and f.done()  # 若都調用成功,則返回爲True,不然則返回爲False
    if flag:
        executor.shutdown()  # 若是所有調用成功,則須要清理池
        break

結果以下

python數據結構和GIL及多進程

6 總結

統一了線程池,進程池的調用,簡化了編程,是python簡單的思想哲學的提現惟一缺點: 沒法設置線程名稱

相關文章
相關標籤/搜索