python 線程入門

線程的定義:

線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。python

進程和線程的區別

  • Threads share the address space of the process that created it; processes have their own address space.
  • 線程的地址空間共享,每一個進程有本身的地址空間。
  • Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  • 一個進程中的線程直接接入他的進程的數據段,可是每一個進程都有他們本身的從父進程拷貝過來的數據段
  • Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  • 一個進程內部的線程之間可以直接通訊,進程之間必須使用進程間通訊實現通訊
  • New threads are easily created; new processes require duplication of the parent process.
  • 新的線程很容易被建立,新的進程須要從父進程複製
  • Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  • 一個進程中的線程間可以有至關大的控制力度,進程僅僅只能控制他的子進程
  • Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
  • 改變主線程(刪除,優先級改變等)可能影響這個進程中的其餘線程;修改父進程不會影響子進程

python線程模塊的選擇

  Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊容許程序員建立和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊容許用戶建立一個能夠用於多個線程之間共享數據的隊列數據結構。
  避免使用thread模塊,由於更高級別的threading模塊更爲先進,對線程的支持更爲完善,並且使用thread模塊裏的屬性有可能會與threading出現衝突;其次低級別的thread模塊的同步原語不多(實際上只有一個),而threading模塊則有不少;再者,thread模塊中當主線程結束時,全部的線程都會被強制結束掉,沒有警告也不會有正常的清除工做,至少threading模塊能確保重要的子線程退出後進程才退出。 mysql

  thread模塊不支持守護線程,當主線程退出時,全部的子線程不論它們是否還在工做,都會被強行退出。而threading模塊支持守護線程,守護線程通常是一個等待客戶請求的服務器,若是沒有客戶提出請求它就在那等着,若是設定一個線程爲守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。程序員

threading模塊

線程的兩種調用方式

線程的調用有兩種方式,分爲直接調用和繼承式調用,示例代碼以下:sql

1 #直接調用
 2 import threading
 3 import time
 4 
 5 def sayhi(num): #定義每一個線程要運行的函數
 6 
 7     print("running on number:%s" %num)
 8 
 9     time.sleep(3)
10 
11 if __name__ == '__main__':
12 
13     t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
14     t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例
15 
16     t1.start() #啓動線程
17     t2.start() #啓動另外一個線程
18 
19     print(t1.getName()) #獲取線程名
20     print(t2.getName())
21 
22 #繼承式調用
23 import threading
24 import time
25 
26 
27 class MyThread(threading.Thread):
28     def __init__(self,num):
29         threading.Thread.__init__(self)
30         self.num = num
31 
32     def run(self):#定義每一個線程要運行的函數
33 
34         print("running on number:%s" %self.num)
35 
36         time.sleep(3)
37 
38 if __name__ == '__main__':
39 
40     t1 = MyThread(1)
41     t2 = MyThread(2)
42     t1.start()
43     t2.start()
新建線程

join和setDaemon

join()方法在該線程對象啓動了以後調用線程的join()方法以後,那麼主線程將會阻塞在當前位置直到子線程執行完成才繼續往下走,若是全部子線程對象都調用了join()方法,那麼主線程將會在等待全部子線程都執行完以後再往下執行。編程

setDaemon(True)方法在子線程對象調用start()方法(啓動該線程)以前就調用的話,將會將該子線程設置成守護模式啓動,這是什麼意思呢?當子線程還在運行的時候,父線程已經執行完了,若是這個子線程設置是以守護模式啓動的,那麼隨着主線程執行完成退出時,子線程立馬也退出,若是沒有設置守護啓動子線程(也就是正常狀況下)的話,主線程執行完成以後,進程會等待全部子線程執行完成以後才退出。安全

示例代碼以下:服務器

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())   #查詢線程狀態
join
#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,
#2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)
#demo1
if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()以前設置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True
    '''


#demo2
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")
守護進程setDaemon

其餘方法數據結構

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

互斥鎖

互斥鎖的產生是由於前面提到過多線程之間是共享同一塊內存地址的,也就是說多個不一樣的線程可以訪問同一個變量中的數據,那麼,當多個線程要修改這個變量,會產生什麼狀況呢?當多個線程修改同一個數據的時候,若是操做的時間夠短的話,能獲得咱們想要的結果,可是,若是修改數據不是原子性的(這中間的時間太長)的話。。。頗有可能形成數據的錯誤覆蓋,從而獲得咱們不想要的結果。多線程

這時候,就須要互斥鎖出場了,爲了讓臨界資源(會被多個線程同時訪問)可以實現按照咱們控制訪問,須要使用互斥鎖來鎖住臨界資源,當一個線程須要訪問臨界資源時先檢查這個資源有沒有被鎖住,若是沒有被鎖住,那麼訪問這個資源並同時給這個資源加上鎖,這樣別
的線程就沒法訪問該臨界資源了,直到這個線程訪問完了這個臨界資源以後,釋放這把鎖,其餘線程纔可以搶佔該臨界資源。這個,就是互斥鎖的概念。
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)
View Code
#不加鎖:併發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼併發運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是
#start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的
#單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麼的恐怖
'''
互斥鎖與join的區別

線程死鎖和遞歸鎖

若是公共的臨界資源比較多,而且線程間都使用互斥鎖去訪問臨界資源,那麼將有可能出現一個狀況:併發

  • 線程1拿到了資源A,接着須要資源B才能繼續執行下去
  • 線程2拿到了資源B,接着須要資源A才能繼續執行下去

這樣,線程1和線程2各執己見。。。結果就都卡死在這了,這就是線程死鎖的由來。。。

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程.

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
死鎖案例
import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['tom','jack','lucy']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
典型問題:科學家吃麪

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

python中提供了一個方法(不止python中,基本上全部的語言中都支持這個方法)那就是遞歸鎖。遞歸鎖的建立是使用threading.RLock(),它裏面其實維護了兩個東西,一個是Lock,另外一個是counter,counter記錄了加鎖的次數,每加一把鎖,counter就會+1,釋放一次鎖counter就會減一,直到全部加的鎖都被釋放掉了以後其餘線程纔可以訪問這把鎖獲取資源。固然這個限制是對於線程之間的,同一個線程中,只要這個線程搶到了這把鎖,那麼這個線程就能夠對這把鎖加多個鎖,而不會阻塞本身的執行。這就是遞歸鎖的原理。

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['馬雲','馬化騰','馬蓉']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
遞歸鎖

信號量

信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。

計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)

BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()
View Code

事件

同進程的同樣

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。
import threading
import time,random
from threading import Thread,Event

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連接超時')
        print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>連接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
View Code

隊列Queue

使用隊列方法:

1 建立一個「隊列」對象
 2 import Queue
 3 q = Queue.Queue(maxsize = 10)
 4 Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。
 5 
 6 將一個值放入隊列中
 7 q.put(10)
 8 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
 9 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。
10 
11 將一個值從隊列中取出
12 q.get()
13 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。
14 
15 Python Queue模塊有三種隊列及構造函數:
16 一、Python Queue模塊的FIFO隊列先進先出。  class queue.Queue(maxsize)
17 二、LIFO相似於堆,即先進後出。             class queue.LifoQueue(maxsize)
18 三、還有一種是優先級隊列級別越低越先出來。   class queue.PriorityQueue(maxsize)
19 
20 此包中的經常使用方法(q = Queue.Queue()):
21 q.qsize() 返回隊列的大小
22 q.empty() 若是隊列爲空,返回True,反之False
23 q.full() 若是隊列滿了,返回True,反之False
24 q.full 與 maxsize 大小對應
25 q.get([block[, timeout]]) 獲取隊列,timeout等待時間
26 q.get_nowait() 至關q.get(False)
27 非阻塞 q.put(item) 寫入隊列,timeout等待時間
28 q.put_nowait(item) 至關q.put(item, False)
29 q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
30 q.join() 實際上意味着等到隊列爲空,再執行別的操做
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
先進先出
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''
後進先出
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''
優先級隊列
相關文章
相關標籤/搜索