python 學習_第四模塊 併發編程(多線程)python
一 開啓線程方式web
from threading import Thread import time def say(name): time.sleep(2) print("%s hello"%name) if __name__ =="__main__": t = Thread(target=say,args=("alex",)) t.start() print("主線程")
from threading import Thread import time class say(Thread): def __init__ (self,name): super().__init__() self.name = name def run(self): time.sleep(2) print("%s say hello" %self.name) if __name__ == "__main__": t = say("alex") t.start() print("主線程")
基於套接字實現多線程編程
from socket import * from threading import Thread def communicate(conn): while True: try: data = conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): server = socket(AF_INET,SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn,addr = server.accept() t = Thread(target=communicate,args=(conn,)) t.start() server.close() if __name__ == "__main__": server("127.0.0.1",8080)
from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(("127.0.0.1",8080)) while True: msg = input(">>: ").strip() if not msg:continue client.send(msg.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) client.close()
二 多線程與多進程的區別安全
區別:
多線程
1 線程開銷小, 進程開銷大併發
from threading import Thread def work(): print("hello") if __name__ == "__main__": t = Thread(target=work) t.start() print("end..") ''' hello end.. ''' from multiprocessing import Process def work(): print("hello") if __name__ == "__main__": t = Process(target=work) t.start() print("end..") ''' end.. hello '''
2 同一個進程內多線程它們的PID都是同樣的,多進程的PID 不一樣app
from threading import Thread import os def work(): print("hello",os.getpid()) if __name__ == "__main__": t1 = Thread(target=work) t2 = Thread(target=work) t1.start() t2.start() print("end..") ''' hello 2684 hello 2684 end.. '''
from multiprocessing import Process import os def work(): print("hello",os.getpid()) if __name__ == "__main__": t1 = Process(target=work) t2 = Process(target=work) t1.start() t2.start() print("end..") ''' end.. hello 16372 hello 8456 '''
3 同一個進程內的線程共享進程的數據, 進程之間地址空間是隔離的
dom
from multiprocessing import Process def work(): global n n = 10 if __name__ == "__main__": n = 100 p = Process(target=work) p.start() p.join() print("主",n) ''' 主 100 '''
from threading import Thread def work(): global n n = 10 if __name__ == "__main__": n = 100 p = Thread(target=work) p.start() p.join() print("主",n) ''' 主 10 '''
三 Thread對象的其餘屬性或方法異步
''' thread 實例對象的方法 isAlive() 返回線程是否活動的 getName() 返回線程名 setName() 設置線程名 threadming 模塊提供的一些方法 threading.currentThread() 返回當前線程變量 threading.enumerate() 返回一個包含正在運行的線程的list。正在運行指線程啓動後,結束前,不包括啓動前和終止後的進程 threading.activeCount() 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果 '''
from threading import Thread import threading import time def work(): time.sleep(2) threading.currentThread().setName("wwwww") # 設置線程名 print(threading.currentThread().getName()) if __name__ == "__main__": t = Thread(target=work) t.start() print(threading.current_thread().getName()) # MainThread print(threading.current_thread()) # <_MainThread(MainThread, started 12296)> print(threading.enumerate()) # [<_MainThread(MainThread, started 12296)>, <Thread(Thread-1, started 3420)>] print(threading.active_count()) # 2 print('主線程/主進程') # 主線程/主進程 ''' MainThread <_MainThread(MainThread, started 5860)> [<_MainThread(MainThread, started 5860)>, <Thread(Thread-1, started 16080)>] 2 主線程/主進程 wwwww '''
主線程等待子線程結束socket
from threading import Thread import time def say(name): time.sleep(2) print("%s say hello" %name) if __name__== "__main__": t = Thread(target=say,args=("egon",)) t.start() t.join() print("end") print(t.is_alive()) ''' egon say hello end False '''
四 守護線程
1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,
2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(2) print("end456") if __name__ == "__main__": t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon = True t1.start() t2.start() print("----------------------") ''' 123 456 ---------------------- end123 end456 '''
五 GIL全局解釋器鎖
1 GIL介紹
GIL本質就是一把互斥鎖,既然是互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。
保護不一樣的數據的安全,就應該加不一樣的鎖。
解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就致使了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操做,解決這種問題沒有什麼高明的方法,就是加鎖處理,以下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼
2 GIL與Lock區別
一、100個線程去搶GIL鎖,即搶執行權限
二、確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire()
三、極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL
四、直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘的線程再重複2 3 4的過程
3 GIL與多線程
有了GIL的存在,同一時刻同一進程只有一個線程被執行
如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。
4 多線程性能測試
from multiprocessing import Process from threading import Thread import os,time def work(): res = 0 for i in range(10000000): res *=i if __name__ == "__main__": l = [] start = time.time() for i in range(4): # 4 核 p = Process(target=work) #p = Thread(target=work) l.append(p) p.start() for p in l: p.join() stop = time.time() print("run time is %s "%(stop - start )) ''' 多線程用於IO密集型,如socket,爬蟲,web 多進程用於計算密集型,如金融分析 '''
六 死鎖現象與遞歸鎖
死鎖現象
所謂死鎖,是指兩個或兩個以上的進程或線程在執行過程當中, 因爭奪資源而形成的一種相互等待的現象,若無外力做用,它們都將沒法推動下去。 此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程
# 死鎖現象
# 死鎖現象 from threading import Thread,Lock import time mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print("%s 拿到了A鎖"%self.name) mutexB.acquire() print("%s 拿到了B鎖" % self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print("%s 拿到了B鎖"%self.name) time.sleep(2) mutexA.acquire() print("%s 拿到了A鎖" % self.name) mutexA.release() mutexB.release() if __name__ == "__main__": for i in range(10): t = MyThread() t.start() ''' Thread-1 拿到了A鎖 Thread-1 拿到了B鎖 Thread-1 拿到了B鎖 Thread-2 拿到了A鎖 #出現死鎖,整個程序阻塞 '''
# 遞歸鎖 from threading import Thread,Lock,RLock import time mutexA =mutexB = RLock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print("%s 拿到了A鎖"%self.name) mutexB.acquire() print("%s 拿到了B鎖" % self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print("%s 拿到了B鎖"%self.name) time.sleep(2) mutexA.acquire() print("%s 拿到了A鎖" % self.name) mutexA.release() mutexB.release() if __name__ == "__main__": for i in range(10): t = MyThread() t.start() ''' 遞歸鎖, 在Python中爲了支持在同一線程中屢次請求同一資源,Python 提供了可重入鎖Rlock,這個Rlock內部維護者一個Lock和一個 counter變量,counter記錄了acquire的次數,從而使資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源 上面的例子若是使用Rlock代替Lock 則不會發生死鎖,兩者的區別是 遞歸鎖能夠連續acquire屢次,二互斥鎖acquire 一次 '''
七 信號量 Event 定時器
1 信號量
''' 信號量也是一把鎖, 能夠指定信號量爲5 , 對比互斥鎖同一時間只能有一個任務搶到鎖去執行, 信號量同一時間能夠有5個任務拿到鎖去執行, 若是說互斥鎖是合租房屋的人去搶一個廁所,那麼信號量就至關於一羣路人去搶奪公共廁所,公共廁全部多個坑位,這意味着同一時間能夠有多我的 上公共廁所,但公共廁所容納的人物是必定的,這即是信號量的大小 '''
from threading import Thread,Semaphore import threading,time def func(): sm.acquire() print('%s get sm' % threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == "__main__": sm = Semaphore(5) for i in range(13): t = Thread(target=func) t.start()
2. Event
''' from threading import Event event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
from threading import Thread,Event,currentThread import time event = Event() def conn(): n = 0 while not event.is_set(): if n ==3: print("%s try too many times" %currentThread().getName()) return print("%s try %s" %(currentThread().getName(),n)) event.wait(2) n +=1 print("%s is connected" %currentThread().getName()) def check(): print("%s is checking" %currentThread().getName()) time.sleep(3) event.set() if __name__ == "__main__": for i in range(3): t = Thread(target=conn) t.start() t = Thread(target=check) t.start()
import time from threading import Event,Thread event = Event() class Boss(Thread): def run(self): print("BOSS: 今晚你們都要加班到22:00") event.set() time.sleep(5) print("Boss: <22:00>能夠下班") event.set() class Worker(Thread): def run(self): event.wait() print("worker: 哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("worker OhYeah!") if __name__ == "__main__": threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() print("ending...")
3.定時器
定時器,指定n秒後執行某操做
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
八 線程queue
1 先進先出
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
2 後進先出
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
3 優先級隊列:存儲數據時可設置優先級的隊列
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
九 進程池與線程池
''' from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor 線程池 提供異步調用 ProcessPoolExecutor 進程池 提供異步調用 '''
基本方法
''' 1. submit(fn,*args,**kwargs) 異步提交任務 2. map(func,*iterables,timeout=None,chunksize=1) 取代for循環submit的操做 3. shutdown(wait=True) 至關於進程池的pool.close()+pool.join() 操做 wait=True 等待池內全部任務執行完畢回收完資源後,才繼續 wait=False 馬上返回 並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map 必須在shutdown以前 4. result(timeout=None) 取得結果 5. add_done_callback(fn) 回調函數 '''
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(3): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random from threading import currentThread def task(): print("name :%s pid: %s run" %(currentThread().getName(),os.getpid())) time.sleep(random.randint(1,3)) if __name__ == "__main__": pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task,) pool.shutdown(wait=True) print("主")