python中線程

線程

線程的實質:進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位。php

線程的特性:html

  1. 同一個進程內的多個線程共享該進程內的地址資源,但也任然有本身獨立的存儲空間
  2. 建立線程的開銷要遠小於建立進程的開銷(建立一個進程,就是建立一個車間,涉及到申請空間,並且在該空間內建至少一條流水線,但建立線程,就只是在一個車間內造一條流水線,無需申請空間,因此建立開銷小)

注意:java

加入你的電腦是四核的,我開四個進程的話,每個進程佔用一個cpu來運行,若是開4個線程的話 這四個線程都隸屬於一個進程裏面,全部4個線程只是佔用一個cpu運行(僞併發,GIL鎖)node

由於解釋性語言很難知道當前這一句執行完了下一句是什麼,解釋器的鍋,不是python語言的鍋python

線程和進程的使用場景:mysql

若是兩個任務,須要共享內存,又想實現異步,使用多線程git

若是兩個任務,須要數據隔離,使用多進程程序員

線程小故事github


 全局解釋器鎖(GIL鎖)解釋爲何Cpython無法使用多核優點

GIL本質就是一把互斥鎖,既然是互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。sql

首先肯定一點:運行python文件其實是運行python解釋器的進程,每次執行python程序,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生3個不一樣的python進程

解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就致使了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操做,解決這種問題沒有什麼高明的方法,就是加鎖處理,以下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼

注:上圖表示多個線程同時執行python代碼,而後和垃圾回收線程一塊兒訪問解釋器代碼

GIL與Lock

機智的同窗可能會問到這個問題:Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock?

首先,咱們須要達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據

而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖。

最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不同前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock,以下圖

有了GIL的存在,同一時刻同一進程中只有一個線程被執行

聽到這裏,有的同窗立馬質問:進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點,也就是說python沒用了,php纔是最牛逼的語言?

這裏就回到了最開始的地方:線程和進程的使用場景:

若是兩個任務,須要共享內存,又想實現異步,使用多線程

若是兩個任務,須要數據隔離,使用多進程

python線程模塊的選擇

  Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊容許程序員建立和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊容許用戶建立一個能夠用於多個線程之間共享數據的隊列數據結構。
  避免使用thread模塊,由於更高級別的threading模塊更爲先進,對線程的支持更爲完善,並且使用thread模塊裏的屬性有可能會與threading出現衝突;其次低級別的thread模塊的同步原語不多(實際上只有一個),而threading模塊則有不少;再者,thread模塊中當主線程結束時,全部的線程都會被強制結束掉,沒有警告也不會有正常的清除工做,至少threading模塊能確保重要的子線程退出後進程才退出。

  thread模塊不支持守護線程,當主線程退出時,全部的子線程不論它們是否還在工做,都會被強行退出。而threading模塊支持守護線程,守護線程通常是一個等待客戶請求的服務器,若是沒有客戶提出請求它就在那等着,若是設定一個線程爲守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。

threading模塊

multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹

線程的建立Threading.Thread類

建立線程的方式1

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主線程')

建立線程的方式2

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主線程')

注意:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣,而開多個進程,每一個進程都有不一樣的pid

t.start()和t.run(),start會自動調用run,可是調用start會自動執行run,可是不必定會當即執行,是等待調度,什麼時候真正的被調度取決於cpu,而調用t.run則是直接執行線程對象的run方法

線程相關的基本方法和使用

Thread實例對象的方法
isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。


threading模塊提供的一些方法:

threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。


 使用實例

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啓線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

    '''
    打印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進程
    Thread-1
    '''

多線程實現socket實例

服務端:

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()

客戶端:

import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

守護線程

和進程同樣,主線程依舊會等待子線程的結束才結束,若是不想這樣,把子線程設置成守護線程

不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬

須要強調的是:運行完畢並不是終止運行

#1.對主進程來講,運行完畢指的是主進程代碼運行完畢

#2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢

詳細解釋:

#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,

#2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。

實例

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar, daemon=True)

# t2.daemon=True
t1.start()
t2.start()
print("main-------")

# 123
# 456
# main-------
# end123

死鎖和遞歸鎖

一 死鎖現象

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖(A和B都拿着鑰匙,但A要B的鑰匙才能還鎖,B要A的鑰匙才能還鎖,這就是死鎖)

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

執行效果

Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖 #出現死鎖,整個程序阻塞住

二 遞歸鎖

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖,兩者的區別是:遞歸鎖能夠連續acquire屢次,而互斥鎖只能acquire一次

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
Python大法就是好,提供了with自動加鎖和解鎖!有了它,加鎖時,咱們就能夠解放雙手~(≧▽≦)/~啦啦啦!
官方大法:Lock 對象和 with 語句塊一塊兒使用能夠保證互斥執行,就是每次只有一個線程能夠執行 with 語句包含的代碼塊。with 語句會在這個代碼塊執行前自動獲取鎖,在執行結束後自動釋放鎖。
import threading
class SharedCounter:

    def __init__(self, init_value=0):
        self.init_value = init_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        with self._value_lock:
            self.init_value += delta

    def decr(self, delta=1):
        with self._value_lock:
            self.init_value -= delta

使用遞歸鎖,儘量避免死鎖的發生!強烈推薦使用遞歸鎖加with大法!

import threading

class SharedCounter:
    _lock = threading.RLock()

    def __init__(self, initial_value=0):
        self._value = initial_value

    def incr(self, delta=1):
        with SharedCounter._lock:
            self._value += delta

    def decr(self, delta=1):
        with SharedCounter._lock:
            self.incr(-delta)

信號量

同進程的同樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

Event事件

同進程的同樣,線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。

對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。

一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行


event.isSet():返回event的狀態值;

event.wait():若是 event.isSet()==False將阻塞線程,若是給wait傳值,那麼就表明只阻塞多長時間;

event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。


 

例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連接超時')
        print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>連接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

條件

使得線程等待,只有知足某條件時,才釋放n個線程

import threading
 
def run(n):
    con.acquire()
    con.wait()  # 等着
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
    con = threading.Condition()  # 條件=鎖+wait的功能
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))  # 傳遞信號,能夠放行幾個
        con.release()

定時器

定時器,指定n秒後執行某操做

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

驗證碼定時器

from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input('>>: ').strip()
            if inp.upper() ==  self.cache:
                print('驗證成功',end='\n')
                self.t.cancel()
                break


if __name__ == '__main__':
    obj=Code()
    obj.check()

線程queue

queue隊列 :使用import queue,用法與進程Queue同樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
class queue.LifoQueue(maxsize=0) #last in fisrt out 
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列,同數字FIFO
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

Python標準模塊--concurrent.futures實現線程池和進程池

Python標準庫爲咱們提供了threading和multiprocessing模塊編寫相應的多線程/多進程代碼,可是當項目達到必定的規模,頻繁建立/銷燬進程或者線程是很是消耗資源的,這個時候咱們就要編寫本身的線程池/進程池,以空間換時間。但從Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。

1.Executor和Future:

  concurrent.futures模塊的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。可是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor倒是很是有用,顧名思義二者分別被用來建立線程池和進程池的代碼。咱們能夠將相應的tasks直接放入線程池/進程池,不須要維護Queue來操心死鎖的問題,線程池/進程池會自動幫咱們調度。

  Future這個概念相信有java和nodejs下編程經驗的朋友確定不陌生了,你能夠把它理解爲一個在將來完成的操做,這是異步編程的基礎,傳統編程模式下好比咱們操做queue.get的時候,在等待返回結果以前會產生阻塞,cpu不能讓出來作其餘事情,而Future的引入幫助咱們在等待的這段時間能夠完成其餘的操做。

  p.s: 若是你依然在堅守Python2.x,請先安裝futures模塊。

https://docs.python.org/dev/library/concurrent.futures.html

值得一提的是Executor實現了__enter__和__exit__使得其對象可使用with操做符

池子的意義:

在剛開始學多進程或多線程時,咱們火燒眉毛地基於多進程或多線程實現併發的套接字通訊,然而這種實現方式的致命缺陷是:服務的開啓的進程數或線程數都會隨着併發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,因而咱們必須對服務端開啓的進程數或線程數加以控制,讓機器在一個本身能夠承受的範圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質仍是基於多進程,只不過是對開啓進程的數目加上了限制

注意:若是機器可以支持100個線程沒有必要開10個線程的那種線程池,直接開100線程更快

開進程池和線程池都有兩種方式:

第一種:

進程池——multiprocessing.Pool

線程池——multiprocessing.dummy.Pool

第二種:

進程池——from concurrent.futures import ProcessPoolExecutor

線程池——from concurrent.futures import ThreadPoolExecutor

注:第二種是對第一種的高度封裝

官網:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

基本方法

一、submit(fn, *args, **kwargs)
異步提交任務,不阻塞,不用等任務提交完就能繼續往下執行,*args和**kwargs是參數

二、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操做

三、shutdown(wait=True) 
至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前(shutdown以前以後不能再提交任務了)

四、result(timeout=None)
取得結果,該方法會使異步池變成同步,由於得拿到結果才能進行下一步,優化是把線程對象先放在列表裏,結束後循環拿

五、add_done_callback(fn)
回調函數,這裏參數接收的是函數,函數接收的不是返回值,而是future對象  pool.submit(task).add_done_callback(other_task)

六、pool.submit().done()
能夠斷定提交的任務是否完成,完成了返回True,不然返回done

進程池進階

介紹

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)  # 提交任務,task是任務名,i是參數
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

線程池使用

介紹

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

用法

把ProcessPoolExecutor換成ThreadPoolExecutor,其他用法所有相同

from concurrent.futures import ThreadPoolExecutor
from urllib2 import urlopen
import time
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'http://qq.com/']

def load_url(url):
    res = urlopen(url, timeout=60)
    print('%r page is %d bytes' % (url, len(res.read())))

if __name__ == '__main__':
    start = time.time()
    executor = ThreadPoolExecutor(max_workers=3)
    #使用submit方式
    for url in URLS:
        future = executor.submit(load_url,url)
        #print(future.done())
        print (future.result()) #加了.result()會阻塞主線程
    #使用map方式
    #executor.map(load_url, URLS)
    end = time.time()
    #print('主線程')
    print (end-start)
    ####

concurrent.futures.wait:

  wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另一個是uncompleted(未完成的)。使用wait方法的一個優點就是得到更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默認設置爲ALL_COMPLETED。

  若是採用默認的ALL_COMPLETED,程序會阻塞直到線程池裏面的全部任務都完成,再執行主線程:

from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url,url)
    f_list.append(future)
print(wait(f_list,return_when='ALL_COMPLETE'))

print('主線程')

# 運行結果:
'http://www.163.com' page is 662047 bytes
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 54629 bytes
DoneAndNotDoneFutures(done={<Future at 0x2d0f898 state=finished returned NoneType>, <Future at 0x2bd0630 state=finished returned NoneType>, <Future at 0x2d27470 state=finished returned NoneType>}, not_done=set())
主線程

future

future是concurrent.futures模塊和asyncio模塊的重要組件
從python3.4開始標準庫中有兩個名爲Future的類:concurrent.futures.Future和asyncio.Future
這兩個類的做用相同:兩個Future類的實例都表示可能完成或者還沒有完成的延遲計算。與Twisted中的Deferred類、Tornado框架中的Future類的功能相似

注意:一般狀況下本身不該該建立future,而是由併發框架(concurrent.futures或asyncio)實例化

緣由:future表示終將發生的事情,而肯定某件事情會發生的惟一方式是執行的時間已經安排好,所以只有把某件事情交給concurrent.futures.Executor子類處理時,纔會建立concurrent.futures.Future實例。
如:Executor.submit()方法的參數是一個可調用的對象,調用這個方法後會爲傳入的可調用對象排定時間,並返回一個future

客戶端代碼不能應該改變future的狀態,併發框架在future表示的延遲計算結束後會改變期物的狀態,咱們沒法控制計算什麼時候結束。

這兩種future都有.done()方法,這個方法不阻塞,返回值是布爾值,指明future連接的可調用對象是否已經執行。客戶端代碼一般不會詢問future是否運行結束,而是會等待通知。所以兩個Future類都有.add_done_callback()方法,這個方法只有一個參數,類型是可調用的對象,future運行結束後會調用指定的可調用對象。

.result()方法是在兩個Future類中的做用相同:返回可調用對象的結果,或者從新拋出執行可調用的對象時拋出的異常。可是若是future沒有運行結束,result方法在兩個Futrue類中的行爲差異很是大。
對concurrent.futures.Future實例來講,調用.result()方法會阻塞調用方所在的線程,直到有結果可返回,此時,result方法能夠接收可選的timeout參數,若是在指定的時間內future沒有運行完畢,會拋出TimeoutError異常。
而asyncio.Future.result方法不支持設定超時時間,在獲取future結果最好使用yield from結構,可是concurrent.futures.Future不能這樣作

無論是asyncio仍是concurrent.futures.Future都會有幾個函數是返回future,其餘函數則是使用future,在最開始的例子中咱們使用的Executor.map就是在使用future,返回值是一個迭代器,迭代器的__next__方法調用各個future的result方法,所以咱們獲得的是各個futrue的結果,而不是future自己

相關文章
相關標籤/搜索