python併發編程之多線程(實踐篇)

一.threading模塊介紹

官網連接:https://docs.python.org/3/library/threading.html?highlight=threading#html

1.開啓線程的兩種方式

#直接調用
import threading import time def run(n): print('task',n) time.sleep(2) t1 = threading.Thread(target=run,args=('t1',)) t1.start()
#繼承式調用
mport threading import time class MyThread(threading.Thread): def __init__(self,n,sleep_time): super(MyThread, self).__init__() self.n = n self.sleep_time = sleep_time def run(self): print('running task',self.n) time.sleep(self.sleep_time) print('task done,',self.n) t1 = MyThread('t1',2) t1.start()

2.在一個進程下開啓多個線程與在一個進程下開啓多個子進程的區別

from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啓線程
    t=Thread(target=work) t.start() print('主線程/主進程') ''' 打印結果: hello 主線程/主進程 '''

    #在主進程下開啓子進程
    t=Process(target=work) t.start() print('主線程/主進程') ''' 打印結果: 主線程/主進程 hello '''
1.開啓速度比較
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
    t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每一個進程都有不一樣的pid
    p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid())
2.比較pid
from  threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100
 n=1 t=Thread(target=work) t.start() t.join() print('',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據
3.數據是否共享

3.應用

1)將socket通訊改寫爲多線程模式python

#_*_coding:utf-8_*_ #!/usr/bin/env python
import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
多線程併發的socket服務端
#_*_coding:utf-8_*_ #!/usr/bin/env python


import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data)
客戶端

2)三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件安全

rom threading import Thread msg_l=[] format_l=[] def talk(): while True: msg=input('>>: ').strip() if not msg:continue msg_l.append(msg) def format_msg(): while True: if msg_l: res=msg_l.pop() format_l.append(res.upper()) def save(): while True: if format_l: with open('db.txt','a',encoding='utf-8') as f: res=format_l.pop() f.write('%s\n' %res) if __name__ == '__main__': t1=Thread(target=talk) t2=Thread(target=format_msg) t3=Thread(target=save) t1.start() t2.start() t3.start()
View Code

3)主線程等待子線程結束多線程

from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() #主線程等待子線程運行結束了再往下走
    print('主線程') print(t.is_alive()) ''' egon say hello 主線程 False '''
join()方法

 

二.守護線程

不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬併發

1)對主進程來講,運行完畢指的是主進程代碼運行完畢app

2)對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢dom

須要強調的是:運行完畢並不是終止運行socket

from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置
 t.start() print('主線程') print(t.is_alive()) #結果爲True說明此時主線程並沒結束,守護進程還在
    ''' 主線程 True '''
守護線程生命週期
from threading import Thread import time def foo(): print(123) time.sleep(3) print("end123") def bar(): print(456) time.sleep(1) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True  #將t1設置爲守護進程,主進程結束後t1也結束,
t1.start()  #可能會出現t1沒有徹底徹底走完就結束的狀況
t2.start() print("main-------") """ 運行結果: 123 456 main------- end456 """
案例分析

三.Python GIL(Global Interpreter Lock)

https://www.cnblogs.com/linhaifeng/articles/7449853.htmlide

五.同步鎖

1.GIL與lock

1)線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來ui

2)join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高

3)GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

2.過程分析

全部線程搶的是GIL鎖,或者說全部線程搶的是執行權限

線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程當中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後正常執行到釋放Lock。。。這就致使了串行運行的效果

既然是串行,那咱們執行

t1.start()

t1.join

t2.start()

t2.join()

這也是串行執行啊,爲什麼還要加Lock呢,需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼。

3.Lock使用

鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

import threading R=threading.Lock() R.acquire() #獲取所對象 ''' 對公共數據的操做 ''' R.release() #釋放

 

#1.100個線程去搶GIL鎖,即搶執行權限 #2. 確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire() #3. 極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL #4.直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘的線程再重複2 3 4的過程
GIL鎖與互斥鎖綜合分析
#不加鎖:併發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1


if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''


#不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行
    time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行
 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''

#思考:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊 #沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是 #start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的 #單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1


if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 '''
互斥鎖與join的區別

 

六.死鎖現象與遞歸鎖

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() ''' Thread-1 拿到A鎖 Thread-1 拿到B鎖 Thread-1 拿到B鎖 Thread-2 拿到A鎖 而後就卡住,死鎖了 '''
死鎖現象

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,
#這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

 

七.信號量Semaphore

同進程的同樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ' get semaphore') # time.sleep(2) # sm.release()
def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
View Code

與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

 互斥鎖與信號量推薦博客:http://url.cn/5DMsS9r

八.Event

同進程的同樣

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行。

event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
import threading,time event = threading.Event() def lighter(): count = 0 event.set() #先設置綠燈
    while True: if count > 5 and count < 10:#改爲紅燈
            event.clear()#標誌位清了
            print('\033[41;1mred light is on ...\033[0m') elif count > 10: event.set()#變綠燈
            count = 0 else: print('\033[42;1mgreen light is on ...\033[0m') time.sleep(1) count += 1

def car(name): while True: if event.is_set():#表明綠燈
            print('[%s] running...'%name) time.sleep(1) else: print('[%s] sees red light ,waiting ...' %name) event.wait() print('\033[34;1m[%s] green light is on,start going ... \033[0m' %name) light = threading.Thread(target=lighter,) light.start() car1 = threading.Thread(target=car,args=('寶馬',)) car1.start()
紅綠燈

 

九.條件Condition

使得線程等待,只有知足某條件時,才釋放n個線程

import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()

 

十.定時器

定時器,指定n秒後執行某操做

 

from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed

 

from threading import Timer import random,time class Code: def __init__(self): self.make_cache() def make_cache(self,interval=5): self.cache=self.make_code() print(self.cache) self.t=Timer(interval,self.make_cache) self.t.start() def make_code(self,n=4): res=''
        for i in range(n): s1=str(random.randint(0,9)) s2=chr(random.randint(65,90)) res+=random.choice([s1,s2]) return res def check(self): while True: inp=input('>>: ').strip() if inp.upper() == self.cache: print('驗證成功',end='\n') self.t.cancel() break


if __name__ == '__main__': obj=Code() obj.check()
驗證碼定時器

 

 

十一.線程queue

queue隊列 :使用import queue,用法與進程Queue同樣

class queue.Queue(maxsize=0) #先進先出

 

import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
View Code

 

class queue.LifoQueue(maxsize=0) #last in fisrt out 

import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
後進先出

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
設置優先級
相關文章
相關標籤/搜索