目錄python
from threading import Thread,Lock x = 0 mutex = Lock() def task(): global x for i in range(200000): x = x+1 """ 因爲沒有進程鎖,程序切換比較快,例如: t1 的x 剛拿到0 x = 0+1 還沒進行運算,保存了狀態,就被cpu切換 t2 的 x 拿到0 進行了運算 x = 1 又切換到t1,x = 0+1 x = 1 產生數據安全的問題,因此結果就會隨機 """ if __name__ == '__main__': t1 = Thread(target=task) t2 = Thread(target=task) t3 = Thread(target=task) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print(x) ################# 337566 #比較隨機,正確的應該是600000
from threading import Thread, Lock x = 0 mutex = Lock() def task(): mutex.acquire() # 線程鎖 global x for i in range(200000): x = x + 1 mutex.release() if __name__ == '__main__': t1 = Thread(target=task) t2 = Thread(target=task) t3 = Thread(target=task) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print(x) ########################## 600000
from threading import Thread,Lock import time mutex1 = Lock() mutex2 = Lock() class MyTreada(Thread): def run(self): self.task1() self.task2() def task1(self): mutex1.acquire() print(f'{self.name}搶到了鎖1') mutex2.acquire() print(f'{self.name}搶到了鎖2') mutex2.release() print(f'{self.name}釋放了鎖1') mutex1.release() print(f'{self.name}釋放了鎖2') def task2(self): mutex2.acquire() print(f'{self.name}搶到了鎖2') time.sleep(1) mutex1.acquire() print(f'{self.name}搶到了鎖2') mutex2.release() print(f'{self.name}釋放了鎖2') mutex1.release() print(f'{self.name}釋放了鎖1') for i in range(3): t = MyTreada() t.start() ################## Thread-1搶到了鎖1 Thread-1搶到了鎖2 Thread-1釋放了鎖1 Thread-1釋放了鎖2 Thread-1搶到了鎖2 Thread-2搶到了鎖1##########下面的線程被阻塞 ############################# """ 倆個線程 線程1拿到了鎖頭2,要想往下執行須要鎖頭1 線程2拿到了鎖頭1,想要往下執行須要鎖頭2 互相拿到了彼此往下執行的必要條件(鎖頭),互相不放棄手裏的鎖頭,出現阻塞 """
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。安全
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
from threading import Thread,RLock """ 遞歸鎖:在同一個線程內能夠被屢次acquire 釋放:內部至關於維護了一個計數器,也就是說同一個線程acquire了幾回就要release()幾回 """ import time mutex1 = RLock() mutex2 = mutex1 class MyThreada(Thread): def run(self): self.task1() self.task2() def task1(self): mutex1.acquire() print(f'{self.name}搶到了鎖1') mutex2.acquire() print(f'{self.name}搶到了鎖2') mutex2.release() print(f'{self.name}釋放了鎖2') mutex1.release() print(f'{self.name}釋放了鎖1') def task2(self): mutex2.acquire() print(f'{self.name}搶到了鎖2') time.sleep(1) mutex1.acquire() print(f'{self.name}搶到了鎖1') mutex1.release() print(f'{self.name}釋放了鎖1') mutex2.release() print(f'{self.name}釋放了鎖2') for i in range(3): t = MyThreada() t.start() ############################# Thread-1搶到了鎖1 Thread-1搶到了鎖2 Thread-1釋放了鎖2 Thread-1釋放了鎖1 Thread-1搶到了鎖2 Thread-1搶到了鎖1 Thread-1釋放了鎖1 Thread-1釋放了鎖2 Thread-2搶到了鎖1 Thread-2搶到了鎖2 Thread-2釋放了鎖2 Thread-2釋放了鎖1 Thread-2搶到了鎖2 Thread-2搶到了鎖1 Thread-2釋放了鎖1 Thread-2釋放了鎖2 Thread-3搶到了鎖1 Thread-3搶到了鎖2 Thread-3釋放了鎖2 Thread-3釋放了鎖1 Thread-3搶到了鎖2 Thread-3搶到了鎖1 Thread-3釋放了鎖1 Thread-3釋放了鎖2
同進程的同樣服務器
實現:定製了鎖的個數,也就意味着最多有幾個線程能夠搶到鎖頭網絡
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器+1;
調用release() 時內置計數器-1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。多線程
實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):併發
from threading import Thread,currentThread,Semaphore import time def task(): sm.acquire() print(f'{currentThread().name}正在執行') time.sleep(3) sm.release() sm = Semaphore(5) for i in range(15): t = Thread(target=task) t.start() #################### Thread-1正在執行 Thread-2正在執行 Thread-3正在執行 Thread-4正在執行 Thread-5正在執行 Thread-7正在執行 Thread-6正在執行 Thread-8正在執行 Thread-9正在執行 Thread-10正在執行 Thread-11正在執行 Thread-13正在執行 Thread-12正在執行 Thread-14正在執行 Thread-15正在執行
與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程。app
由於cpython自帶的垃圾回收機制不是線程安全的,因此要有GIL鎖dom
致使了同一個進程下,同一時間只能由一個線程,沒法利用多核優點異步
分析:socket
推薦使用多進程,利用cpu的多核優點,並行的計算(cpu主要負責計算)
下面例子,分析過程:
from threading import Thread from multiprocessing import Process import time def workl(): res = 0 for i in range(10000000): res*=i if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): p = Process(target=workl) t_list.append(p) p.start() for p in t_list: p.join() end = time.time() print("多進程",end-start) ################ 多進程 1.082251787185669
from threading import Thread from multiprocessing import Process import time def workl(): res = 0 for i in range(10000000): res*=i if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): t = Thread(target=workl) t_list.append(t) t.start() for t in t_list: t.join() end = time.time() print("多線程",end-start) ############### 多進程 2.43462872505188
推薦使用多線程解決,大部分時間都在io,而且開啓一個線程要比開啓進程的速度快的多
大部分的需求都是io密集型,由於大部分的軟件都是基於網絡的
from threading import Thread from multiprocessing import Process import time def work(): x = 1+1 time.sleep(5) if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): t = Thread(target=work) t_list.append(t) t.start() for t in t_list: t.join() end = time.time() print("多線程",end-start) ################ 多線程 5.002916097640991
from multiprocessing import Process import time def work(): x = 1+1 time.sleep(5) if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): p = Process(target=work) t_list.append(p) p.start() for p in t_list: p.join() end = time.time() print("多進程",end-start) ################# 多進程 5.1951446533203125
[TOC]
import queue q = queue.Queue() q.put('123') q.put('dd') print(q.get()) print(q.get()) ############### 123 dd
import queue q = queue.LifoQueue()#堆棧,後進先出 q.put('ocean') q.put('sku') q.put('chen') print(q.get()) print(q.get()) print(q.get()) ############## chen sku ocean
import queue q = queue.PriorityQueue() q.put((50,'chen'))#一般元組的第一個值是int類型 q.put((100,'sky')) q.put((1,'rocky')) print(q.get()) print(q.get()) print(q.get()) ############## (1, 'rocky') (50, 'chen') (100, 'sky')
import queue q = queue.PriorityQueue() q.put('chen') q.put('sky') q.put('rocky') print(q.get()) print(q.get()) print(q.get()) ################## chen rocky sky
import queue q = queue.PriorityQueue() q.put(50) q.put(100) q.put(1) print(q.get()) print(q.get()) print(q.get()) ############ 1 50 100
和time.sleep()不一樣,線程定時器不會阻礙下面代碼的執行,而time.sleep()會阻礙。
from threading import Timer import time def task(): print('線程執行了') time.sleep(2) print('線程結束了') t = Timer(4,task)###元組第一個數是多少秒開啓線程 t.start() print("不會阻礙線程的執行") ######################## 不會阻礙線程的執行 線程執行了 線程結束了
服務端
import socket from threading import Thread def talk(conn): while True: try: msg = conn.recv(1024) if len(msg)==0:break conn.send(msg.upper()) except Exception: print('客戶點關閉鏈接') break def server_demo(): server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind(("127.0.0.1",8008)) server.listen(5) while True: conn,addr = server.accept() t = Thread(target=talk,args=(conn,)) t.start() if __name__ == '__main__': server_demo()
客戶端
import socket from threading import Thread,currentThread def client_dome(): client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8008)) while True: msg = f"{currentThread().name}" if len(msg)==0:continue client.send(msg.encode('utf8')) feedback = client.recv(1024) print(feedback.decode('utf8')) client.close() if __name__ == '__main__': for i in range(20): t = Thread(target=client_dome) t.start()
進程池和線程池
何時限制?
當併發的任務數量遠遠大於計算機所能承受的範圍的時候,即沒法一次性開啓過多的任務數量,就應該考慮去限制進程數或者線程數,從而保證服務器不會崩潰。(蕩機)
同步:提交一個任務,必須等待任務執行完畢(或者最後拿到返回值),才能執行下一行代碼
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{current_process().name}在執行任務{i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ProcessPoolExecutor(4)#池子裏有四個進程 fu_list = [] for i in range(20): future = pool.submit(task,i)#task任務要作20次,4個線程負責作這些事情 print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行 ################# Process-1在執行任務0 0 Process-2在執行任務1 1 Process-3在執行任務2 4 Process-4在執行任務3 9 Process-1在執行任務4 16 Process-2在執行任務5 25 Process-3在執行任務6 36 Process-4在執行任務7 49 Process-1在執行任務8 64 Process-2在執行任務9 81 Process-3在執行任務10 100 Process-4在執行任務11 121 Process-1在執行任務12 144 Process-2在執行任務13 169 Process-3在執行任務14 196 Process-4在執行任務15 225 Process-1在執行任務16 256 Process-2在執行任務17 289 Process-3在執行任務18 324 Process-4在執行任務19 361
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{current_process().name}在執行任務{i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ProcessPoolExecutor(4)#池子裏有四個進程 fu_list = [] for i in range(20): future = pool.submit(task,i)#task任務要作20次,4個線程負責作這些事情 # print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行 fu_list.append(future) pool.shutdown()#關閉了池的入口,會等待全部的任務執行完,結束阻塞. for fu in fu_list: print(fu.result()) ########################### Process-1在執行任務0 Process-2在執行任務1 Process-3在執行任務2 Process-4在執行任務3 Process-1在執行任務4 Process-2在執行任務5 Process-3在執行任務6 Process-4在執行任務7 Process-1在執行任務8 Process-2在執行任務9 Process-3在執行任務10 Process-4在執行任務11 Process-1在執行任務12 Process-2在執行任務13 Process-3在執行任務14 Process-4在執行任務15 Process-1在執行任務16 Process-2在執行任務17 Process-3在執行任務18 Process-4在執行任務19 0 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name}在執行任務{i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ThreadPoolExecutor(4)#池子裏有四個線程 fu_list = [] for i in range(20): future = pool.submit(task,i)#task任務要作20次,4個線程負責作這些事情 print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行 ########################## ThreadPoolExecutor-0_0在執行任務0 0 ThreadPoolExecutor-0_0在執行任務1 1 ThreadPoolExecutor-0_1在執行任務2 4 ThreadPoolExecutor-0_0在執行任務3 9 ThreadPoolExecutor-0_2在執行任務4 16 ThreadPoolExecutor-0_1在執行任務5 25 ThreadPoolExecutor-0_3在執行任務6 36 ThreadPoolExecutor-0_0在執行任務7 49 ThreadPoolExecutor-0_2在執行任務8 64 ThreadPoolExecutor-0_1在執行任務9 81 ThreadPoolExecutor-0_3在執行任務10 100 ThreadPoolExecutor-0_0在執行任務11 121 ThreadPoolExecutor-0_2在執行任務12 144 ThreadPoolExecutor-0_1在執行任務13 169 ThreadPoolExecutor-0_3在執行任務14 196 ThreadPoolExecutor-0_0在執行任務15 225 ThreadPoolExecutor-0_2在執行任務16 256 ThreadPoolExecutor-0_1在執行任務17 289 ThreadPoolExecutor-0_3在執行任務18 324 ThreadPoolExecutor-0_0在執行任務19 361
異步:提交一個任務,不須要等待執行完畢,能夠直接執行下一行代碼。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{current_process().name}在執行任務{i}') time.sleep(1) return i ** 2 def parse(future): print(future.result()) if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 池子裏有四個進程 for i in range(20): future = pool.submit(task, i) # task任務要作20次,4個線程負責作這些事情 # print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行 future.add_done_callback(parse) # 爲當前任務綁定了一個函數,在當前任務執行結束的時候會觸發這個函數, # 會把future對象做爲參數傳給函數 # 這個稱之爲回調函數,處理完了回來就調用這個函數. ####################### Process-1在執行任務0 Process-2在執行任務1 Process-3在執行任務2 Process-4在執行任務3 Process-1在執行任務4 0 Process-2在執行任務5 1 Process-3在執行任務6 4 Process-4在執行任務7 9 Process-1在執行任務8 16 Process-2在執行任務9 25 Process-3在執行任務10 36 Process-4在執行任務11 49 Process-1在執行任務12 64 Process-2在執行任務13 81 Process-3在執行任務14 100 Process-4在執行任務15 121 Process-1在執行任務16 144 Process-2在執行任務17 169 Process-3在執行任務18 196 Process-4在執行任務19 225 256 289 324 361
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name}在執行任務{i}') time.sleep(1) return i ** 2 def parse(future): print(future.result()) if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 池子裏有四個線程 for i in range(20): future = pool.submit(task, i) # task任務要作20次,4個線程負責作這些事情 # print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行 future.add_done_callback(parse) # 爲當前任務綁定了一個函數,在當前任務執行結束的時候會觸發這個函數, # 會把future對象做爲參數傳給函數 # 這個稱之爲回調函數,處理完了回來就調用這個函數. ############################## ThreadPoolExecutor-0_0在執行任務0 ThreadPoolExecutor-0_1在執行任務1 ThreadPoolExecutor-0_2在執行任務2 ThreadPoolExecutor-0_3在執行任務3 0 ThreadPoolExecutor-0_0在執行任務4 1 ThreadPoolExecutor-0_1在執行任務5 9 ThreadPoolExecutor-0_3在執行任務6 4 ThreadPoolExecutor-0_2在執行任務7 25 ThreadPoolExecutor-0_1在執行任務8 16 ThreadPoolExecutor-0_0在執行任務9 36 ThreadPoolExecutor-0_3在執行任務10 49 ThreadPoolExecutor-0_2在執行任務11 81 ThreadPoolExecutor-0_0在執行任務12 64 ThreadPoolExecutor-0_1在執行任務13 100 ThreadPoolExecutor-0_3在執行任務14 121 ThreadPoolExecutor-0_2在執行任務15 144 ThreadPoolExecutor-0_0在執行任務16 169 ThreadPoolExecutor-0_1在執行任務17 196 ThreadPoolExecutor-0_3在執行任務18 225 ThreadPoolExecutor-0_2在執行任務19 256 289 361 324
什麼樣的協程是有有意義的?
優勢:
缺點:
對比多線程
本身要檢測全部的io,只要有一個阻塞總體都會跟着阻塞
對比多進程
沒法利用多核優點
爲何要有協程?
本身控制切換的速度比操做系統控制切換的快,下降了單個線程的io時間。
import gevent import time def eat(): print("eat 1") gevent.sleep(2) print('eat 2') def play(): print("play 1") gevent.sleep(3) print("play 2") start = time.time() g1 = gevent.spawn(eat) g2 = gevent.spawn(play) g1.join() g2.join() end = time.time() print(end - start) ################### eat 1 play 1 eat 2 play 2 3.006275177001953
from gevent import monkey;monkey.patch_all()####爲協程打補丁 # monkey.patch_all() import gevent,time def eat(): print('eat 1') time.sleep(2) print('eat 2') def play(): print('play 1 ') time.sleep(3) print('play 2') start = time.time() g1 = gevent.spawn(eat) g2 = gevent.spawn(play) g1.join() g2.join() end = time.time() print(end-start) ##################### eat 1 play 1 eat 2 play 2 3.004676580429077