目錄:html
同步/異步python
異步回調網絡
協成多線程
線程隊列併發
線程的三種狀態:
1.就緒
2.運行
3.阻塞
阻塞和非阻塞描述的是運行的狀態
阻塞 :遇到了IO操做,代碼卡住,沒法執行下一行,CPU會切換到其餘任務
非阻塞 :與阻塞相反,代碼正在執行(運行狀態) 或處於就緒狀態
同步和異步指的是提交任務的方式
同步 :提交任務必須等待任務完成,才能執行下一行
異步 :提交任務不須要等待任務完成,當即執行下一行app
代碼:異步
1 def task(): 2 for i in range(1000000): 3 i += 1000 4 print("11111") 5 6 print("start") 7 task() # 同步提交方式,等函數運行完菜執行下一行 8 print("end") 9 10 from threading import Thread 11 12 print("start1") 13 Thread(target=task).start() # 異步提交,開啓線程,而後去執行以後的代碼,線程內代碼自行執行 14 print("end1")
異步回調:
在發起異步任務後,子進程或子線程完成任務後須要通知任務發起方.經過調用一個函數,all_done_callback(函數名)
爲何須要回調? 子進程幫助主進程完成任務,處理任務的結果應該交還給主進程 其餘方式也能夠將數據交還給主進程 1.shutdown 主進程會等到全部任務完成 2.result函數 會阻塞直到任務完成 都會阻塞,致使效率下降,因此使用回調 注意: 回調函數何時被執行? 子進程任務完成時 誰在執行回調函數? 主進程 線程的異步回調 使用方式都相同,惟一的不一樣是執行回調函數,是子線程在執行(線程間數據共享)
1 # 方式1 本身來保存數據 並執行shutdown 僅在多線程 2 3 res = [] 4 def task(): 5 print("%s is 正在打水" % os.getpid()) 6 time.sleep(0.2) 7 w = "%s 打的水" % os.getpid() 8 res.append(w) 9 return w 10 11 if __name__ == '__main__': 12 for i in range(20): 13 # 提交任務會返回一個對象 用於回去執行狀態和結果 14 f = pool.submit(task) 15 print(f.result()) # 方式2 執行result 它是阻塞的直到任務完成 又變成串行了 16 17 print("11111") 18 # pool.shutdown() # 首先不容許提交新任務 而後等目前全部任務完成後 19 # print(res) 20 print("over") 21 22 ==================================================================================== 23 24 pool = ThreadPoolExecutor() 25 26 # 方式3 經過回調(什麼是回調 任務執行結束後自動調用某個函數) 27 def task(): 28 print("%s is 正在打水" % os.getpid()) 29 # time.sleep(0.2) 30 w = "%s 打的水" % os.getpid() 31 return w 32 33 def task_finish(res): 34 print("打水完成! %s" % res) 35 36 if __name__ == '__main__': 37 for i in range(20): 38 # 提交任務會返回一個對象 用於回去執行狀態和結果 39 f = pool.submit(task) 40 f.add_done_callback(task_finish) #添加完成後的回調 41 print("11111") 42 print("over")
多進程:
1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 2 from threading import current_thread 3 import os 4 # 進程池 5 pool = ProcessPoolExecutor() 6 # 爬蟲:從網絡某個地址獲取一個HTML文件 7 import requests # 該模塊用於網絡(HTTP)請求 8 9 # 生產數據,即生產者 10 def get_data_task(url): 11 print(os.getpid(),"正在生產數據!") 12 # print(current_thread(),"正在生產數據!") 13 14 response = requests.get(url) 15 text = response.content.decode("utf-8") 16 print(text) 17 return text 18 19 # 處理數據,即消費者 20 def parser_data(f): 21 print(os.getpid(),"處理數據") 22 # print(current_thread(), "處理數據") 23 print("正在解析: 長度%s" % len(f.result())) 24 25 urls = [ 26 "http://www.baidu.com", 27 "http://www.baidu.com", 28 "http://www.baidu.com", 29 "http://www.baidu.com" 30 ] 31 32 if __name__ == '__main__': 33 for url in urls: 34 f = pool.submit(get_data_task,url) 35 f.add_done_callback(parser_data) # 回調函數是主進程在執行 36 # 由於子進程是負責獲取數據的,然而數據怎麼處理 ,子進程並不知道.應該把數據還給主進程 37 print("over")
多線程:
1 from concurrent.futures import ThreadPoolExecutor 2 from threading import current_thread 3 # 進程池 4 pool = ThreadPoolExecutor() 5 6 # 爬蟲:從網絡某個地址獲取一個HTML文件 7 import requests # 該模塊用於網絡(HTTP)請求 8 9 # 生產數據 10 def get_data_task(url): 11 # print(os.getpid(),"正在生產數據!") 12 print(current_thread(),"正在生產數據!") 13 14 response = requests.get(url) 15 text = response.content.decode("utf-8") 16 print(text) 17 return text 18 19 # 處理數據 20 def parser_data(f): 21 # print(os.getpid(),"處理數據") 22 print(current_thread(), "處理數據") 23 print("正在解析: 長度%s" % len(f.result())) 24 25 urls = [ 26 "http://www.baidu.com", 27 "http://www.baidu.com", 28 "http://www.baidu.com", 29 "http://www.baidu.com" 30 ] 31 32 if __name__ == '__main__': 33 for url in urls: 34 f = pool.submit(get_data_task,url) 35 f.add_done_callback(parser_data) # 回調函數是主進程在執行 36 # 由於子進程是負責獲取數據的 然而數據怎麼處理 子進程並不知道 應該把數據還給主進程 37 print("over")
普通隊列/堆棧隊列/優先級隊列:socket
import queue # 普通隊列 先進先出 q = queue.Queue() q.put("a") q.put("b") print(q.get()) print(q.get()) # 堆棧隊列 先進後出 後進先出 函數調用就是進棧 函數結束就出棧 遞歸形成棧溢出 q2 = queue.LifoQueue() q2.put("a") q2.put("b") print(q2.get()) # 優先級隊列 q3 = queue.PriorityQueue() # 數值越小優先級越高 優先級相同時 比較大小 小的先取 q3.put((-100, "c")) q3.put((1, "a")) q3.put((100, "b")) print(q3.get())
什麼是協程? 協程指的是單線程下由應用程序級別實現的併發 即把原本由操做系統控制的切換+保存狀態,在應用 程序裏實現了 協程的切換vs操做系統的切換 優勢: 切換速度遠快於操做系統 缺點: 一個任務阻塞了,其他的任務都沒法執行 ps:只有遇到io才切換到其餘任務的協程才能提高 單線程的執行效率 爲什麼用協程? 把單個線程的io降到最低,最大限度地提高單個線程的執行效率 如何實現協程? from gevent import spawn,monkey;monkey.patch_all()
協程的目的是在單線程下實現併發 爲何出現協程? 由於cpython中,因爲GIL而致使同一時間只有一個線程在跑 意味着:若是你的程序時計算密集,多線程效率也不會提高 若是是io密集型 沒有必要在單線程下實現併發,我會開啓多線程來處理io,子線遇到io,cpu切走. 不能保證必定切到主線 若是能夠,我在遇到io的時候轉而去作計算,這樣一來能夠保證cpu一直在處理你的程序,固然處理時間太長也要切走 總結:單線程下實現併發,是將io阻塞時間用於執行計算,能夠提升效率 原理:一直使用CPU直到超時 怎麼實現單線程併發? 併發:指的是看起來像是同時運行,實際是在任務間來回切換,同時須要保存執行的狀態 任務一堆代碼 能夠用函數裝起來 1.如何讓兩個函數切換執行 yield能夠保存函數的執行狀態 經過生成器能夠實現僞併發 併發不必定提高效率,當任務全是計算時,反而會下降效率 2.如何知道發生了io, 從而切換執行? 第三方模塊,gevent 第三方模塊 greenlet 能夠實現併發 可是不能檢測io 第三方模塊 gevent 封裝greenlet 能夠實現單線程併發,而且可以檢測io操做,自動切換
協程的應用場景:
TCP 多客戶端實現方式
1.來一個客戶端就來一個進程 資源消耗較大
2.來一個客戶端就來一個線程 也不能無限開
3.用進程池 或 線程池 仍是一個線程或進程只能維護一個鏈接
4.協程 一個線程就能夠處理多個客戶端 遇到io就切到另外一個
1 # 這是一個進程,默認包含一個主線程 2 import time
#生成器函數 3 def task(): 4 while True: 5 print("task1") 6 time.sleep(1)#I/O,CPU切走 7 yield 1 8 9 def task2(): 10 g = task() 11 while True: 12 try: 13 print("task2") 14 next(g)#next()函數參數傳一個可迭代對象 15 except Exception: 16 print("任務完成") 17 break 18 task2() 19 打印結果: 20 task2 21 task1 22 task2 23 task1 24 task2 25 task1 26 ..........
# 1.實例化greenlet獲得一個對象,傳入要執行的任務,至少須要兩個任務 # 2.先讓某個任務執行起來,使用對象調用switch # 3.在任務的執行過程當中,手動調用switch來切換
1 import greenlet 2 import time 3 def task1(): 4 print("task1 1") 5 time.sleep(2) 6 g2.switch() 7 print("task1 2") 8 g2.switch() 9 10 def task2(): 11 print("task2 1") 12 g1.switch() 13 print("task2 2") 14 15 g1 = greenlet.greenlet(task1) 16 g2 = greenlet.greenlet(task2) 17 18 g1.switch()
# 1.spawn函數傳入你的任務 # 2.調用join 去開啓任務 # 3.檢測io操做須要打monkey補丁,就是一個函數,在程序最開始的地方調用它
1 from gevent import monkey 2 monkey.patch_all() 3 4 import gevent 5 import time 6 def eat(): 7 print('eat food 1') 8 time.sleep(2) 9 print('eat food 2') 10 11 def play(): 12 print('play 1') 13 time.sleep(1) 14 print('play 2') 15 16 g1=gevent.spawn(eat) 17 g2=gevent.spawn(play) 18 19 gevent.joinall([g1,g2]) 20 print('主')
服務端:
1 import gevent 2 from gevent import monkey 3 monkey.patch_all() 4 import socket 5 6 server = socket.socket() 7 # 重用端口 8 server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 9 10 server.bind(("127.0.0.1",9999)) 11 12 server.listen(5) 13 def data_handler(conn): 14 print("一個新鏈接..") 15 while True: 16 data = conn.recv(1024) 17 conn.send(data.upper()) 18 19 while True: 20 conn,addr = server.accept() 21 # 切處處理數據的任務去執行 22 gevent.spawn(data_handler,conn)