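GIL stands for global interpreter lock.
Because of the GIL, only one thread in a (CPython) process executes Python bytecode at any given moment, so multiple threads cannot be mapped onto multiple CPUs and cannot take advantage of multiple cores.
The GIL is released after a certain number of bytecode instructions have run or after a time slice has elapsed (Python 2 counted bytecode instructions; since Python 3.2 the switch is based on a time interval, 5 ms by default). It is also released voluntarily when a thread hits an I/O operation.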
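The time slice mentioned above can be inspected from Python itself. The following is a minimal sketch, assuming CPython 3.2 or later, where `sys.getswitchinterval()` and `sys.setswitchinterval()` are available; the value 0.01 is only an illustrative choice.

```python
import sys

# Time slice (in seconds) after which CPython asks the running thread
# to give up the GIL so that another thread can run.
interval = sys.getswitchinterval()
print('switch interval:', interval)

# The interval can be tuned; a larger value means fewer forced switches.
sys.setswitchinterval(0.01)
changed = sys.getswitchinterval()
print('after change:', changed)

# Restore the original value so the rest of the program is unaffected.
sys.setswitchinterval(interval)
```

Changing the interval only affects how often CPU-bound threads are asked to switch; a thread blocked on I/O gives up the GIL regardless of this setting.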
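The thread is the smallest unit the operating system can schedule. Originally that unit was the process; because processes are expensive in terms of resources, threads were introduced.
For I/O-bound work, multithreading and multiprocessing perform about the same.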
import threading
import time


def get_html(url):
    print('get html started')
    time.sleep(2)
    print('get html ended')


def get_url(url):
    print('get url started')
    time.sleep(2)
    print('get url ended')


get_html = threading.Thread(target=get_html, args=('url1',))
get_url = threading.Thread(target=get_url, args=('url2',))

if __name__ == '__main__':
    start_time = time.time()
    get_html.start()
    get_url.start()
    print(time.time() - start_time)

Output:

get html started
get url started
0.0009999275207519531
get html ended
get url ended
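Only two threads are created explicitly here, but there are actually three, counting the main thread. Because Thread.start() does not block, the main thread prints the elapsed time first, and the two worker threads finish afterwards. Two settings control how the main thread and the worker threads interact; only the second one makes the main thread wait (block) until the workers are done before printing the time:
① Before starting the threads, mark them as daemon threads (as soon as the main thread ends, the whole program exits and all daemon threads are killed without being waited for):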
get_html.daemon = True  # setDaemon(True) is deprecated since Python 3.10
get_url.daemon = True
② After starting the threads, join them (the main thread waits for each thread to finish before it continues with the code below):
get_html.join()
get_url.join()
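The two options can be combined in one small experiment. This is a sketch only: `worker` and `results` are hypothetical names, and the 0.2 s sleep stands in for real work.

```python
import threading
import time


def worker(results):
    time.sleep(0.2)              # stand-in for real work
    results.append('done')


results = []
t = threading.Thread(target=worker, args=(results,))
t.daemon = True                  # a daemon thread does not keep the program alive
t.start()                        # non-blocking: returns immediately

before_join = list(results)      # snapshot taken while the worker is most likely still asleep
t.join()                         # block until worker() has returned
after_join = list(results)

print(before_join, after_join)
```

Without the `join()` call, a script ending right after `start()` would exit and take the daemon thread down with it, so `'done'` might never be appended.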
import threading
import time


class GetHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print('get html started')
        time.sleep(2)
        print('get html ended')


class GetUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print('get url started')
        time.sleep(2)
        print('get url ended')


get_html = GetHtml('HTML')
get_url = GetUrl('URL')

if __name__ == '__main__':
    start_time = time.time()
    get_html.start()
    get_url.start()
    get_html.join()
    get_url.join()
    print(time.time() - start_time)

Output:

get html started
get url started
get html ended
get url ended
2.0011143684387207
Threads can communicate through a shared global variable:

import time
import threading

url_list = []


def get_html():
    global url_list
    url = url_list.pop()
    print('get html from {} started'.format(url))
    time.sleep(2)
    print('get html from {} ended'.format(url))


def get_url():
    global url_list
    print('get url started')
    time.sleep(2)
    for i in range(20):
        url_list.append('http://www.baidu.com/{id}'.format(id=i))
    print('get url ended')


if __name__ == '__main__':
    thread_url = threading.Thread(target=get_url)
    thread_url.start()
    thread_url.join()  # wait until url_list is filled; pop() on an empty list raises IndexError
    for i in range(10):
        thread_html = threading.Thread(target=get_html)
        thread_html.start()
The code above is rather crude and inflexible; the global variable url_list can instead be passed to the functions as an argument:
import time
import threading

url_list = []


def get_html(url_list):
    url = url_list.pop()
    print('get html from {} started'.format(url))
    time.sleep(1)
    print('get html from {} ended'.format(url))


def add_url(url_list):
    print('add url started')
    time.sleep(1)
    for i in range(20):
        url_list.append('http://www.baidu.com/{id}'.format(id=i))
    print('add url ended')


if __name__ == '__main__':
    thread_url = threading.Thread(target=add_url, args=(url_list,))
    thread_url.start()
    thread_url.join()
    for i in range(20):
        thread_html = threading.Thread(target=get_html, args=(url_list,))
        thread_html.start()
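Another approach is to create a separate .py file, define the variable there (url_list = []), and import it at the top of the script. When there are many shared variables, this keeps them in one place and avoids confusion. With this approach, be sure to import the module rather than the variable itself. For example, if the file is named variables.py and defines url_list = [], write import variables and use variables.url_list in the code, not from variables import url_list. The second form copies the current binding into your own namespace, so if another thread later rebinds variables.url_list to a new object, you will not see the change; with the first form you will.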
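The difference between the two import forms can be shown in a single file. In this sketch the separate file variables.py is simulated with `types.ModuleType` and registered in `sys.modules`; that simulation, and the alias `shared`, are assumptions made only to keep the example self-contained.

```python
import sys
import types

# Stand-in for a separate file variables.py containing: url_list = []
variables = types.ModuleType('variables')
variables.url_list = []
sys.modules['variables'] = variables

from variables import url_list   # copies the current binding into this namespace
import variables as shared       # keeps a reference to the module itself

shared.url_list.append('a')      # in-place change: same list object, visible through both names
shared.url_list = ['b']          # rebinding: only the module attribute now points to the new list

print(url_list)          # ['a']
print(shared.url_list)   # ['b']
```

In-place mutations are visible either way; it is rebinding the name that the `from ... import` form misses.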
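Summary: however a global variable is shared, the operations on it are not thread-safe. Making them thread-safe requires a lock, which complicates the code, so Python provides a safe channel for communication between threads: from queue import Queue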
import time
import threading
from queue import Queue


def get_html(queue):
    url = queue.get()
    print('get html from {} started'.format(url))
    time.sleep(1)
    print('get html from {} ended'.format(url))


def add_url(queue):
    print('add url started')
    time.sleep(1)
    for i in range(20):
        queue.put('http://www.baidu.com/{id}'.format(id=i))
    print('add url ended')


if __name__ == '__main__':
    url_queue = Queue(maxsize=1000)  # maximum number of items the queue can hold
    thread_url = threading.Thread(target=add_url, args=(url_queue,))
    thread_url.start()
    thread_url.join()
    list1 = []
    for i in range(20):
        thread_html = threading.Thread(target=get_html, args=(url_queue,))
        list1.append(thread_html)
    for i in list1:
        i.start()
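The queue-based version above starts exactly one consumer thread per URL. A common variation, sketched here, uses a fixed number of long-lived consumers that stop when they read a sentinel value; the `STOP` sentinel, the `crawl` helper and the second `results` queue are assumptions of this sketch, not part of the original code.

```python
import threading
from queue import Queue

STOP = object()  # sentinel telling a consumer to exit (an assumption of this sketch)


def add_url(queue, count, n_consumers):
    for i in range(count):
        queue.put('http://www.baidu.com/{id}'.format(id=i))
    for _ in range(n_consumers):
        queue.put(STOP)            # one sentinel per consumer


def get_html(queue, results):
    while True:
        url = queue.get()          # blocks until an item is available
        if url is STOP:
            break
        results.put(url)           # Queue.put is thread-safe


def crawl(count=20, n_consumers=3):
    url_queue = Queue(maxsize=1000)
    results = Queue()
    threads = [threading.Thread(target=get_html, args=(url_queue, results))
               for _ in range(n_consumers)]
    threads.append(threading.Thread(target=add_url,
                                    args=(url_queue, count, n_consumers)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    fetched = []
    while not results.empty():     # safe here: every thread has finished
        fetched.append(results.get())
    return fetched


print(len(crawl()))   # 20
```

Because `get()` blocks while the queue is empty, the consumers can be started before the producer has put anything, which the list-based versions could not tolerate.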
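Thread synchronization means that while one thread is operating on a piece of memory, no other thread may operate on that memory address; the other threads wait until the first thread has finished, and only then may they proceed.
Question: if Python has the GIL, aren't threads already safe? Why is there still a synchronization problem?
Recall the description of the GIL above: it is released based on the number of bytecode instructions executed or on a time slice, and also voluntarily on I/O operations. A thread can therefore lose the GIL in the middle of a statement.
Consider a classic example. If the GIL made threads completely safe, the final result would always be 0, but that is not what happens.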
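from threading import Thread

total = 0


def add():
    global total
    for i in range(1000000):
        total += 1


def desc():
    global total
    for i in range(1000000):
        total -= 1


thread1 = Thread(target=add)
thread2 = Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)

Output (one run):

312064

The printed result varies from run to run and is usually not 0. (On recent CPython versions the race may be much harder to trigger, but the code is still not thread-safe.)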
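The reason the result drifts is that `total += 1` is not a single step at the bytecode level. The sketch below uses the standard `dis` module to list the instructions; `add_one` is a hypothetical helper, and the exact instruction names vary between Python versions, so no output is shown.

```python
import dis

total = 0


def add_one():
    global total
    total += 1


# Collect the names of the bytecode instructions that make up add_one().
opnames = [ins.opname for ins in dis.get_instructions(add_one)]
print(opnames)
```

The list contains a load of the global, an add, and a separate store. If a thread switch happens between the load and the store, the other thread's update is overwritten when this thread resumes.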
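The fix is to protect the shared variable with a Lock. Note that acquiring and releasing a lock takes time, so it has some cost in performance. Locks also make deadlock easy: for instance, a thread that calls acquire() twice on the same Lock without releasing it in between blocks forever. Replacing Lock with RLock lets the same thread acquire the lock several times. Keep in mind that this reentrancy only applies within a single thread, and that the number of acquire() calls must equal the number of release() calls.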
import threading
from threading import Lock

l = Lock()
a = 0


def add():
    global a
    l.acquire()
    for i in range(1000000):
        a += i
    l.release()  # always release the lock when the critical section ends, otherwise other threads stay blocked


def desc():
    global a
    l.acquire()
    for i in range(1000000):
        a -= i
    l.release()


thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
# Note again: with start() but no join(), the main thread goes straight on to print();
# with join(), it waits for thread1 and thread2 to finish first.
thread1.join()
thread2.join()
print(a)

The output is always 0.
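The RLock behavior mentioned earlier can be sketched as follows; `outer`, `inner` and `log` are hypothetical names used only for illustration.

```python
import threading

rlock = threading.RLock()
log = []


def inner():
    with rlock:              # second acquire by the same thread: allowed with RLock
        log.append('inner')


def outer():
    with rlock:              # first acquire
        log.append('outer')
        inner()              # with a plain Lock this nested acquire would block forever


t = threading.Thread(target=outer)
t.start()
t.join()
print(log)      # ['outer', 'inner']

# A plain Lock is not reentrant: a second, non-blocking acquire simply fails.
lock = threading.Lock()
lock.acquire()
second = lock.acquire(blocking=False)
lock.release()
print(second)   # False
```

Using `with` pairs every acquire with a release automatically, which takes care of the rule that the two counts must match.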
For more complex communication between threads, a Lock is no longer enough. For example:
from threading import Condition, Thread, Lock
# Condition variables are used for complex synchronization between threads

lock = Lock()


class Tom(Thread):
    def __init__(self, lock):
        self.lock = lock
        super().__init__(name='Tom')

    def run(self):
        self.lock.acquire()
        print('{}: hello, Bob.'.format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{}: Let's have a chat.".format(self.name))
        self.lock.release()


class Bob(Thread):
    def __init__(self, lock):
        self.lock = lock
        super().__init__(name='Bob')

    def run(self):
        self.lock.acquire()
        print('{}: Hi, Tom.'.format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{}:Well, I like to talk to you.".format(self.name))
        self.lock.release()


tom = Tom(lock)
bob = Bob(lock)
tom.start()
bob.start()

Output:

Tom: hello, Bob.
Tom: Let's have a chat.
Bob: Hi, Tom.
Bob:Well, I like to talk to you.
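Why does this happen? The reason is simple: Tom has already run all of its logic before Bob has even been started. Besides, the GIL switches threads based on a time slice or a bytecode count, so Tom may well finish within its time slice before control passes to Bob. A lock only guarantees mutual exclusion; it cannot make two threads take turns. This is why condition variables (Condition) exist.
The source code of Condition shows that it implements the magic methods __enter__ and __exit__, so it can be used in a with statement. Its __enter__ method is: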
def __enter__(self):
    return self._lock.__enter__()
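The lock's __enter__() in turn simply calls acquire(), and since Condition uses an RLock by default, that is RLock.acquire(). So Condition is built on top of RLock internally. Likewise, __exit__ calls RLock.release().
Key methods: wait() and notify()
wait() releases the underlying lock and blocks until another thread sends a notification on the same condition variable; notify() sends that notification. The code above can therefore be rewritten as: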
from threading import Condition, Thread, Lock
# Condition variables are used for complex synchronization between threads


class Tom(Thread):
    def __init__(self, condition):
        self.condition = condition
        super().__init__(name='Tom')

    def run(self):
        with self.condition:
            print('{}: hello, Bob.'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print("{}: Let's have a chat.".format(self.name))
            self.condition.notify()


class Bob(Thread):
    def __init__(self, condition):
        self.condition = condition
        super().__init__(name='Bob')

    def run(self):
        with self.condition:
            self.condition.wait()
            print('{}: Hi, Tom.'.format(self.name))
            self.condition.notify()
            self.condition.wait()
            print("{}:Well, I like to talk to you.".format(self.name))


if __name__ == '__main__':
    condition = Condition()
    tom = Tom(condition)
    bob = Bob(condition)
    bob.start()
    tom.start()
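Notes on the code above:
① wait() and notify() may only be called while the condition's lock is held (here, inside the with block); otherwise a RuntimeError is raised.
② Start order matters: bob should be started before tom so that Bob is already waiting when Tom sends the first notify(). A notification sent while nobody is waiting is lost.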
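The conversation above only works if Bob is already waiting when Tom sends the first notification. One way to remove that dependency, sketched here, is to keep explicit shared state and wait on a predicate with `Condition.wait_for()`. The `state` dictionary, the `speak` function and the `transcript` list are assumptions of this sketch and replace the two Thread subclasses.

```python
from threading import Condition, Thread

condition = Condition()
state = {'turn': 'Tom'}   # whose turn it is to speak (hypothetical shared state)
transcript = []


def speak(name, other, lines):
    for line in lines:
        with condition:
            # wait_for() checks the predicate before sleeping and again after
            # every wake-up, so a notification sent early cannot be missed.
            condition.wait_for(lambda: state['turn'] == name)
            transcript.append('{}: {}'.format(name, line))
            state['turn'] = other
            condition.notify_all()


tom = Thread(target=speak, args=('Tom', 'Bob', ['hello, Bob.', "Let's have a chat."]))
bob = Thread(target=speak, args=('Bob', 'Tom', ['Hi, Tom.', 'Well, I like to talk to you.']))
tom.start()   # start order no longer matters
bob.start()
tom.join()
bob.join()
print('\n'.join(transcript))
```

The turn variable, not the timing of the notifications, decides who speaks next, so the four lines always come out in alternating order.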
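A Semaphore limits how many threads may enter a section of code at the same time. In a crawler, for example, too many threads requesting pages in a short period is easy to detect, so a Semaphore can be used to cap the number of threads making requests.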
from threading import Thread, Semaphore, Condition, Lock, RLock
import time


class GetHtml(Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print('get html successful.')
        self.sem.release()  # remember to release the semaphore once the work is done


class GetUrl(Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()  # acquire the semaphore before starting a fetch thread
            get_html = GetHtml('www.baidu.com/{}'.format(i), self.sem)
            get_html.start()


if __name__ == '__main__':
    sem = Semaphore(3)  # takes one argument: at most 3 threads may hold the semaphore at once
    get_url = GetUrl(sem)
    get_url.start()
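In the code above the semaphore is acquired in one thread and released in another. When acquire and release happen in the same thread, a `with` block is a simpler alternative. The sketch below also records the peak number of threads inside the guarded section; the `stats` dictionary and its lock are assumptions added only to make the limit observable, and the short sleep stands in for the HTTP request.

```python
import threading
import time

sem = threading.Semaphore(3)          # at most 3 threads inside the guarded section
stats_lock = threading.Lock()
stats = {'active': 0, 'peak': 0}      # bookkeeping for this sketch only


def get_html(url):
    with sem:                         # acquired on entry, released on exit, even on error
        with stats_lock:
            stats['active'] += 1
            stats['peak'] = max(stats['peak'], stats['active'])
        time.sleep(0.05)              # stand-in for the HTTP request
        with stats_lock:
            stats['active'] -= 1


threads = [threading.Thread(target=get_html, args=('www.baidu.com/{}'.format(i),))
           for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('peak concurrency:', stats['peak'])
```

The reported peak never exceeds the value passed to `Semaphore`, no matter how many threads are started.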
from concurrent import futures
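Besides limiting the number of threads, a thread pool offers other features, such as letting the main thread check a task's status and fetch its return value: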
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times


excutor = ThreadPoolExecutor(max_workers=2)
# task1 is a Future object; submit() is non-blocking and returns immediately.
# The second argument is passed on to the function.
task1 = excutor.submit(get_html, 3)
tesk2 = excutor.submit(get_html, 2)
print(task1.done())  # check whether the task has finished

Output:

False
get page2 success
get page3 success

Analysis: submit() is non-blocking and returns immediately, so the print that follows does not wait for task1 to finish. If the main thread waits long enough for task1 to complete, done() returns True:
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times


excutor = ThreadPoolExecutor(max_workers=2)
# task1 is a Future object; submit() is non-blocking and returns immediately.
# The second argument is passed on to the function.
task1 = excutor.submit(get_html, 3)
tesk2 = excutor.submit(get_html, 2)
print(task1.done())  # check whether the task has finished
time.sleep(4)
print(task1.done())

Output:

False
get page2 success
get page3 success
True
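Appending the following line to the code prints the value returned by the task function; result() blocks until the task has finished:
print(task1.result())
print(task1.cancel()) attempts to cancel task1 before it starts running. It returns True if the cancellation succeeded and False otherwise; a task that is already running or has finished cannot be cancelled.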
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times


excutor = ThreadPoolExecutor(max_workers=1)  # a single worker keeps tesk2 queued, so it can be cancelled
# task1 is a Future object; submit() is non-blocking and returns immediately.
task1 = excutor.submit(get_html, 3)
tesk2 = excutor.submit(get_html, 2)
print(task1.done())     # check whether the task has finished
print(tesk2.cancel())
time.sleep(4)
print(task1.done())
print(task1.result())   # result() returns the value returned by the task function

Output (note that "get page2 success" never appears):

False
True
get page3 success
True
3
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times


excutor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [excutor.submit(get_html, url) for url in urls]
for future in as_completed(all_task):
    data = future.result()
    print('get {} page'.format(data))

Output:

get page2 success
get 2 page
get page3 success
get 3 page
get page4 success
get 4 page

Analysis: because excutor.submit() is non-blocking, the output shows that as soon as each task finishes, as_completed() yields its future and the result can be read.
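import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times


excutor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
for data in excutor.map(get_html, urls):
    print('get {} page'.format(data))

Output:

get page2 success
get page3 success
get 3 page
get 2 page
get page4 success
get 4 page

As the output shows, excutor.map does not yield each result as soon as it is ready. It yields results in the order of the argument list: the result for the first argument comes first, then the rest in sequence. The first approach, as_completed(), is the recommended one.
wait() blocks until all the given tasks have completed before execution continues. It also accepts a return_when parameter, which defaults to ALL_COMPLETED. With FIRST_COMPLETED (which must be imported from concurrent.futures first), wait() returns as soon as the first task finishes.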
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait


def get_html(times):
    time.sleep(times)
    print('get page{} success'.format(times))
    return times


excutor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [excutor.submit(get_html, url) for url in urls]
wait(all_task)
print('main thread finished')

Output:

get page2 success
get page3 success
get page4 success
main thread finished
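The FIRST_COMPLETED variant of wait() can be sketched as follows. The sleep durations are shortened to fractions of a second and the pool is given three workers so that all tasks start at once; both choices are assumptions made to keep the example quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def get_html(times):
    time.sleep(times)
    return times


with ThreadPoolExecutor(max_workers=3) as executor:
    all_task = [executor.submit(get_html, t) for t in (0.3, 0.1, 0.5)]
    # wait() returns two sets of futures: those already done and those still pending.
    done, not_done = wait(all_task, return_when=FIRST_COMPLETED)
    first_results = [f.result() for f in done]
    print('finished first:', first_results)
    print('still pending:', len(not_done))
# Leaving the with block waits for the remaining tasks to finish.
```

wait() does not cancel anything: the futures in `not_done` keep running, and their results can still be read later.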