使用方式 from threading import Thread
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread from multiprocessing import Process def work(name): print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=work,args=('egon',)) # t=Process(target=work,args=('egon',)) t.start() print('主線程')
#方式一 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程') #方式二 from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')
#!/usr/bin/Python # -*- coding:utf-8 -*- from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print('主線程/主進程')
多線程併發socket php
#!/usr/bin/python # -*- coding:utf-8 -*- from socket import * from threading import Thread def server(ip,port): s.bind((ip,port)) s.listen(5) while True: conn, addr = s.accept() print('client',addr) t = Thread(target=talk, args=(conn, addr)) t.start() def talk(conn,addr): #通訊 try: while True: res=conn.recv(1024) if not res:break print('client %s:%s msg:%s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception: pass finally: conn.close() if __name__ == '__main__': server('127.0.0.1', 8080)
客戶端python
#_*_coding:utf-8_*_ #!/usr/bin/env python import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data)
多線程文本保存輸入內容mysql
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread msg_l=[] format_l=[] def talk(): while True: msg=input('>>: ').strip() if not msg:continue msg_l.append(msg) def format(): while True: if msg_l: res=msg_l.pop() res=res.upper() format_l.append(res) def save(): while True: if format_l: res=format_l.pop() with open('db.txt','a',encoding='utf-8') as f: f.write('%s\n' %res) if __name__ == '__main__': t1=Thread(target=talk) t2=Thread(target=format) t3=Thread(target=save) t1.start() t2.start() t3.start()
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
#!/usr/bin/python # -*- coding:utf-8 -*- n=11111111111111111111111111111111111 import time from threading import Thread import threading def work(): time.sleep(2) print('%s say hello' %(threading.current_thread().getName())) if __name__ == '__main__': t=Thread(target=work) # t.setDaemon(True)#設置守護線程隨主線程關閉 t.start() t.join() print(threading.enumerate()) #當前活躍的線程對象,是一個列表形式 print(threading.active_count()) #當前活躍的線程數目 print('主線程',threading.current_thread().getName())
''' 定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) ''' 結論:在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點
首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。web
那麼CPython實現中的GIL又是什麼呢?GIL全稱Global Interpreter Lock爲了不誤導,咱們仍是來看一下官方給出的解釋:redis
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)sql
好吧,是否是看上去很糟糕?一個防止多線程併發執行機器碼的一個Mutex,乍一看就是個BUG般存在的全局鎖嘛!別急,咱們下面慢慢的分析。編程
爲何會有GIL緩存
因爲物理上得限制,各CPU廠商在覈心頻率上的比賽已經被多核所取代。爲了更有效的利用多核處理器的性能,就出現了多線程的編程方式,而隨之帶來的就是線程間數據一致性和狀態同步的困難。即便在CPU內部的Cache也不例外,爲了有效解決多份緩存之間的數據同步時各廠商花費了很多心思,也不可避免的帶來了必定的性能損失。服務器
Python固然也逃不開,爲了利用多核,Python開始支持多線程。而解決多線程之間數據完整性和狀態同步的最簡單方法天然就是加鎖。 因而有了GIL這把超級大鎖,而當愈來愈多的代碼庫開發者接受了這種設定後,他們開始大量依賴這種特性(即默認python內部對象是thread-safe的,無需在實現時考慮額外的內存鎖和同步操做)。多線程
慢慢的這種實現方式被發現是蛋疼且低效的。但當你們試圖去拆分和去除GIL的時候,發現大量庫代碼開發者已經重度依賴GIL而很是難以去除了。有多難?作個類比,像MySQL這樣的「小項目」爲了把Buffer Pool Mutex這把大鎖拆分紅各個小鎖也花了從5.5到5.6再到5.7多個大版爲期近5年的時間,本且仍在繼續。MySQL這個背後有公司支持且有固定開發團隊的產品走的如此艱難,那又更況且Python這樣核心開發和代碼貢獻者高度社區化的團隊呢?
因此簡單的說GIL的存在更多的是歷史緣由。若是推到重來,多線程的問題依然仍是要面對,可是至少會比目前GIL這種方式會更優雅。
GIL的影響
從上文的介紹和官方的定義來看,GIL無疑就是一把全局排他鎖。毫無疑問全局鎖的存在會對多線程的效率有不小影響。甚至就幾乎等於Python是個單線程的程序。
那麼讀者就會說了,全局鎖只要釋放的勤快效率也不會差啊。只要在進行耗時的IO操做的時候,能釋放GIL,這樣也仍是能夠提高運行效率的嘛。或者說再差也不會比單線程的效率差吧。理論上是這樣,而實際上呢?Python比你想的更糟。
下面咱們就對比下Python在多線程和單線程下得效率對比。測試方法很簡單,一個循環1億次的計數器函數。一個經過單線程執行兩次,一個多線程執行。最後比較執行總時間。測試環境爲雙核的Mac pro。注:爲了減小線程庫自己性能損耗對測試結果帶來的影響,這裏單線程的代碼一樣使用了線程。只是順序的執行兩次,模擬單線程。
這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf
#計算密集型 from threading import Thread from multiprocessing import Process import os import time def work(): res=0 for i in range(1000000): res+=i if __name__ == '__main__': t_l=[] start_time=time.time() # for i in range(300): #串行 # work() for i in range(300): t=Thread(target=work) #在個人機器上,4核cpu,多線程大概15秒 # t=Process(target=work) #在個人機器上,4核cpu,多進程大概10秒 t_l.append(t) t.start() for i in t_l: i.join() stop_time=time.time() print('run time is %s' %(stop_time-start_time)) print('主線程')
#I/O密集型 from threading import Thread from multiprocessing import Process import time import os def work(): time.sleep(2) #模擬I/O操做,能夠打開一個文件來測試I/O,與sleep是一個效果 print(os.getpid()) if __name__ == '__main__': t_l=[] start_time=time.time() for i in range(1000): t=Thread(target=work) #耗時大概爲2秒 # t=Process(target=work) #耗時大概爲25秒,建立進程的開銷遠高於線程,並且對於I/O密集型,多cpu根本無論用 t_l.append(t) t.start() for t in t_l: t.join() stop_time=time.time() print('run time is %s' %(stop_time-start_time))
關於GIL與Lock的比較請看2.3小節,此處只需知道:有了GIL的存在,同一時刻統一進程中只有一個線程被執行
聽到這裏,有的同窗立馬質問:進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點,也就是說python沒用了,php纔是最牛逼的語言?
彆着急啊,老孃還沒講完呢。
要解決這個問題,咱們須要在幾個點上達成一致:
1. cpu究竟是用來作計算的,仍是用來作I/O的?
1. 多cpu,意味着能夠有多個核並行完成計算,因此多核提高的是計算性能
2. 每一個cpu一旦遇到I/O阻塞,仍然須要等待,因此多核對I/O操做沒什麼用處
一個工人至關於cpu,此時計算至關於工人在幹活,I/O阻塞至關於爲工人幹活提供所需原材料的過程,工人幹活的過程當中若是沒有原材料了,則工人幹活的過程須要中止,直到等待原材料的到來。
若是你的工廠乾的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的工人,意義也不大,還不如一我的,在等材料的過程當中讓工人去幹別的活,
反過來說,若是你的工廠原材料都齊全,那固然是工人越多,效率越高
結論:
對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用
固然對於一個程序來講,不會是純計算或者純I/O,咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,從而進一步分析python的多線程有無用武之地
分析:
咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,解決方案能夠是:
方案一:開啓四個進程
方案二:一個進程下,開啓四個線程
單核狀況下,分析結果:
若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝
若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝
多核狀況下,分析結果:
若是四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝
若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝
結論:如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。
應用:
多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析
鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:
進程也有死鎖與遞歸鎖,在進程那裏忘記說了,放到這裏一切說了額
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread,Lock,RLock import time class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutex.acquire() print('\033[45m%s 拿到A鎖\033[0m' %self.name) mutex.acquire() print('\033[43m%s 拿到B鎖\033[0m' % self.name) mutex.release() mutex.release() def f2(self): mutex.acquire() time.sleep(5) print('\033[43m%s 拿到B鎖\033[0m' % self.name) mutex.acquire() time.sleep(10) print('\033[45m%s 拿到A鎖\033[0m' % self.name) mutex.release() mutex.release() if __name__ == '__main__': # mutexA=Lock() # mutexB=Lock() mutex=RLock() for i in range(20): t=MyThread() t.start()
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()
實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5,控制併發量):
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread,Semaphore import time def work(id): with sem: time.sleep(2) print('%s say hello' %id) if __name__ == '__main__': sem=Semaphore(5) for i in range(20): t=Thread(target=work,args=(i,)) t.start()
與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
1.9.1 Event
同進程的同樣,線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。
import threading import time import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main(): readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') t1.start() t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start() logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress readis_ready.set() if __name__=="__main__": main() 複製代碼
mysql
from threading import Thread,Event import threading import time,random def conn_mysql(): print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName()) event.wait() print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName()) def check_mysql(): print('\033[41m正在檢查mysql。。。\033[0m') time.sleep(random.randint(1,3)) event.set() time.sleep(random.randint(1,3)) if __name__ == '__main__': event=Event() t1=Thread(target=conn_mysql) #等待鏈接mysql t2=Thread(target=conn_mysql) #等待鏈接myqsl t3=Thread(target=check_mysql) #檢查mysql t1.start() t2.start() t3.start()
threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import Thread,Event import threading import time,random def conn_mysql(): while not event.is_set(): print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName()) event.wait(0.1) print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName()) def check_mysql(): print('\033[41m正在檢查mysql。。。\033[0m') time.sleep(random.randint(1,3)) event.set() time.sleep(random.randint(1,3)) if __name__ == '__main__': event=Event() t1=Thread(target=conn_mysql) t2=Thread(target=conn_mysql) t3=Thread(target=check_mysql) t1.start() t2.start() t3.start()
使得線程等待,只有知足某條件時,才釋放n個線程
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()
def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()
定時器,指定n秒後執行某操做
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
ueue隊列 :使用import queue,用法與進程Queue同樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
queue.
Queue
(maxsize=0) #先進先出
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
class queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
class queue.
LifoQueue
(maxsize=0) #l後進先出
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
須要強調的是:
1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)
2. 單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換
對比操做系統控制線程的切換,用戶在單線程內控制協程的切換,優勢以下:
1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
要實現協程,關鍵在於用戶程序本身控制程序切換,切換以前必須由用戶程序本身保存協程上一次調用時的狀態,如此,每次從新調用時,可以從上次的位置繼續執行
(詳細的:協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧)
爲此,咱們以前已經學習過一種在單線程下能夠保存程序運行狀態的方法,即yield,咱們來簡單複習一下: