操做系統線程理論html
1,線程概念的引入背景python
1.1,有了進程爲何要有線程mysql
進程有不少優勢,它提供了多道編程,讓咱們感受咱們每一個人都擁有本身的CPU和其餘資源,能夠提升計算機的利用率。不少人就不理解了,既然進程這麼優秀,爲何還要線程呢?其實,仔細觀察就會發現進程仍是有不少缺陷的,主要體如今兩點上:git
進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。程序員
進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行。github
若是這兩個缺點理解比較困難的話,舉個現實的例子也許你就清楚了:若是把咱們上課的過程當作一個進程的話,那麼咱們要作的是耳朵聽老師講課,手上還要記筆記,腦子還要思考問題,這樣才能高效的完成聽課的任務。而若是隻提供進程這個機制的話,上面這三件事將不能同時執行,同一時間只能作一件事,聽的時候就不能記筆記,也不能用腦子思考,這是其一;若是老師在黑板上寫演算過程,咱們開始記筆記,而老師忽然有一步推不下去了,阻塞住了,他在那邊思考着,而咱們呢,也不能幹其餘事,即便你想趁此時思考一下剛纔沒聽懂的一個問題都不行,這是其二。算法
如今你應該明白了進程的缺陷了,而解決的辦法很簡單,咱們徹底可讓聽、寫、思三個獨立的過程,並行起來,這樣很明顯能夠提升聽課的效率。而實際的操做系統中,也一樣引入了這種相似的機制——線程。sql
1.2,線程的出現數據庫
TCB包括如下信息: (1)線程狀態。 (2)當線程不運行時,被保存的現場資源。 (3)一組執行堆棧。 (4)存放每一個線程的局部變量主存區。 (5)訪問同一個進程中的主存和其它資源。 用於指示被執行指令序列的程序計數器、保留局部變量、少數狀態參數和返回地址等的一組寄存器和堆棧。
多個線程共享同一個進程的地址空間中的資源,是對一臺計算機上多個進程的模擬,有時也稱線程爲輕量級的進程。編程
而對一臺計算機上多個進程,則共享物理內存、磁盤、打印機等其餘物理資源。多線程的運行也多進程的運行相似,是cpu在多個線程之間的快速切換。
不一樣的進程之間是充滿敵意的,彼此是搶佔、競爭cpu的關係,若是迅雷會和QQ搶資源。而同一個進程是由一個程序員的程序建立,因此同一進程內的線程是合做關係,一個線程能夠訪問另一個線程的內存地址,你們都是共享的,一個線程乾死了另一個線程的內存,那純屬程序員腦子有問題。
相似於進程,每一個線程也有本身的堆棧,不一樣於進程,線程庫沒法利用時鐘中斷強制線程讓出CPU,能夠調用thread_yield運行線程自動放棄cpu,讓另一個線程運行。
線程一般是有益的,可是帶來了不小程序設計難度,線程的問題是:
1. 父進程有多個線程,那麼開啓的子線程是否須要一樣多的線程
2. 在同一個進程中,若是一個線程關閉了文件,而另一個線程正準備往該文件內寫內容呢?
所以,在多線程的代碼中,須要更多的心思來設計程序的邏輯、保護程序的數據。
6,用戶級線程和內核級線程(瞭解)
線程的實現能夠分爲兩類:用戶級線程(User-Level Thread)和內核線線程(Kernel-Level Thread),後者又稱爲內核支持的線程或輕量級進程。在多線程操做系統中,各個系統的實現方式並不相同,在有的系統中實現了用戶級線程,有的系統中實現了內核級線程
6.1,用戶級線程
內核的切換由用戶態程序本身控制內核切換,不須要內核干涉,少了進出內核態的消耗,但不能很好的利用多核Cpu。
在用戶空間模擬操做系統對進程的調度,來調用一個進程中的線程,每一個進程中都會有一個運行時系統,用來調度線程。此時當該進程獲取cpu時,進程內再調度出一個線程去執行,同一時刻只有一個線程執行。
6.2,內核級線程
內核級線程:切換由內核控制,當線程進行切換的時候,由用戶態轉化爲內核態。切換完畢要從內核態返回用戶態;能夠很好的利用smp,即利用多核cpu。windows線程就是這樣的。
6.3,用戶級與內核級線程的對比
1 內核支持線程是OS內核可感知的,而用戶級線程是OS內核不可感知的。 2 用戶級線程的建立、撤消和調度不須要OS內核的支持,是在語言(如Java)這一級處理的;而內核支持線程的建立、撤消和調度都需OS內核提供支持,並且與進程的建立、撤消和調度大致是相同的。 3 用戶級線程執行系統調用指令時將致使其所屬進程被中斷,而內核支持線程執行系統調用指令時,只致使該線程被中斷。 4 在只有用戶級線程的系統內,CPU調度仍是以進程爲單位,處於運行狀態的進程中的多個線程,由用戶程序控制線程的輪換運行;在有內核支持線程的系統內,CPU調度則以線程爲單位,由OS的線程調度程序負責線程的調度。 5 用戶級線程的程序實體是運行在用戶態下的程序,而內核支持線程的程序實體則是能夠運行在任何狀態下的程序。
優勢:當有多個處理機時,一個進程的多個線程能夠同時執行。
缺點:由內核進行調度。
優勢:
線程的調度不須要內核直接參與,控制簡單。
能夠在不支持線程的操做系統中實現。
建立和銷燬線程、線程切換代價等線程管理的代價比內核線程少得多。
容許每一個進程定製本身的調度算法,線程管理比較靈活。
線程可以利用的表空間和堆棧空間比內核級線程多。
同一進程中只能同時有一個線程在運行,若是有一個線程使用了系統調用而阻塞,那麼整個進程都會被掛起。另外,頁面失效也會產生一樣的問題。
缺點:
資源調度按照進程進行,多個處理機下,同一個進程中的線程只能在同一個處理機下分時複用
用戶級:一次只能調用一個線程,多線程併發,是由分時複用實現。
內核級:一個進程的多線程能夠同時執行
6.4,混合實現
用戶級與內核級的多路複用,內核同一調度內核線程,每一個內核線程對應n個用戶線程
線程和python——理論知識
1,全局解釋器鎖GIL
Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中能夠「運行」多個線程,但在任意時刻只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。
在多線程環境中,Python 虛擬機按如下方式執行:
a、設置 GIL;
b、切換到一個線程去運行;
c、運行指定數量的字節碼指令或者線程主動讓出控制(能夠調用 time.sleep(0));
d、把線程設置爲睡眠狀態;
e、解鎖 GIL;
d、再次重複以上全部步驟。
在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束爲止(因爲在這期間沒有Python的字節碼被運行,因此不會作線程切換)編寫擴展的程序員能夠主動解鎖GIL。
2,python線程模塊的選擇
Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊容許程序員建立和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊容許用戶建立一個能夠用於多個線程之間共享數據的隊列數據結構。
避免使用thread模塊,由於更高級別的threading模塊更爲先進,對線程的支持更爲完善,並且使用thread模塊裏的屬性有可能會與threading出現衝突;其次低級別的thread模塊的同步原語不多(實際上只有一個),而threading模塊則有不少;再者,thread模塊中當主線程結束時,全部的線程都會被強制結束掉,沒有警告也不會有正常的清除工做,至少threading模塊能確保重要的子線程退出後進程才退出。
thread模塊不支持守護線程,當主線程退出時,全部的子線程不論它們是否還在工做,都會被強行退出。而threading模塊支持守護線程,守護線程通常是一個等待客戶請求的服務器,若是沒有客戶提出請求它就在那等着,若是設定一個線程爲守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。
threading模塊
multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹(官方連接)
1,線程的建立Threading.Thread類
1.1,使用Thread開啓線程,是異步的
import os
from threading import Thread
def func():
print('hello world',os.getpid())
t = Thread(target = func)
t.start()
print(os.getpid())
1.2,等待子線程執行完,在執行主線程後面的,且子線程仍是異步。
# import os
# import time
# from threading import Thread
#
# def func(): # 子線程
# time.sleep(1)
# print('hello world',os.getpid())
# thread_lst = []
# for i in range(10):
# t = Thread(target=func)
# t.start()
# thread_lst.append(t)
# [t.join() for t in thread_lst]
# print(os.getpid()) # 主線程
1.3,開啓線程的另外一種方式
計算開啓的線程數量,多個子線程 共享 這個數據
# import time
# import os
# import time
# from threading import Thread
#
# class MyThread(Thread):
# count = 0 # 靜態屬性 # 各線程共享
# def __init__(self,arg1,arg2):
# super().__init__()
# self.arg1 = arg1
# self.arg2 = arg2
# def run(self):
# MyThread.count += 1
# time.sleep(1)
# print('%s,%s,%s,%s'%(self.arg1,self.arg2,self.name,os.getpid()))
#
# for i in range(10):
# t = MyThread(i,i*'*')
# t.start()
# print(t.count)
1.4,Thread類的其餘方法
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。
threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
import time
import threading
def func(i):
time.sleep(0.5)
# print(i,threading.currentThread())
print(i,threading.currentThread().name,threading.currentThread().ident)
# ident 線程id
for i in range(10):
t = threading.Thread(target=func,args=(i,))
t.start()
# print(len(threading.enumerate())) # 返回正在運行着的線程列表,返回11個(主線程也是)
print(threading.activeCount()) # 當前還存活着的線程。
2,多線程與多進程
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每一個進程都有不一樣的pid
p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid())
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啓線程
t=Thread(target=work) t.start() print('主線程/主進程') ''' 打印結果: hello 主線程/主進程 '''
#在主進程下開啓子進程
t=Process(target=work) t.start() print('主線程/主進程') ''' 打印結果: 主線程/主進程 hello '''
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100
# p=Process(target=work)
# p.start()
# p.join()
# print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100
n=1 t=Thread(target=work) t.start() t.join() print('主',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據
同一進程內的線程共享該進程的數據?
3,實例
import socket from threading import Thread def func(conn): conn.send(b'hello') ret = conn.recv(1024) print(ret) conn.close() sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() Thread(target=func,args=(conn,)).start() sk.close()
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) ret = sk.recv(1024) print(ret) msg = input('>>>') sk.send(msg.encode('utf-8')) sk.close()
守護線程程
1) 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,
2) 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而主進程必須保證非守護線程都運行完畢後才能結束。
1,建立守護線程
import time from threading import Thread def func(): print('開始執行子線程') time.sleep(3) print('子線程執行完畢') t = Thread(target=func) t.setDaemon(True) # 必須在t.start()以前設置
# 線程設置是個方法 # 進程設置守護進程 是一個屬性 daemon = True t.start() # t是個守護線程 t2 = Thread(target=func) t2.start() # t2是個線程 # t2.join() # 等待t2結束
1.1,上面代碼結尾1
t2 = Thread(target=func)
t2.start()
# 主進程代碼執行完畢,守護進程就結束了,主進程沒有結束等待 t2 線程結束。
# 主線程還沒結束 ,等待t2繼續執行,t2執行完畢, 主線程結束(也意味着主進程結束)
1.2,上面代碼結尾2
t2 = Thread(target=func)
t2.start()
t2.join() # 等待t2結束 執行完這句話代碼才執行完畢
# 主線程中沒有運行的線程了,守護線程結束,該代碼的主線程結束。
# 且主進程沒有也沒有代碼了,守護進程結束,該代碼的主進程結束。
線程——鎖
1,鎖與GIL
1)雖然有線程鎖,即cpu只給每一個線程一個固定的運行時間,該時間內即便沒運行完,cpu也會換下一個線程處理。依次輪換。
2)仍是存在安全問題,如,全部線程對一個數據進行修改,其中一個線程拿到了要修改的數據,並存入本身的線程,但此時正好cpu給的時間到了,該線程中止,此時,原數據並無被修改爲處理後的數據,下一個線程仍是取這個數據,存入本身的線程,而後時間又到了,cpu又處理下一個線程,當再次輪到未處理完的這些線程時,線程繼續以前未進行完的處理,那麼問題來了,此時,每一個線程事實上都處理了一變,例如原數據是一百,每一個線程都是取數據減一,然而這些線程拿到的數據都是一百,處理後都是99,而本來的意圖是,數據每次減一。
1.1,存在上面所說的問題的代碼
# 下面的程序,有線程鎖就能夠知足,防止數據不安全
# 可是,加上延遲後(模擬處理時間長)後出現數據不安全。
import time
from threading import Thread
def func():
global n
# n -= 1
temp = n # 從進程中獲取n
time.sleep(0.01)
n = temp-1 # 獲得結果,再存儲回進程
n = 100
t_lst = []
for i in range(100):
t = Thread(target=func)
t.start()
t_lst.append(t)
[t.join() for t in t_lst]
print(n)
1.2,解決上面的問題,對數據上鎖。
import time
from threading import Thread
from threading import Lock
def func():
global n
# time.sleep(2) # 用join,此處線程
lock.acquire()
temp = n
time.sleep(0.01)
n = temp-1 # 獲得結果,再存儲回進程
lock.release()
n = 100
lock = Lock()
t_lst = []
for i in range(100):
t = Thread(target=func)
t.start()
t_lst.append(t)
[t.join() for t in t_lst]
print(n)
# GIL 不是鎖數據 而是鎖線程
# 在多線程中 特殊狀況 仍然要加鎖 對數據
# 會變慢,由於加鎖後每一個線程依次sleep,不是一塊兒sleep。
2,死鎖
1)科學家吃麪 : 現有四我的吃一份面,吃麪需知足兩個條件,一、拿到筷子(鎖住),二、拿到面(鎖住)。
2)死鎖現象:定義了兩種吃法,一、先拿面後拿筷子,二、先拿筷子後拿面。 會出現一我的拿到了筷子,一我的拿到面,而後都再等着拿另外一個,而後誰也吃不成。
2.1,死鎖現象
import time from threading import Lock from threading import Thread kz = Lock() m = Lock() def eat(name): kz.acquire() # 拿鑰匙
print('%s拿到筷子了'%name) m.acquire() print('%s拿到面了'%name) print('%s吃麪'%name) m.release() kz.release() def eat2(name): m.acquire() print('%s拿到面了' % name) time.sleep(1) kz.acquire() print('%s拿到筷子了' % name) print('%s吃麪' % name) kz.release() m.release() Thread(target=eat,args=('哪吒',)).start() Thread(target=eat2,args=('egon',)).start() Thread(target=eat,args=('苑昊',)).start() Thread(target=eat2,args=('金老闆',)).start()
3,遞歸鎖
3.1,解決死鎖: 用RLock
遞歸鎖:再一個線程裏,同一把鎖能夠鎖多層。
from threading import RLock # 遞歸鎖
from threading import Lock # 互斥鎖
lock = RLock() # 遞歸鎖
lock.acquire() # 開一層鎖
lock.acquire() # 再開一層鎖
lock.acquire() # 再開一層鎖
print(123) # 被鎖的三層
lock.release() # 鎖上最裏層
lock.release() # 再鎖一層
lock.release() # 鎖最外層
3.2,爲何不用互斥鎖這樣寫?
from threading import Lock # 互斥鎖
kz.release() # 互斥鎖
kz.acquire() # 開鎖
kz.acquire() # 雖然是同一把鎖,但仍是等着拿鑰匙。
print(123)
kz.release()
kz.release()
3.3,選 鎖 或者 遞歸鎖 ?
在多線程併發的狀況下,同一個線程中 若是出現屢次acquire 就可能產生死鎖線程現象,用遞歸鎖就能避免
線程——信號量
1,信號量
import time
import random
from threading import Thread
from threading import Semaphore
def func(n,sem):
sem.acquire()
print('thread -%s start'%n)
time.sleep(random.random())
print('thread -%s done' % n)
sem.release()
sem = Semaphore(5) # 一把鎖有5把鑰匙
for i in range(20):
Thread(target=func,args=(i,sem)).start()
2,信號量 和 線程池 有什麼區別?
相同點 :
在信號量acquire以後,和線程池同樣 同時在執行的只能有n個
不一樣點 :
開的線程數不同 線程池來講 一共就只開5個線程 信號量有幾個任務就開幾個線程
3,對有信號量限制的程序來講 能夠同時執行不少線程麼?
實際上 信號量並不影響線程或者進程的併發,只是在加鎖的階段進行流量限制。即池中最多開五個線程,而信號量是開全部的設定的而線程,即在執行有加鎖函數線程時,沒有加鎖的部分全部線程都會執行,沒有限制,只是在執行到加鎖部分時,才進行流量限制。
# 上面的代碼
def func(n,sem): # 全部線程都會開啓 print('123') # 未加鎖部分都會無限制執行,沒有進房間前 print('123') sem.acquire() # 加鎖部分進行流量限制 ,限制進入房間的數量。 print('thread -%s start'%n) time.sleep(random.random()) print('thread -%s done' % n) sem.release()
線程——事件
# 設置事件內部會有一個flag標誌,開始時flag = False
# wait() 時 flag = False 阻塞,flag = True 非阻塞
# set() 將 flag = True
# clear() 將 flag = False
1,例子:鏈接MySQL數據庫
1)我鏈接三次數據庫,每0.5秒鏈接一次
2)建立一個事件 來標誌數據庫的鏈接狀況,若是鏈接成功,就顯示成功,不然 就報錯 主動拋異常TimeoutError
import time
import random
from threading import Event
from threading import Thread
def conn_mysql(): # 鏈接數據庫
count = 1
while not e.is_set(): # 當事件的flag爲False時才執行循環內的語句
if count>3:
raise TimeoutError
print('嘗試鏈接第%s次'%count)
count += 1
e.wait(0.5) # 一直阻塞變成了只阻塞0.5
print('鏈接成功') # 收到check_conn函數內的set指令,讓flag變爲True跳出while循環,執行本句代碼
def check_conn():
'''
檢測數據庫服務器的鏈接是否正常
'''
time.sleep(random.randint(1,2)) # 模擬鏈接檢測的時間
e.set() # 告訴事件的標誌數據庫能夠鏈接
e = Event()
check = Thread(target=check_conn)
check.start()
conn = Thread(target=conn_mysql)
conn.start()
定時器 Timer
from threading import Timer
def hello():
print("hello, world")
while True: # 每隔一段時間要開啓一個線程
t = Timer(10, hello) # 定時開啓一個線程,執行一個任務
# 定時 : 多久以後 單位是s
# 要執行的任務 :函數名
t.start()
也能夠用下面的方式:
def hello():
while True: # 這樣作這個線程一直在
time.sleep(10)
print("hello, world")
用哪一個?看sleep的時間
# sleep的時間短 就在線程內while True
# sleep的時間長 就在主線程while True
條件
import threading
def run(n):
con.acquire()
con.wait() # 等着信號
print("run the thread: %s" % n)
con.release()
if __name__ == '__main__':
con = threading.Condition() # 條件 = 鎖 + wait的功能
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input('>>>')
if inp == 'q':
break
con.acquire() # condition中的鎖 是 遞歸鎖
if inp == 'all':
con.notify_all() # 所有放行
else:
con.notify(int(inp)) # 傳遞信號 notify(1) --> 能夠放行一個線程 # 設置放行幾個線程
con.release()
隊列
1,隊列
# import queue
# q = queue.Queue() # 隊列 線程安全的 # q.get() # q.put() # q.qsize()
2,後進先出
# import queue
# lfq = queue.LifoQueue() # 後進先出 :棧 # lfq.put(1) # lfq.put(2) # lfq.put(3) # lfq.put(4) # print(lfq.get()) # print(lfq.get()) # print(lfq.get()) # print(lfq.get())
3,優先級
import queue
pq = queue.PriorityQueue()
# 有優先級的隊列 # 值越小越優先,值相同就asc碼小的先出
pq.put((1,'z'))
pq.put((1,'b'))
pq.put((15,'c'))
pq.put((2,'d'))
#
print(pq.get())
print(pq.get())
線程池
# from concurrent import futures
# futures.ThreadPoolExecutor # 開線程池
# futures.ProcessPoolExecutor # 開進程池
# 進程池:cpu個數 + 1(默認數量)
# 線程池:cpu個數 * 5 (默然數量)
1,建立線程池
import time import random from concurrent import futures def funcname(n): print(n) time.sleep(random.randint(1,3)) return n*'*' thread_pool = futures.ThreadPoolExecutor(5) # 建立有5個線程的線程池
f_lst = [] for i in range(10): f = thread_pool.submit(funcname,i) # submit 合併了建立線程對象和start的功能 # print(f.result()) # 直接在這打印就變成同步的了
f_lst.append(f) # 經過將線程對象放入列表,還保持異步
[print(i.result()) for i in f_lst] # 必定是按順序輸出結果,遇到沒執行玩的線程對象,f.result會阻塞。
2,就想等全部線程執行完了再輸出結果
import time import random from concurrent import futures def funcname(n): print(n) time.sleep(random.randint(1,3)) return n*'*' thread_pool = futures.ThreadPoolExecutor(5) # 建立有5個線程的線程池
f_lst = [] for i in range(10): f = thread_pool.submit(funcname,i) # submit 合併了建立線程對象和start的功能 # print(f.result()) # 直接在這打印就變成同步的了
f_lst.append(f) # 經過將線程對象放入列表,還保持異步
thread_pool.shutdown() # 有close() join()的做用 # 沒有join方法
[print(i.result()) for i in f_lst]
2.1,或者用map
# map,天生異步,接收可迭代對象的數據,不支持返回值
thread_pool = futures.ThreadPoolExecutor(5) thread_pool.map(funcname,range(10))
回調函數
1,add_done_callback(回調函數的名字)
import time
import random
from concurrent import futures
def funcname(n):
print(n)
time.sleep(random.randint(1,3))
return n*'*'
def call(args):
print(args.result())
thread_pool = futures.ThreadPoolExecutor(5)
thread_pool.submit(funcname,1).add_done_callback(call)
****************************************************************************
Python標準模塊--concurrent.futures
https://docs.python.org/dev/library/concurrent.futures.html
#1 介紹
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.
#2 基本方法
#submit(fn, *args, **kwargs)
異步提交任務
#map(func, *iterables, timeout=None, chunksize=1)
取代for循環submit的操做
#shutdown(wait=True)
至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前
#result(timeout=None)
取得結果
#add_done_callback(fn)
回調函數
# done()
判斷某一個線程是否完成
# cancle()
取消某個任務
#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None) An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised. #用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2
if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously. class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='') An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor. New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. #用法
與ProcessPoolExecutor相同
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2
if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11):
# future=executor.submit(task,i)
executor.map(task,range(1,12)) #map取代了for+submit
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3)
# for url in urls:
# p.apply_async(get_page,args=(url,),callback=pasrse_page)
# p.close()
# p.join()
p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果