同步/異步 異步回調 協成 線程隊列

目錄:html

    同步/異步python

    異步回調網絡

    協成多線程

    線程隊列併發

同步|異步:

線程的三種狀態:
  1.就緒
  2.運行
  3.阻塞
阻塞和非阻塞描述的是運行的狀態
阻塞 :遇到了IO操做,代碼卡住,沒法執行下一行,CPU會切換到其餘任務
非阻塞 :與阻塞相反,代碼正在執行(運行狀態) 或處於就緒狀態

同步和異步指的是提交任務的方式
同步 :提交任務必須等待任務完成,才能執行下一行
異步 :提交任務不須要等待任務完成,當即執行下一行app

代碼:異步

複製代碼
 1 def task():
 2     for i in range(1000000):
 3         i += 1000
 4     print("11111")
 5 
 6 print("start")
 7 task() # 同步提交方式,等函數運行完菜執行下一行
 8 print("end")
 9 
10 from threading import  Thread
11 
12 print("start1")
13 Thread(target=task).start() # 異步提交,開啓線程,而後去執行以後的代碼,線程內代碼自行執行
14 print("end1")
複製代碼

異步回調:任務執行結束後自動調用某個函數

複製代碼
異步回調:
  在發起異步任務後,子進程或子線程完成任務後須要通知任務發起方.經過調用一個函數,all_done_callback(函數名)
爲何須要回調? 子進程幫助主進程完成任務,處理任務的結果應該交還給主進程 其餘方式也能夠將數據交還給主進程 1.shutdown 主進程會等到全部任務完成 2.result函數 會阻塞直到任務完成 都會阻塞,致使效率下降,因此使用回調 注意: 回調函數何時被執行? 子進程任務完成時 誰在執行回調函數? 主進程 線程的異步回調 使用方式都相同,惟一的不一樣是執行回調函數,是子線程在執行(線程間數據共享)
複製代碼

三種方式:

複製代碼
 1 # 方式1 本身來保存數據 並執行shutdown  僅在多線程
 2 
 3 res = []
 4 def task():
 5     print("%s is 正在打水" % os.getpid())
 6     time.sleep(0.2)
 7     w = "%s 打的水" % os.getpid()
 8     res.append(w)
 9     return w
10 
11 if __name__ == '__main__':
12     for i in range(20):
13         # 提交任務會返回一個對象  用於回去執行狀態和結果
14         f = pool.submit(task)
15         print(f.result()) # 方式2   執行result 它是阻塞的直到任務完成  又變成串行了
16 
17     print("11111")
18     # pool.shutdown() # 首先不容許提交新任務 而後等目前全部任務完成後
19     # print(res)
20     print("over")
21 
22 ====================================================================================
23 
24 pool = ThreadPoolExecutor()
25 
26 # 方式3 經過回調(什麼是回調 任務執行結束後自動調用某個函數)
27 def task():
28     print("%s is 正在打水" % os.getpid())
29     # time.sleep(0.2)
30     w = "%s 打的水" % os.getpid()
31     return w
32 
33 def task_finish(res):
34     print("打水完成! %s" % res)
35 
36 if __name__ == '__main__':
37     for i in range(20):
38         # 提交任務會返回一個對象  用於回去執行狀態和結果
39         f = pool.submit(task)
40         f.add_done_callback(task_finish) #添加完成後的回調
41     print("11111")
42     print("over")
複製代碼

利用回調完成生產者消費者:

複製代碼
多進程:
複製代碼
1 from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 2 from threading import current_thread
 3 import  os
 4 # 進程池
 5 pool = ProcessPoolExecutor()
 6 # 爬蟲:從網絡某個地址獲取一個HTML文件
 7 import requests # 該模塊用於網絡(HTTP)請求
 8 
 9 # 生產數據,即生產者
10 def get_data_task(url):
11     print(os.getpid(),"正在生產數據!")
12     # print(current_thread(),"正在生產數據!")
13 
14     response = requests.get(url)
15     text = response.content.decode("utf-8")
16     print(text)
17     return text
18 
19 # 處理數據,即消費者
20 def parser_data(f):
21     print(os.getpid(),"處理數據")
22     # print(current_thread(), "處理數據")
23     print("正在解析: 長度%s" % len(f.result()))
24 
25 urls = [
26     "http://www.baidu.com",
27     "http://www.baidu.com",
28     "http://www.baidu.com",
29     "http://www.baidu.com"
30 ]
31 
32 if __name__ == '__main__':
33     for url in urls:
34         f = pool.submit(get_data_task,url)
35         f.add_done_callback(parser_data)  # 回調函數是主進程在執行
36         # 由於子進程是負責獲取數據的,然而數據怎麼處理 ,子進程並不知道.應該把數據還給主進程
37     print("over")
複製代碼
多線程:
複製代碼
 1 from  concurrent.futures import ThreadPoolExecutor
 2 from threading import current_thread
 3 # 進程池
 4 pool = ThreadPoolExecutor()
 5 
 6 # 爬蟲:從網絡某個地址獲取一個HTML文件
 7 import requests # 該模塊用於網絡(HTTP)請求
 8 
 9 # 生產數據
10 def get_data_task(url):
11     # print(os.getpid(),"正在生產數據!")
12     print(current_thread(),"正在生產數據!")
13 
14     response = requests.get(url)
15     text = response.content.decode("utf-8")
16     print(text)
17     return text
18 
19 #   處理數據
20 def parser_data(f):
21     # print(os.getpid(),"處理數據")
22     print(current_thread(), "處理數據")
23     print("正在解析: 長度%s" % len(f.result()))
24 
25 urls = [
26     "http://www.baidu.com",
27     "http://www.baidu.com",
28     "http://www.baidu.com",
29     "http://www.baidu.com"
30 ]
31 
32 if __name__ == '__main__':
33     for url in urls:
34         f = pool.submit(get_data_task,url)
35         f.add_done_callback(parser_data)  # 回調函數是主進程在執行
36         # 由於子進程是負責獲取數據的  然而數據怎麼處理 子進程並不知道  應該把數據還給主進程
37     print("over")
複製代碼
複製代碼

 線程隊列:

普通隊列/堆棧隊列/優先級隊列:socket

複製代碼
import queue

# 普通隊列 先進先出
q = queue.Queue()
q.put("a")
q.put("b")

print(q.get())
print(q.get())

# 堆棧隊列  先進後出 後進先出  函數調用就是進棧  函數結束就出棧 遞歸形成棧溢出
q2 = queue.LifoQueue()
q2.put("a")
q2.put("b")
print(q2.get())

# 優先級隊列
q3 = queue.PriorityQueue()  # 數值越小優先級越高  優先級相同時 比較大小 小的先取
q3.put((-100, "c"))
q3.put((1, "a"))
q3.put((100, "b"))
print(q3.get())
複製代碼

協程:在單線程下由應用程序級別實現併發

複製代碼
複製代碼
什麼是協程?
    協程指的是單線程下由應用程序級別實現的併發
    即把原本由操做系統控制的切換+保存狀態,在應用
    程序裏實現了

    協程的切換vs操做系統的切換
    優勢:
        切換速度遠快於操做系統
    缺點:
        一個任務阻塞了,其他的任務都沒法執行
    ps:只有遇到io才切換到其餘任務的協程才能提高
        單線程的執行效率

爲什麼用協程?
    把單個線程的io降到最低,最大限度地提高單個線程的執行效率

如何實現協程?
    from gevent import spawn,monkey;monkey.patch_all()
複製代碼
協程的目的是在單線程下實現併發
爲何出現協程? 由於cpython中,因爲GIL而致使同一時間只有一個線程在跑
意味着:若是你的程序時計算密集,多線程效率也不會提高
      若是是io密集型 沒有必要在單線程下實現併發,我會開啓多線程來處理io,子線遇到io,cpu切走.
      不能保證必定切到主線
      若是能夠,我在遇到io的時候轉而去作計算,這樣一來能夠保證cpu一直在處理你的程序,固然處理時間太長也要切走

 總結:單線程下實現併發,是將io阻塞時間用於執行計算,能夠提升效率
 原理:一直使用CPU直到超時
怎麼實現單線程併發?
    併發:指的是看起來像是同時運行,實際是在任務間來回切換,同時須要保存執行的狀態
    任務一堆代碼  能夠用函數裝起來
    1.如何讓兩個函數切換執行
        yield能夠保存函數的執行狀態
        經過生成器能夠實現僞併發
        併發不必定提高效率,當任務全是計算時,反而會下降效率
    2.如何知道發生了io, 從而切換執行?
        第三方模塊,gevent
第三方模塊 greenlet 能夠實現併發 可是不能檢測io
第三方模塊 gevent 封裝greenlet 能夠實現單線程併發,而且可以檢測io操做,自動切換
協程的應用場景:
TCP 多客戶端實現方式
1.來一個客戶端就來一個進程 資源消耗較大
2.來一個客戶端就來一個線程 也不能無限開
3.用進程池 或 線程池 仍是一個線程或進程只能維護一個鏈接
4.協程 一個線程就能夠處理多個客戶端 遇到io就切到另外一個
複製代碼

協成實現:單線程實現併發

 

1.yield 把函數作成生成器,生成器會自動保存狀態

複製代碼
 1 # 這是一個進程,默認包含一個主線程
 2 import time
  #生成器函數 3 def task(): 4 while True: 5 print("task1") 6 time.sleep(1)#I/O,CPU切走 7 yield 1 8 9 def task2(): 10 g = task() 11 while True: 12 try: 13 print("task2") 14 next(g)#next()函數參數傳一個可迭代對象 15 except Exception: 16 print("任務完成") 17 break 18 task2() 19 打印結果: 20 task2 21 task1 22 task2 23 task1 24 task2 25 task1 26 ..........
複製代碼

2.greenlet模塊:幫咱們封裝yield,能夠實現任務切換,可是不能檢測I/O

複製代碼
# 1.實例化greenlet獲得一個對象,傳入要執行的任務,至少須要兩個任務
# 2.先讓某個任務執行起來,使用對象調用switch
# 3.在任務的執行過程當中,手動調用switch來切換
複製代碼
 1 import greenlet
 2 import time
 3 def task1():
 4     print("task1 1")
 5     time.sleep(2)
 6     g2.switch()
 7     print("task1 2")
 8     g2.switch()
 9 
10 def task2():
11     print("task2 1")
12     g1.switch()
13     print("task2 2")
14 
15 g1 = greenlet.greenlet(task1)
16 g2 = greenlet.greenlet(task2)
17 
18 g1.switch()
複製代碼
複製代碼

3.gevent:在greenlet的基礎上封裝檢測io操做,自動切換

複製代碼
# 1.spawn函數傳入你的任務
# 2.調用join 去開啓任務
# 3.檢測io操做須要打monkey補丁,就是一個函數,在程序最開始的地方調用它
複製代碼
 1 from gevent import monkey
 2 monkey.patch_all()
 3 
 4 import gevent
 5 import time
 6 def eat():
 7     print('eat food 1')
 8     time.sleep(2)
 9     print('eat food 2')
10 
11 def play():
12     print('play 1')
13     time.sleep(1)
14     print('play 2')
15 
16 g1=gevent.spawn(eat)
17 g2=gevent.spawn(play)
18 
19 gevent.joinall([g1,g2])
20 print('主')
複製代碼
複製代碼

 協程實現TCP:

複製代碼
服務端:
複製代碼
 1 import gevent
 2 from gevent import monkey
 3 monkey.patch_all()
 4 import socket
 5 
 6 server = socket.socket()
 7 # 重用端口
 8 server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
 9 
10 server.bind(("127.0.0.1",9999))
11 
12 server.listen(5)
13 def data_handler(conn):
14     print("一個新鏈接..")
15     while True:
16         data = conn.recv(1024)
17         conn.send(data.upper())
18 
19 while True:
20     conn,addr = server.accept()
21     # 切處處理數據的任務去執行
22     gevent.spawn(data_handler,conn)
相關文章
相關標籤/搜索