PythonI/O進階學習筆記_10.python的多線程

 content:
1. python的GIL
2. 多線程編程簡單示例
3. 線程間的通訊
4. 線程池
5. threadpool Future 源碼分析
 
===========================
 
一. python的GIL
關於python的GIL,有一位博主寫的我以爲真的挺好的,清晰明瞭通俗易懂。 http://cenalulu.github.io/python/gil-in-python/
在這裏就不贅述了,可是注意文章中的試驗結論是基於python2的。python3中已經有所改進因此文中示例未必能得出當時相同的結論。
可是依舊對咱們理解GIL頗有幫助。
 
那總結下來什麼是GIL呢?
global interpreter lock
python前期爲了在多線程編程的時候更爲簡單,因而誕生了GIL。GIL使得同一時刻只有一個線程在一個cpu上執行字節碼,沒法將多個線程映射到多個cpu上。也就使得python沒法實現真正意義上的多線程。
 
那是否是有了GIL是否是就絕對安全了?咱們編碼的時候就不須要考慮線程安全了?
並非,GIL釋放的時間可能那時候進程並無執行完成。
GIL會在適當的時候釋放,好比在字節碼某特定行數以及特定時間片被釋放,也會在遇到io操做的時候主動釋放。
 
二. 多線程編程簡單示例
想要實現開啓線程執行任務,有兩種方法,直接用Thread進行實例化,或者本身實現繼承Thread的子類。
 
1.經過therad類實例化
這種狀況適用於代碼量比較少,邏輯比較簡單的時候
import time
import  threading
def get_detail_html(url):
    print("get detail html start")
    time.sleep(2)
    print("get detail html stop")
 
def get_detail_url(url):
    print("url start")
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    thread1= threading.Thread(target=get_detail_html,args=("",))
    thread2= threading.Thread(target=get_detail_url,args=("",))
    start_time=time.time()
    
    # thread1.setDaemon()
    # thread2.setDaemon()
 
    thread1.start()
    thread2.start()
 
    thread1.join()
    thread2.join()
 
    print("lasttime :{}".format(time.time()-start_time))
    pass

 

2.經過繼承Thread來實現多線程(繼承Thread,完成本身的thread子類)
按這種狀況來寫的話,咱們就須要重載咱們的run方法。(注意是run方法而不是start)
import time
import  threading
 
class thread_get_detail_html(threading.Thread):
    def run(self):
        print("get detail html start")
        time.sleep(2)
        print("get detail html stop")
 
class thread_get_detail_url(threading.Thread):
    def run(self):
        print("url start")
        time.sleep(2)
        print("url end")
 
if __name__=="__main__":
    # thread1= threading.Thread(target=get_detail_html,args=("",))
    # thread2= threading.Thread(target=get_detail_url,args=("",))
    thread1=thread_get_detail_html()
    thread2=thread_get_detail_url()
    start_time=time.time()
 
    # thread1.setDaemon()
    # thread2.setDaemon()
 
    thread1.start()
    thread2.start()
 
    thread1.join()
    thread2.join()
 
    print("lasttime :{}".format(time.time()-start_time))
    pass

以上就能發現,啓動了兩個線程分別執行了thread_get_detail_url和thread_get_detail_url。html

 
三. 線程間的通訊
實際上在二中,是在模擬一個簡易爬蟲的流程。先獲取全部咱們要爬取的url,而後再對每一個url的html頁面內容進行獲取。那麼這就涉及到一個問題了,thread_get_detail_url和thread_get_detail_html之間,須要thread_get_detail_url來的帶一個url列表,而thread_get_detail_html也能獲取這個url列表去進行操做。
這就涉及到線程間的通訊了。
python中經常使用的線程間的這種需求的通訊方式有:

- 全局變量
- Queue消息隊列
假設咱們如今繼續來完成這個爬蟲的正常邏輯。python

1. 線程間的變量傳遞git

1.1 全局變量
import time
import  threading
 
detail_url_list=[]
 
def get_detail_html():
    global detail_url_list
    if len(detail_url_list)==0:
        return
    url=detail_url_list.pop()
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))
 
def get_detail_url():
    global  detail_url_list
    print("url start")
    for i in range(20):
        detail_url_list.append("htttp://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    start_time=time.time()
    thread1= threading.Thread(target=get_detail_url)
    thread1.start()
    for i in range(10):
        thread_2=threading.Thread(target=get_detail_html)
        thread_2.start()
    print("lasttime :{}".format(time.time()-start_time))
    pass

實際上,還能夠更方便。將變量做爲參數傳遞,在方法中就不須要global了。github

import time
import  threading
 
detail_url_list=[]
def get_detail_html(detail_url_list):
    if len(detail_url_list)==0:
        return
    url=detail_url_list.pop()
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))
 
def get_detail_url(detail_url_list):
    print("url start")
    for i in range(20):
        detail_url_list.append("htttp://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    start_time=time.time()
    thread1= threading.Thread(target=get_detail_url,args=(detail_url_list,))
    thread1.start()
    for i in range(10):
        thread_2=threading.Thread(target=get_detail_html,args=(detail_url_list,))
        thread_2.start()
 
    print("lasttime :{}".format(time.time()-start_time))
    pass
可是這樣是不能應用於多進程的。
還能夠生成一個variables.py文件,直接import這個文件,這種狀況變量過多的時候,這種方法比較方便。
可是若是咱們直接import變量名,是不能看到其餘進程對這個變量的修改的。
 
可是以上的方法都是線程不安全的操做。想要達到咱們要的效果,就必需要加鎖。因此除非對鎖足夠了解,知道本身在幹嗎,不然並不推薦這種共享變量的方法來進行通訊。
 
1.2 queue消息隊列
 
a.queue實現上述
import time
import  threading
from queue import  Queue
 
def get_detail_html(queue):
    url=queue.get()
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))
 
def get_detail_url(queue):
    print("url start")
    for i in range(20):
        queue.put("htttp://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    start_time=time.time()
    url_queue=Queue()
    thread1= threading.Thread(target=get_detail_url,args=(url_queue,))
    thread1.start()
    for i in range(10):
        thread_2=threading.Thread(target=get_detail_html,args=(url_queue,))
        thread_2.start()

 

b.queue是如何實現線程安全的?
咱們並不推薦1.1中直接用全局變量的方法,是由於須要咱們本身花精力去維護其中的鎖操做才能實現線程安全。而python的Queue是在內部幫咱們實現了線程安全的。
queue使用了deque deque是在字節碼的程度上就實現了線程安全的
 
c.queue的其餘方法
get_nowait(當即取出一個元素,不等待)(異步)
put_nowait(當即放入一個元素,不等待)(異步)
join: 一直block住,從quque的角度阻塞住線程。調用task_done()函數退出。
 

2.線程間的同步問題編程

2.1 線程爲何須要同步?同步究竟是個啥意思?
這是在多線程中,必需要面對的問題。
例子:咱們有個共享變量total,一個方法對total進行加法,一個方法對加完以後的total進行減法。
若是循環對total進行加減的次數比較大的時候,就會比較明顯的發現,每次運行的時候,獲得的taotal多是不同的。
import threading
 
total=0
def add():
    global total
    for i in range(100000000):
        total += 1
def desc():
    global  total
    for i in range(100000000):
        total = total - 1
if __name__=="__main__":
    add_total=threading.Thread(target=add)
    desc_total=threading.Thread(target=desc)
    add_total.start()
    desc_total.start()
    add_total.join()
    desc_total.join()
 
    print(total)
爲何不會像咱們但願的最後的total爲0呢?
從字節碼的角度上看,咱們看看簡化後的add和desc的字節碼。
#input 
def add1(a):
    a += 1
 
def desc1(a):
    a -= 1
 
import  dis
print(dis.dis(add1))
print(dis.dis(desc1))
 
#output
22           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_ADD
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None
25           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_SUBTRACT
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None
從字節碼來看流程爲:#1.load a #2.load 1 #3.add  #4.賦值給a
任何一步字節碼都是有可能被切換出去另一個線程的字節碼去操做a,可能在1線程運行到4字節碼(a和1相加)的時候,開始運行2線程的6字節碼(賦值給a)。
相似的有銀行存取錢、商品庫存等也會有這個問題。
 
2.2 線程如何同步?
用鎖將這段代碼段鎖住,鎖住時,不進行切換。直接運行完這段代碼段。
 
a.Lock和Rlock
threading中有提供lock。
#input 
def add1(a):
    a += 1
 
def desc1(a):
    a -= 1
 
import  dis
print(dis.dis(add1))
print(dis.dis(desc1))
 
#output
22           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_ADD
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None
25           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_SUBTRACT
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None
注意acquire和release成對存在。運行的時候會發現比不加鎖的時候慢比較多。因此其實鎖的問題也很明顯:鎖會影響性能,鎖會引發死鎖。死鎖裏有個很是常見的問題資源競爭是很容易發生的。
那能不能我鎖裏套着鎖呢?Lock方法是不能夠的,可是threading提供了Rlock可重入鎖。
Rlock在同一個線程裏面,能夠連續調用屢次acquire,可是注意acquire和release也必定是要成對存在的。
from threading import  RLock
 
total=0
lock=RLock()
def add():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        lock.acquire()
        total += 1
        lock.release()
        lock.release()
 
3.condition使用以及源碼分析
condition是條件變量,用於複雜的線程間同步。
 
3.1 condition的使用
例子:現有一個需求,要求 天貓精靈和小愛一人一句進行對話。若是咱們現用lock來實現是沒辦法作到這邊說完一句,那邊就說一句的。因此有了condition。
在這個例子中,須要用到condition的兩個重要方法 notify()和wait()。notify()用於通知這邊動做完成,wait()用於阻塞住等待消息。
#input
import  threading
 
class XiaoAi(threading.Thread):
    def __init__(self,cond):
        self.cond=cond
        super().__init__(name="小愛")
    def run(self):
        with self.cond:
            print("小愛: 天貓在嗎 我是小愛")
            self.cond.notify()  #小愛print完了,信號發送
            self.cond.wait()  #小愛等待接受信號
            print("小愛: 咱們來背詩吧")
            self.cond.notify()
class TianMao(threading.Thread):
    def __init__(self,cond):
        self.cond=cond
        super().__init__(name="天貓")
 
    def run(self):
        with self.cond:
            self.cond.wait()
            print("天貓: 在 我是天貓")
            self.cond.notify()
            self.cond.wait()
            print("天貓: 好啊")
            self.cond.notify()
 
if __name__=="__main__":
    condition=threading.Condition()
    xiaoai=XiaoAi(condition)
    tianmao=TianMao(condition)
 
    tianmao.start()
    xiaoai.start()
 
#output:
小愛: 天貓在嗎 我是小愛
天貓: 在 我是天貓
小愛: 咱們來背詩吧
天貓: 好啊
ps:須要注意的是
  • condition必須先with 再調用 notify和wait方法
  • 這麼寫的時候,線程的start()順序很重要
 
3.2 Condition源碼分析
condition實際上是有兩層鎖的。一把底層鎖,會在線程調用了wait()的時候釋放。
上層鎖會在wait()的時候放入雙端隊列中,在調用notify()的時候被喚醒。
 
a.condition=threading.Condition()
condition初始化的時候申請了一把鎖
 
b.self.cond.wait()
先釋放了condition初始化的時候申請的底層鎖,而後又申請了鎖放入雙端隊列。
 
c. self.cond.notify()
 
4.信號量 semaphore
是能夠用來控制線程執行數量的鎖。
 
4.1 semaphore的使用
需求:如今有個文件,對文件能夠進行讀和寫,可是寫是互斥的,讀是共享的。而且對讀的共享數也是有控制的。
例:爬蟲。控制爬蟲的併發數。
import threading
import time
 
class HtmlSpider(threading.Thread):
    def __init__(self,url,sem):
        super().__init__()
        self.url=url
        self.sem=sem
 
    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()
 
class UrlProducer(threading.Thread):
    def __init__(self,sem):
        super().__init__()
        self.sem=sem
    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_test=HtmlSpider("www.baidu.com/{}".format(i),self.sem)
            html_test.start()
 
if __name__=="__main__":
    sem=threading.Semaphore(3)  #設置控制的數量爲3
 
    urlproducer=UrlProducer(sem)
    urlproducer.start()

ps:安全

  • 每acquire一次,數量就會被減小一,release的時候數量會自動回來。
  • 須要注意sem釋放的地方,應該是在HtmlSpider運行完以後進行釋放。
 
4.2 semaphore源碼 
實際上semaphore就是對condition的簡單應用。
 
a.sem=threading.Semaphore(3)
實際上就是在初始化的時候,調用了Condition。
 
b.self.sem.acquire()
咱們簡單看這個邏輯就是,若是設置的數用完了,就讓condition進入wait狀態,不然就把數量減一。
 
c.self.sem.release()
release 也是很簡單的數量加一和condition的notify。
 
 
5.除了上述的對Condition的應用,queue模塊中的Queue也對Condition作了更爲複雜的應用。特別是queue中的put。
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        。。。
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
 
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
 
    def put(self, item, block=True, timeout=None):
       
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()
    。。。。。。

 

四. 線程池
在前面進行線程間通訊的時候,想要多個線程進行併發的時候,須要咱們本身去維護鎖。
可是咱們如今但願有工具來幫咱們對想要線程併發數進行管理。因而有了線程池。
那麼爲何明明有了信號量 semaphore 還須要線程池呢?
由於線程池不僅是控制了線程數量而已。
 
好比說,如今有需求,在主進程中,咱們須要獲得某個線程的狀態。
而且線程的狀態不論是退出仍是什麼,主進程能馬上知道。
futures讓多線程和多進程的接口一致。
 
1.使用線程池
concurrent.futures中有兩個類ThreadPoolExecutor和 ProcessPoolExecutor 分別用於線程池和進程池的建立,基類是futures的Executor類。
使用ThreadPoolExecutor只須要將要執行的函數和要併發的線程數交給它就能夠了。
使用線程池來執行線程任務的步驟以下:
a.調用 ThreadPoolExecutor 類的構造器建立一個線程池。
b.定義一個普通函數做爲線程任務。
c.調用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務。submit返回的是Future類(重要)
d. 當不想提交任何任務時,調用 ThreadPoolExecutor 對象的 shutdown() 方法來關閉線程池。
#例:將以前爬蟲模擬的腳本改成線程池用。

from concurrent.futures import  ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    print("get html page {} successed!".format(times))
    return times

excutor=ThreadPoolExecutor(max_workers=2)
#submit 提交到線程池
#submit的返回很重要,返回的對象Future類能夠判斷這個函數的執行狀態等
#submit 是非阻塞的
task1=excutor.submit(get_html,(3))
task2=excutor.submit(get_html,(2))

print(task1.done())  
print(task2.done())

#result 是阻塞的,接受函數的返回
print(task1.result())
print(task2.result())

#output:
False
False
get html page 2 successed!
get html page 3 successed!
3
2
爲何done()輸出的是false呢。由於submit的返回是非阻塞的,沒有等task執行完就返回了task done的狀態。
若是隔幾秒輸出done()的返回又是true了。
 
Future對象經常使用的其餘方法:cancel()
取消該 Future 表明的線程任務。若是該任務正在執行,不可取消,則該方法返回 False;不然,程序會取消該任務,並返回 True。
print(task1.done())
print(task1.cancel())
print(task2.done())

#result 是阻塞的,接受函數的返回
print(task1.result())

print(task2.result())

output:
False
False
False
get html page 2 successed!
get html page 3 successed!
3
2

cancelled():返回 Future 表明的線程任務是否被成功取消。多線程

 
2.獲取全部完成的future的狀態和值as_completed和map
 
2.1 .as_complete
from concurrent.futures import  ThreadPoolExecutor,as_completed
import time
 
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
future_list=[ excutor.submit(get_html,(uid)) for uid in uids]
for future in as_completed(future_list):
    print(future.result())
 
#output:
get url successed: " www.test.com/2 "
2
get url successed: " www.test.com/5 "
5
get url successed: " www.test.com/3 "
3

as_completed():yield 全部完成的futures的全部返回。併發

那麼as_complete是如何作到收集全部完成的異步方法的狀態的呢?app

先把全部已是finish狀態的future返回,
再一直while pending,等待timeout範圍內的future變成finish,把finish的future yield出來。
from concurrent.futures import  ThreadPoolExecutor,as_completed
import time
 
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
future_list=[ excutor.submit(get_html,(uid)) for uid in uids]
for future in as_completed(future_list):
    print(future.result())
 
#output:
get url successed: " www.test.com/2 "
2
get url successed: " www.test.com/5 "
5
get url successed: " www.test.com/3 "
3
 
2.2  經過excutor的map方法 獲取已經完成的future
excutor的map,和map是差很少的,傳遞函數和參數列表,就會多多線程運行參數列表數的線程。
與a中不同的是,map返回的順序和url中的順序是同樣的,而a的as_completed是誰先finishi誰就先被yield出來。
並且map返回的就是result,而as_completed返回的是Future。
from concurrent.futures import  ThreadPoolExecutor,as_completed,wait
import time
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
 
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
 
result_list=excutor.map(get_html,uids)
for result in result_list:
    print(result)
 
#output:
get url successed: " www.test.com/2 "
get url successed: " www.test.com/5 "
5
2
get url successed: " www.test.com/3 "
3

 

3.wait()方法
wait方法用於阻塞,指定某一個task或者一些task執行完成再往下執行。
def wait(fs, timeout=None, return_when=ALL_COMPLETED)
例:若是我想在所有task執行完以後打印"task end"字符串
from concurrent.futures import  ThreadPoolExecutor,as_completed,wait
import time
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
future_list=[ excutor.submit(get_html,(uid)) for uid in uids]
print("task end")
 
#output:
task end
get url successed: " www.test.com/2 "
get url successed: " www.test.com/5 "
get url successed: " www.test.com/3 "

#尚未執行完,就輸出了 task end。須要加上: wait(future_list)
print("task end") #output: get url successed: " www.test.com/2 " get url successed: " www.test.com/5 " get url successed: " www.test.com/3 " task end
wait還能夠指定在何時執行完後返回。
 
五. threadpool Future 源碼分析
這段源碼我是真的很想認真通俗淺顯的去分析的。由於在python的多進程和多線程中這個Future的概念是十分常見和重要的。
可是我發現我解釋起來太蒼白了,尚未直接去看源碼來的通俗易懂。就在這放一小段入口的我本身的理解吧。
看完了上面幾段筆記,確定會有這些疑惑:
- submit返回的Future對象究竟是啥?
    Future是用來表示task的對象的一個類,不少人稱爲將來對象,就是這個任務未必如今執行完成了,可是將來是會執行完成的。
    獲得了Future對象,能經過其中的屬性和方法獲得task的狀態,是否執行完成等。
    在python的多線程、多進程中,不少地方用到了Future概念。
    具體屬性能夠去看Class Future中的屬性和方法。
 
- 那麼Future這個對象是怎麼設計的呢?Future怎麼知道task的狀態改變的呢?
在以前的例子裏,咱們用ThreadPoolExecutor的submit提交全部的task,返回了Future對象。
那麼submit對Future對象的哪些屬性進行了哪些處理而後返回,才能讓咱們獲得它的result的呢?
submit的源碼:最主要的邏輯是註釋了的那幾句。
def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)

        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
            raise RuntimeError('cannot schedule new futures after'
                               'interpreter shutdown')

        f = _base.Future()    #初始化一個future對象f
        w = _WorkItem(f, fn, args, kwargs) #其實是這個_WorkItem把(future對象,執行函數,函數須要的參數)放進去的,而且完成函數的執行,而且設置future的result

        self._work_queue.put(w)       #將w這個task放入 _work_queue隊列,會在下面這個方法中,被起的Thread進行調用。
        self._adjust_thread_count()   #調整線程數量,而且初始化線程,開啓線程。Thread方法的參數是self._work_queue。起來的線程中執行的task是上兩步生成的w隊列。
        return f
相關文章
相關標籤/搜索