Python線程模塊threading

線程,程序執行的最小單元,單線程處理多個任務只能一個處理完後繼續處理下一個直到所有處理完,多線程處理任務會比單線程處理起來快嗎?在python程序裏得看狀況,首先有GIL鎖的存在致使同一時刻只能有一個線程執行(執行遇到中斷釋放GIL鎖),這乍一看和單線程處理多任務沒有區別,可是若是執行的任務是I/O密集型任務就可以提升任務執行效率,但若是任務是CPU密集型任務顯然得不到任何效率提高,反而還會由於上下文切換等致使執行不如單線程執行。html

Python中實現多線程模塊推薦使用threading,threading獲得了java線程的啓示。Queue模塊對於線程同步是十分有用的,該模塊提供了一個同步FIFO隊列類型,這個類型很是便於處理線程之間的通訊和協調。java

threading模塊

__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
           'enumerate', 'main_thread', 'TIMEOUT_MAX',
           'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
           'setprofile', 'settrace', 'local', 'stack_size']
threading模塊提供對象 做用
current_thread/currentThread 爲調用線程返回一個Thread對象。若是調用線程不是由threading模塊建立的將建立並返回一個具備有限功能的Thread對象
Thread 表示控制線程的類,建立一個線程能夠直接實例化一個Thread對象,直接實例化時須要指定參數target,t對象調用run方法即執行target(*args, **kwargs); 還能夠同個繼承Thread類建立一個線程,這樣建立須要覆蓋run方法而不須要傳參target。
getName,setName t.getName(), t.setName(name)
getName返回t的名稱,setName設置t的名稱。線程名能夠是任意字符串而且不要求惟一。
isAlive t.isAlive()
若是t是活動的,將返回True(t.start()已經執行,t.run()尚未結束)。不然返False
isDaemon, setDaemon t.isDaemon() t.setDaemon(daemonic)
若是t是一個駐留程序(即便t仍然是活動的,Python能夠終止整個處理過程,這樣能夠結束線程t),isDaemon將返回True,不然返回False。最開始,當且僅當建立t的線程是一個駐留程序時,t纔是一個駐留程序。只能在t.start()以前調用t.setDaemon(True)把t設置爲一個駐留程序。通俗地講正常狀況下若是主線程執行內容中有兩個子線程任務要執行,若是給子線程設置setDaemon(True),那麼當主線程任務結束時兩個子線程仍是活動的時候主線程不會等待子線程執行完成。默認狀況下都是False
join t.join(timeout=None)
建立這個線程的線程將會掛起直到t結束。只能在t.start()以後調用t.join()
run t.run()
run是執行代碼(任務)的方法,不過統一使用t.start(), 讓t..start()去調用執行run
start t.start()
start可讓t活動,並讓run在一個單獨的線程中執行,一個線程對象只能調用一次run方法

代碼demo

import threading
import time


def cal(n):
    print(">>>>>>>>%s" % n)
    time.sleep(n)
    temp = n * n * n
    print("result: {}".format(temp))
    with open('{}.txt'.format(n), 'w') as f:
        f.write('{}abcde'.format(n))
    print("total_thread_count is: {}".format(threading.active_count()))


if __name__ == '__main__':
    t1 = threading.Thread(target=cal, args=(2,))
    t2 = threading.Thread(target=cal, args=(3,))
    t = threading.current_thread()
    print(t.isDaemon())     # False
    t2.start()
    t1.start()
    print("main thread ending")

# 執行結果
"""
False
>>>>>>>>3
>>>>>>>>2
main thread ending
result: 8
total_thread_count is: 3
result: 27
total_thread_count is: 2
"""
# 同目錄下生成2.txt和3.txt文件

給兩個子線程setDaemon(True)執行實例

import threading
import time


def cal(n):
    print(">>>>>>>>%s" % n)
    time.sleep(n)
    temp = n * n * n
    print("result: {}".format(temp))
    with open('{}.txt'.format(n), 'w') as f:
        f.write('{}abcde'.format(n))
    print("total_thread_count is: {}".format(threading.active_count()))


if __name__ == '__main__':
    t1 = threading.Thread(target=cal, args=(2,))
    t2 = threading.Thread(target=cal, args=(3,))
    t = threading.current_thread()
    t1.setDaemon(True)
    t2.setDaemon(True)
    print(t.isDaemon())     # False
    t2.start()
    t1.start()
    print("main thread ending")

# 執行結果
'''
False
>>>>>>>>3
>>>>>>>>2
main thread ending
'''
# 並不會生成2.txt和3.txt文件

使用類繼承建立Thread對象

import threading
import time


class CalTask(threading.Thread):
    def __init__(self, num):
        super(CalTask, self).__init__()
        self.num = num

    def run(self):
        n = self.num
        print(">>>>>>>>%s" % n)
        time.sleep(n)
        temp = n * n * n
        print("result: {}".format(temp))
        with open('{}.txt'.format(n), 'w') as f:
            f.write('{}abcde'.format(n))
        print("total_thread_count is: {}".format(threading.active_count()))


if __name__ == '__main__':
    t1 = CalTask(num=2)
    t2 = CalTask(num=3)
    # t = threading.current_thread()
    t = threading.currentThread()
    # t1.setDaemon(True)
    t2.setDaemon(True)
    print(t.isDaemon())     # False
    t2.start()
    t1.start()
    print("main thread ending")

Lock對象和RLock對象

Lock(互斥鎖)是在多線程編程中線程同步控制的一種方法。在編程中遇到多個線程都修改同一個共享數據的時候就須要考慮線程同步控制。python

線程同步可以保證多個線程安全訪問競爭資源。互斥鎖爲資源引入兩個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態爲「鎖定」,其餘線程不能更改;直到該線程釋放資源,將資源的狀態變成「非鎖定」,其餘的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操做,從而保證了多線程狀況下數據的正確性。redis

代碼實例1

import threading
import time

total = 100


class CalTask(threading.Thread):
    def __init__(self):
        super(CalTask, self).__init__()

    def run(self):
        global total
        print(">>>>>>: ", total)
        time.sleep(0.0001)
        total -= 1


if __name__ == '__main__':
    for i in range(100):
        t = CalTask()
        t.start()
    print(threading.active_count())
    while True:
        if threading.active_count() == 1:
            break
    print(total)

'''
執行結果:
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  99
>>>>>>:  99
>>>>>>:  99
>>>>>>:  99
>>>>>>:  95
>>>>>>:  95
...

9
0
'''
# 最終結果是total爲0,每次直接修改全局變量total自己和Python存在的GIL鎖的緣由致使結果最終爲0

代碼實例2

import threading
import time

total = 100


class CalTask(threading.Thread):
    def __init__(self):
        super(CalTask, self).__init__()

    def run(self):
        global total
        print(">>>>>>: ", total)
        temp = total
        time.sleep(0.0001)
        total = temp - 1


if __name__ == '__main__':
    for i in range(100):
        t = CalTask()
        t.start()
    print(threading.active_count())
    while True:
        if threading.active_count() == 1:
            break
    print(total)

'''
執行結果:
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  100
>>>>>>:  99
>>>>>>:  99
>>>>>>:  99
>>>>>>:  99
>>>>>>:  98
>>>>>>:  98
...

7
89
'''

# 最終結果是total爲89,這和代碼實例一對比結果不一樣的緣由是由於雖然有GIL鎖的存在,但如今是經過一箇中間變量來操做而後從新賦值給total,這樣會形成數據不安全

代碼實例3(代碼2加互斥鎖後)

import threading
import time

total = 100
lock = threading.Lock()


class CalTask(threading.Thread):
    def __init__(self):
        super(CalTask, self).__init__()

    def run(self):
        global total
        lock.acquire()
        print(">>>>>>: ", total)
        temp = total
        time.sleep(0.0001)
        total = temp - 1
        lock.release()


if __name__ == '__main__':
    for i in range(100):
        t = CalTask()
        t.start()
    print(threading.active_count())
    while True:
        if threading.active_count() == 1:
            break
    print(total)
    
'''
執行結果:
>>>>>>:  100
>>>>>>:  99
>>>>>>:  98
>>>>>>:  97
>>>>>>:  96
>>>>>>:  95
>>>>>>:  94
>>>>>>:  93
>>>>>>:  92
>>>>>>:  91
>>>>>>:  90
>>>>>>:  89
90
>>>>>>:  88
>>>>>>:  87
...

0
'''

代碼實例4(代碼2線程+join)

import threading
import time

total = 100
lock = threading.Lock()


class CalTask(threading.Thread):
    def __init__(self):
        super(CalTask, self).__init__()

    def run(self):
        global total
        print(">>>>>>: ", total)
        temp = total
        time.sleep(0.0001)
        total = temp - 1


if __name__ == '__main__':
    for i in range(100):
        t = CalTask()
        t.start()
        t.join()
    print(threading.active_count())
    while True:
        if threading.active_count() == 1:
            break
    print(total)

'''
執行結果:
>>>>>>:  100
>>>>>>:  99
>>>>>>:  98
>>>>>>:  97
>>>>>>:  96
>>>>>>:  95
>>>>>>:  94
>>>>>>:  93
>>>>>>:  92
>>>>>>:  91
>>>>>>:  90
>>>>>>:  89
>>>>>>:  88
>>>>>>:  87
...
1
0
'''

# 串行執行,執行效率否則代碼3(加鎖)

join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分‘串行’,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高編程

RLock遞歸鎖

死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程安全

死鎖示例

from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('33[41m%s 拿到A鎖33[0m' % self.name)

        mutexB.acquire()
        print('33[42m%s 拿到B鎖33[0m' % self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('33[43m%s 拿到B鎖33[0m' % self.name)
        time.sleep(2)

        mutexA.acquire()
        print('33[44m%s 拿到A鎖33[0m' % self.name)
        mutexA.release()

        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

'''
33[41mThread-1 拿到A鎖33[0m
33[42mThread-1 拿到B鎖33[0m
33[43mThread-1 拿到B鎖33[0m
33[41mThread-2 拿到A鎖33[0m
死鎖
'''

RLock替代Lock

from threading import Thread, RLock
import time
mutexA = RLock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('33[41m%s 拿到A鎖33[0m' % self.name)

        mutexA.acquire()
        print('33[42m%s 拿到B鎖33[0m' % self.name)
        mutexA.release()

        mutexA.release()

    def func2(self):
        mutexA.acquire()
        print('33[43m%s 拿到B鎖33[0m' % self.name)
        time.sleep(2)

        mutexA.acquire()
        print('33[44m%s 拿到A鎖33[0m' % self.name)
        mutexA.release()

        mutexA.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。多線程

Condition對象

Condition對象提供了對複雜線程同步問題的支持。架構

Condition class Condition(lock=None)
Condition能夠建立並返回一個新Condition對象,並將對象L設置爲lock,若是lock=None,L將被設置爲一個新建立的RLock對象
acquire,release c.acquire(wait=1) c.release()
這些方法將調用L的對應方法,除非一個線程擁有鎖L,不然該線程不能對c調用任何其餘方法。
notify,notifyAll c.notify() c.notifyAll()
notify能夠喚醒正在等待c的線程中的某一個線程。調用線程在調用c.notify()以前必須擁有L,而且notify不會釋放L。除非被喚醒的線程能夠再次獲得L,不然該線程不會變爲就緒狀態。所以,調用線程一般會在調用notify以後調用release。notifyAll相似於notify,區別在於notifyAll將喚醒全部正在等待的線程而不僅是其中的一個
wait c.wait(timeout=None)
wait將釋放L, 而後掛起調用線程,直到其它線程對c調用notify或notifyAll。調用線程在調用c.wait()以前必須擁有L。

condition示例

import threading
import time


class Seeker(threading.Thread):
    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)  # 確保先運行Hider中的方法
        self.cond.acquire()  # 2
        print(self.name + ': 我已經把眼睛蒙上了')
        self.cond.notify()
        print(self.name + ": 速度啊大兄弟")
        self.cond.wait()  # 3
        # 6
        print(self.name + ': 我找到你了 大兄弟')
        self.cond.notify()
        self.cond.release()
        # 7
        print(self.name + ': 我贏了')  # 8


class Hider(threading.Thread):
    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        self.cond.wait()  # 1    #釋放對瑣的佔用,同時線程掛起在這裏,直到被notify並從新佔有瑣。
        # 4
        print(self.name + ': 我已經藏好了,你快來找我吧')
        self.cond.notify()
        self.cond.wait()  # 5
        # 8
        self.cond.release()
        print(self.name + ': 被你找到了,大爺的')


cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()


'''
執行結果:
seeker: 我已經把眼睛蒙上了
seeker: 速度啊大兄弟
hider: 我已經藏好了,你快來找我吧
seeker: 我找到你了 大兄弟
seeker: 我贏了
hider: 被你找到了,大爺的
'''

Event對象

Event對象可讓任意數量的線程掛起並等待。等待事件對象e的全部線程在任何其餘線程調用e.set()時將變爲就緒狀態。事件對象e有一個標記,能夠記錄該事件是否已經發送;在e被建立時,這個標記的初始值爲False。Event就是這樣一個相似於簡化的Condition。併發

方法 做用
Event Event能夠建立並返回一個新事件對象e,而且e的標記被設置爲False
clear e.clear() 將e的標記設置爲False
isSet e.isSet() 返回e的標記的值,True或False
set e.set() 將e的標記設置爲True。全部等待e的線程將變爲就緒
wait e.wait(timeout=None)
若是e的標記爲True,wait將當即返回。不然,wait將掛起調用線程,直到其餘一些線程調用

Event代碼示例

import threading
import time
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)


def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug(
        'redis ready, and connect to redis server and do some work [%s]',
        time.ctime())
    time.sleep(1)


def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug(
        'first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3)  # simulate the check progress
    readis_ready.set()


if __name__ == "__main__":
    main()

'''
執行結果:
(t1        ) Waiting for redis ready...
(t2        ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t1        ) redis ready, and connect to redis server and do some work [Thu Feb 14 14:58:53 2019]
(t2        ) redis ready, and connect to redis server and do some work [Thu Feb 14 14:58:53 2019]
'''

Semaphore對象

信號量(也被稱爲計數信號量,counting semaphore)是廣義上的鎖。Lock的狀態能夠被看做是True或False,信號量對象s的狀態是一個0~n的數字,n是在s被建立設置的。信號量能夠用於管理固定的資源池,相比信號量使用隊列來實現這個功能更健壯一些。app

方法 做用
Semaphore class Semaphore(n=1)
Semaphore能夠建立並返回一個狀態被設置爲n的信號量對象s
acquire s.acquire(wait=True)
在s的狀態大於0時,acquire將把狀態值減1並返回True,在s的狀態爲0而且wait爲True時,acquire將掛起調用線程並等待,直到其餘一些線程調用了s.release。在s的狀態爲0,而且wait爲False時,acquire將當即返回False
release s.release()
在s的狀態大於0,或者在狀態爲0可是沒有線程正在等待s時,release將把狀態值增長1,在s的狀態爲0而且有線程正在等待s時,release將把s的狀態設爲0,並喚醒任意一個等待線程。調用release的線程將再也不被掛起,該線程將保持就緒,並繼續正常執行

Semphore代碼示例

import threading
import time

semaphore = threading.Semaphore(5)  # 設置同時能夠有5個線程能夠得到信號量鎖


def func():
    if semaphore.acquire():
        print(threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()


for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

# 執行結果:每兩秒併發執行5個線程任務,即每2秒打印5次

線程本地存儲

threading模塊提供了一個local類,線程可使用這個類得到線程本地存儲,也被稱爲每線程數據。線程本地存儲對象能夠set設置和get獲取屬性,能夠在__dict__中獲取到,這個本地存儲對象是線程安全的,多個線程同時設置和得到對象的屬性是不會有問題的。

示例代碼:

import threading
L = threading.local()
print("in main thread, setting zop to 42")
L.zop = 42


def targ():
    print("in subthread, setting zop to 23")
    L.zop = 23
    print("in subthread, setting zop is now ", L.zop)

if __name__ == '__main__':
    t = threading.Thread(target=targ)
    t.start()
    t.join()
    print("in main thread, setting zop is now ", L.zop)

# 執行結果
"""
in main thread, setting zop to 42
in subthread, setting zop to 23
in subthread, setting zop is now  23
in main thread, setting zop is now  42
"""

線程程序架構

只要線程程序必須處理某些外部對象,能夠專門指定一個使用Queue對象的線程來實現這樣的處理。經過這個Queue對象,外部接口線程能夠經過這個Queue對象得到其餘線程放入的工做請求。外部接口線程能夠將結果放入到一個或多個其餘Queue對象來返回這些結果。下面示例展現了若是將這種架構包裝到一個通用的可重用類中:

線程程序簡易架構示例1

import threading

try:
    import Queue            # Python 2
except ImportError:
    import queue as Queue   # Python 3

class ExternalInterfacing(threading.Thread):
    def __init__(self, **kwargs):
        super(ExternalInterfacing, self).__init__()
        self.setDaemon(True)
        self.workRequestQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.start()

    def apply(self, externalCallable, *args, **kwargs):
        "called by other threads as externalCallable would be"
        self.workRequestQueue.put((externalCallable, args, kwargs))
        return self.resultQueue.get()

    def run(self):
        while 1:
            externalCallable, args, kwargs = self.workRequestQueue.get()
            self.resultQueue.put(externalCallable(args, kwargs))

搜了下資料發現threadpool的源碼實現的基礎架構就是這樣的,threadpool源碼有四百多行就不在這裏分析了,寫到這裏了threadpool源碼分析

相關文章
相關標籤/搜索