python學習併發編程

1、併發編程理論基礎

  • 併發編程得應用:
    • 網絡應用:爬蟲(直接應用併發編程)
    • 網絡架構django、flask、tornado 源碼-併發編程
    • socket server 源碼-併發編程
  • 計算機操做系統發展史
    • 手工操做-讀穿孔的紙帶、用戶獨佔全機、cpu等待手工操做、cpu利用不充分
    • 批處理-磁帶存儲(聯機批處理{一邊寫入磁帶,一邊從磁帶讀出計算}、脫機批處理、只能運行一個程序,遇到IO就等待閒置
  • 多道程序系統
    • 同時執行多個任務,遇到io就切換,
    • 即空間隔離(多個程序運行),時空複用(IO切換)的特色
  • 分時系統
    • 同時執行多個任務,沒有遇到IO也切換,固定時間片到了就切
    • 時間片輪轉
    • 多路性、交互性、獨立性、及時性
    • 切會浪費cpu時間,下降了cpu效率,但提升了用戶體驗。
  • 實時系統
    • 及時響應
    • 高可靠性
    • 時刻等待響應,即時處理
  • 通用操做系統:
    • 多道批處理
    • 分時
    • 實時
  • 操做系統分類:
    • 我的計算機操做系統
    • 網絡操做系統
    • 分佈式操做系統
  • 操做系統的做用:
    • 是一個協調、管理、和控制計算機硬件資源和軟件資源的控制程序。【調度硬件資源、調用管理軟件】
    • 隱藏了醜陋的硬件調用接口,提供良好的抽象接口
    • 管理、調度進程,並將多個進程對硬件的競爭變得有序
  • I/O操做:(相對內存來講得)
    • 輸入【讀到內存中】: input、f.read、accept、recv、connect
    • 輸出【從內存讀出】:print、f.write、send、connect
    • 文件操做/網絡操做都是IO操做
  • 異步:兩個任務同時運行(並行)
  • 同步:多個任務 串行 【按順序執行】
  • 阻塞:等待 input、accept、recv
  • 非阻塞:不等待,直接執行
  • 並行:多個任務同時執行【多個CPU在同時執行任務】
  • 併發:只有一個CPU,交替執行多個任務【宏觀上同時執行,實際是輪轉】
  • https://www.bughui.com/2017/08/23/difference-between-concurrency-and-parallelism/
  • 程序:沒有運行
  • 進程:運行中得程序、計算機中最小得資源分配單元
  • 進程調度:
    • 多個進程(運行中得程序)在操做系統得控制下被CPU執行,去享用計算機資源
    • 先來先服務調度算法
    • 短做業優先
    • 時間片輪轉
    • 多級反饋隊列
    • 進程調度得過程是不能隨意被程序影響得
  • 進程三大狀態
    • 就緒 程序開始運行,進入就緒隊列,等待cpu分配時間
    • 運行 執行進程,單個時間片內運行完成,就釋放資源,沒有左右完,就又自動切換到就緒隊列等待下次得調度
    • 阻塞 執行中得進程遇到事件入IO等致使沒法執行,進入阻塞狀態,解除後進入就緒隊列等待進程調度
  • 進程與父進程
    • 進程 PID    經過os.getpid()  能夠獲得
    • 父進程 PPID 負責回收一些子進程得資源後才關閉   經過os.getppid()

2、python中的併發編程-進程

  • 包:multiprocessing
    • 是python中操做、管理進程
    • 建立進程、進程同步、進程池、進程之間數據共享
    • python中的進程建立
      • 對Windows來講建立子進程,是將主進程內存空間中的全部變量import子進程內存空間中,(import就會執行文件代碼)
      • windows平臺建立子進程必須放在 if __name__ == '__main__' 下執行(否則就會循環建立子進程致使系統崩潰)
      • 對Linux來講建立子進程,就是將主進程內存空間中全部複製過去,複製不會執行內存空間中的代碼
      • Linux平臺就無所謂了,反正只是複製
    • 建立進程:異步運行

    • multiprocess.Process模塊
    • Process() 由該類實例化獲得得對象,表示一個子進程中得任務(還沒有啓動)
    • p = Process(group,target= func,agrs=('name',),kwargs={},name=)
      • group 值默認始終爲None
      • target 表示調用對象
      • args 調用對象得位置參數 元組!!!
      • kwargs 調用對象得字典
      • name爲子進程得名稱
      • 注:必須用關鍵字方式指定參數、位置參數必須是元組格式,那怕是一個參數也要寫成(n,)
    • 方法:
      • p.start() : 啓動進程,調用操做系統的命令,傳達要建立進程的申請,並調用子進程中的p.run()方法
      • p.run(): 進程啓動時運行的方法,其是真正去調用target指定的函數,自定義類,就須要本身定義run方法
      • p.terminate(): 強制終止進程p
      • p.is_alive(): 查看進程狀態值,True表示運行中
      • p.join(): 主線程等待子進程p運行結束後才運行 阻塞
    • 實操:
    • 複製代碼
       1 import os
       2 import time
       3 from multiprocessing import Process
       4 
       5 
       6 def f(name):
       7     print('in f', os.getpid(), os.getppid())
       8     print('i am is son process', name)
       9 
      10 
      11 if __name__ == '__main__':
      12     p = Process(target=f, args=('小青',))
      13     p.start()   #  start不是運行一個程序,而是調用操做系統的命令,要建立子進程
      14    
      15     print('我是主程序……')
      
      》》》結果:
      我是主程序……
      in f 6016 12312
      i am is son process 小青
      
      結論:在另外一個地方開闢內存空間,執行f函數,主進程不會阻塞等待
      建立子進程
    • 給子進程傳參:Process(target= func ,args = (參1,))必須是元組結構!!!
    •  1 def f(args):
       2     print('in func 2', args, os.getpid(), os.getppid())
       3 
       4 
       5 if __name__ == '__main__':
       6     print('我在主進程中……')
       7     p1 = Process(target=f, args=(666,))
       8     p1.start()
       9     p2 = Process(target=f, args=(777,))
      10     p2.start()
      11     print('我會在子進程前運行……,由於我和他們是隔離的,不會等他們')
      12 
      13 '''
      14 結果:
      15 我在主進程中……
      16 我會在子進程前運行……,由於我和他們是隔離的,不會等他們
      17 in func 2 666 8144 7660
      18 in func 2 777 10820 7660
      19 '''
      代碼演練
    • 建立多個子進程:start()方法不是執行方法,而是建立子進程函數,真正執行任務函數是run()方法
    •  1 def fu(num):
       2     print("我是進程 :%d " % num)
       3 
       4 
       5 if __name__ == '__main__':
       6     print('in main', os.getpid(), os.getppid())
       7     for i in range(10):
       8         Process(target=fu, args=(i,)).start()
       9     print('main  66666')
      10 """
      11 結果:
      12 in main 11984 2064
      13 main  66666
      14 我是進程 :1 
      15 我是進程 :0 
      16 我是進程 :8 
      17 我是進程 :7 
      18 我是進程 :4 
      19 我是進程 :2 
      20 我是進程 :9 
      21 我是進程 :3 
      22 我是進程 :6 
      23 我是進程 :5 
      24 p.start()並無當即執行,而是進入就緒隊列,等帶cpu調度,因此不是有序的
      25 """
      代碼演練
    • join方法:阻塞、等待子進程執行完畢後再執行後面的代碼
    •  1 def g(args):
       2     print("in g", args)
       3 
       4 
       5 if __name__ == '__main__':
       6     print('in main')
       7     p = Process(target=g, args=(888,))
       8     p.start()
       9     p.join()
      10     print('i am in main process')
      11 '''
      12 in main
      13 in g 888
      14 i am in main process
      15 結:join老是等子進程執行完畢後再執行接下來的代碼
      16 '''
      代碼演練
    • 多進程中運用join方法:必須保證全部子進程結束,而操做系統建立子進程順序是不定的,故需遍歷每一個子進程,每一個都進行join一下
    •  1 def pro(i):
       2     print('in func', i, os.getpid(), os.getppid())
       3 
       4 
       5 if __name__ == '__main__':
       6     print('in main', os.getpid(),os.getppid())
       7     p_list = []
       8     for i in range(10):
       9         p = Process(target=pro, args=(i,))
      10         p.start()  # 不是運行一個程序,而是調用操做系統命令,要建立子進程,等待操做系統做業,非阻塞
      11         p_list.append(p)
      12     print(p_list)
      13     for p in p_list:  # 遍歷每一個子進程,每一個join一下,若是該子進程已經接收,join失效至關於pass,遍歷完成就能保證每一個子進程都結束了
      14         p.join()      # 阻塞,直到p這個子進程執行完畢以後再繼續執行
      15     print('主進程……')
      16 '''
      17 in main 1480 2064
      18 [<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>, <Process(Process-6, started)>, <Process(Process-7, started)>, <Process(Process-8, started)>, <Process(Process-9, started)>, <Process(Process-10, started)>]
      19 in func 3 6108 1480
      20 in func 7 13756 1480
      21 in func 5 12548 1480
      22 in func 8 12116 1480
      23 in func 4 10948 1480
      24 in func 6 11744 1480
      25 in func 9 11244 1480
      26 in func 1 3968 1480
      27 in func 2 9412 1480
      28 in func 0 14024 1480
      29 主進程……
      30 '''
      代碼演練
    • p.is_alive() 和 p.terminate() :查看子進程生命狀態及強制結束子進程(terminate是非阻塞,並不會等待子進程完全結束才執行後面的代碼,只是發一信息要終結,就無論了)
    •  1 # p.is_alive方法:查看子進程生存狀態
       2 # p.terminate() 強制結束子進程--非阻塞
       3 def gro(i):
       4     time.sleep(1)
       5     print('in func', i, os.getpid(), os.getppid())
       6 
       7 
       8 if __name__ == '__main__':
       9     print("in main")
      10     p1 = Process(target=gro, args=(1,))
      11     p1.start()
      12     # time.sleep(2)  # 若是等待一下子,就會執行函數,若是不等,就無論操做系統去建子進程,而直接執行後面的代碼,因此可能比建立子進程前就執行了
      13     print(p1.is_alive())  # 檢測子進程是否還在執行任務
      14     p1.terminate()  # 強制結束子進程,非阻塞,不會等待狀態改變,會立刻執行後面代碼
      15     print(p1.is_alive())
      16     print('主進程的代碼執行結束了……')
      17 '''
      18 in main
      19 True
      20 True
      21 主進程的代碼執行結束了……
      22 
      23 結:由於直接執行,主進程執行快些,子進程函數不會執行
      24 '''
      代碼演練
    • 經過面向對象的方法建立子進程(重點:重寫run方法,繼承Process類,繼承父類__init__方法)
    •  1 class Myprocess(Process):
       2     def __init__(self, name):
       3         super().__init__(self)  # 需繼承父類的init方法
       4         self.name = name  # 添加須要本身的屬性
       5 
       6     def run(self):
       7         print(self.name)  # 只有重寫run方法才能將參數傳入
       8         print(os.getppid(), os.getpid())
       9 
      10 
      11 if __name__ == '__main__':
      12     p = Myprocess('小強')
      13     p.start()
      代碼演練  
    • 進程與進程之間內存中的數據是隔離的!!!
      • 進程與進程之間是不能自由的交換內存數據的【內存空間是不能共享的】
      • 全局的變量在子進程中修改,其餘進程是感知不到的【子進程的執行結果父進程是獲取不到的】
      • 進程與進程之間想要同行,必須借用其餘手段,且兩個進程都是自願的【父子進程通訊是經過socket】
    •  1 from multiprocessing import Process
       2 
       3 n = 100
       4 
       5 
       6 def func():
       7     global n
       8     n = n - 1
       9     return 111
      10 
      11 
      12 if __name__ == '__main__':
      13     n_l = []
      14     for i in range(100):
      15         p = Process(target=func)
      16         p.start()
      17         n_l.append(p)
      18     for p in n_l: p.join()
      19     print(n)
      20 
      21 結果爲:100
      22 
      23 總結:說明子進程沒法改變主進程的全局變量,本質是沒法自由通訊,但子進程中的n確定減小了,只是無法拿出來
      代碼演練
    • 守護進程  p.daemon = True
    • 特色:守護進程的生命週期只和主進程的代碼有關係,和其餘子進程沒有關係,會隨着主進程結束而結束
    • 做用:報活,監控主進程生命狀態
    • 主進程建立守護進程
    • 守護進程會在主進程代碼結束後就終止
    • 守護進程內沒法再開子進程,否咋拋出異常 AssertionError: daemonic processes are not allowed to have children
    • 守護進程的屬性,默認是False,若是設置程True,就表示設置這個子進程爲一個守護進程
    • 設置守護進程的操做應該在開子進程以前即p.start()以前
    •  1 from multiprocessing import Process
       2 import time
       3 def func1():
       4     print('begin')
       5     time.sleep(3)
       6     print('wawww')
       7 
       8 # if __name__ == '__main__':
       9 #     p = Process(target=func1)
      10 #     # p.daemon = True
      11 #     p.start()
      12 #     time.sleep(1)
      13 #     print('in main')
      14 '''
      15 結果:
      16 begin
      17 in main
      18 
      19 結論:守護進程隨着主進程結束而結束,那怕守護進程任務沒有執行完畢
      20 '''
      21 
      22 def f1():
      23     print('begin fun1')
      24     time.sleep(3)
      25     print('baidu')
      26 
      27 def f2():
      28     while True:
      29         print('in f2')
      30         time.sleep(0.5)
      31 
      32 if __name__ == '__main__':
      33     Process(target=f1,).start()
      34     p = Process(target=f2)
      35     p.daemon = True
      36     # 守護進程的屬性,默認是False,若是設置成True,就表示設置這個子進程爲一個守護進程
      37     # 設置守護進程的操做應該在開啓子進程以前
      38     p.start()
      39     time.sleep(1)
      40     print('in main')   # 主進程in main執行完後,守護進程就會結束,但主進程並無結束而是等另外一個子進程結束後才結束
      41 
      42 
      43 # 設置成守護進程以後 會有什麼效果呢?
      44 # 守護進程會在主進程的代碼執行完畢以後直接結束,不管守護進程是否執行完畢
      45 
      46 # 應用
      47     # 報活 主進程還活着
      48     # 100臺機器 100個進程  10000進程
      49     # 應用是否在正常工做 - 任務管理器來查看
      50     # 守護進程如何向監測機制報活???send/寫數據庫
      51     # 爲何要用守護進程來報活呢?爲何不用主進程來工做呢???
      52         # 守護進程報活幾乎不佔用CPU,也不須要操做系統去調度
      53         # 主進程不能嚴格的每60s就發送一條信息
      代碼演練
    • 進程同步控制

    • 進程的同步控制 - 進程之間有一些簡單的信號傳遞,可是用戶不能感知,且用戶不能傳遞本身想傳遞的內容
    • 鎖:multiprocessing.Lock   *********  【互斥鎖】
    • lock = Lock()            # 創造一把鎖
    • lock.acquire()           # 獲取了這把鎖的鑰匙、阻塞,鎖未換就一直阻塞着
    • lock.release()           # 歸還這把鑰匙
    • 解決多個進程共享一段數據的時候,數據會出現不安全的現象,經過加鎖來維護數據的安全性,同一刻,只容許一個線程修改數據
    •  1 import json
       2 import time
       3 from multiprocessing import Lock
       4 from multiprocessing import Process
       5 
       6 #
       7 lock = Lock()  # 創造了一把鎖
       8 lock.acquire()  # 獲取了這把鎖的鑰匙
       9 lock.release()  # 歸還這把鑰匙,其餘進程就能夠拿鎖了
      10 
      11 
      12 # 搶票的故事
      13 # 需求:每一個人都能查看餘票、買相同車次票同一刻只能一人買完,另外一人才能買
      14 
      15 
      16 def search(i):
      17     with open('db', encoding='utf-8') as f:
      18         count_dic = json.load(f)
      19         time.sleep(0.2)  # 模擬網絡延遲
      20         print('person %s 餘票:%s 張' % (i, count_dic.get('count')))
      21         return count_dic.get('count'), count_dic  # 返回餘票數,及字典
      22 
      23 
      24 def buy(i):
      25     count, count_dict = search(i)
      26     if count > 0:
      27         count_dict['count'] -= 1  # 有票就能夠買
      28         print('person %s 買票成功'% i)
      29     time.sleep(2)
      30     with open('db', 'w', encoding='utf-8') as f:
      31         json.dump(count_dict, f)  # 更改餘票額度
      32 
      33 
      34 def task(i, lock):
      35     search(i)
      36     lock.acquire()  # 若是以前已經被acquire了 且 沒有被release 那麼進程會在這裏阻塞
      37     buy(i)
      38     lock.release()
      39 
      40 
      41 if __name__ == '__main__':
      42     lock = Lock()
      43     for i in range(1, 11):
      44         Process(target=task, args=(i, lock)).start()
      45 
      46 # 當多個進程共享一段數據的時候,數據會出現不安全的現象,
      47 # 須要加鎖來維護數據的安全性
      48 
      49 '''
      50 D:\install\Python36\python.exe D:/install/project/七、併發編程/三、鎖.py
      51 person 6 餘票:5 張
      52 person 5 餘票:5 張
      53 person 8 餘票:5 張
      54 person 2 餘票:5 張
      55 person 4 餘票:5 張
      56 person 10 餘票:5 張
      57 person 1 餘票:5 張
      58 person 9 餘票:5 張
      59 person 3 餘票:5 張
      60 person 7 餘票:5 張
      61 person 6 餘票:5 張
      62 person 6 買票成功
      63 person 5 餘票:4 張
      64 person 5 買票成功
      65 person 8 餘票:3 張
      66 person 8 買票成功
      67 person 2 餘票:2 張
      68 person 2 買票成功
      69 person 4 餘票:1 張
      70 person 4 買票成功
      71 person 10 餘票:0 張
      72 person 1 餘票:0 張
      73 person 9 餘票:0 張
      74 person 3 餘票:0 張
      75 person 7 餘票:0 張
      76 
      77 Process finished with exit code 0
      78 '''
      代碼演練-搶票的故事

      注:進程間的數據交互,本質也用到了socket通訊,不過都是本地的,基於文件的,能夠經過將py名寫成socket來看報錯得知。node

    • 1 #加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
      2 雖然能夠用文件共享數據實現進程間通訊,但問題是: 3 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 4 2.須要本身加鎖處理 5 
      6 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
      7 隊列和管道都是將數據存放於內存中 8 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 9 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
    • 信號量:multiprocessing.Semaphore
    • 資源有限,同時容許必定數量的進程訪問修改數據,即一把鎖對應多把鑰匙
    • 本質:鎖+ 計數器  
    •  1 互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
       2 假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
       3 實現:
       4 信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
       5 信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
       6 
       7 
       8 import time, random
       9 from multiprocessing import Semaphore, Process
      10 
      11 # ktv 只有4個房間,即同時只能四我的進去,其餘人必須等其中的人出來才能進去
      12 #
      13 # sem = Semaphore(4)  # 設置信號量個數,併發數
      14 # sem.acquire()
      15 # print('進去1我的,關門阻塞中')
      16 # sem.acquire()
      17 # print('進去第2我的,關門阻塞中')
      18 # sem.acquire()
      19 # print('進去第3我的,關門阻塞中')
      20 # sem.acquire()
      21 # print('進去第4我的,關門阻塞中')
      22 # sem.release()  # 必須歸還一把,才能繼續下面的代碼,否則一直阻塞中
      23 # sem.acquire()
      24 # print(6666)
      25 # sem.release()
      26 '''
      27 D:\install\Python36\python.exe D:/install/project/七、併發編程/四、信號量.py
      28 進去1我的,關門阻塞中
      29 進去第2我的,關門阻塞中
      30 進去第3我的,關門阻塞中
      31 進去第4我的,關門阻塞中
      32 6666
      33 
      34 Process finished with exit code 0
      35 '''
      36 
      37 
      38 def ktv(num, sem):
      39     sem.acquire()
      40     print('person %s 進入了ktv' % num)
      41     time.sleep(random.randint(1, 4))
      42     print('person %s 進出了ktv' % num)
      43     sem.release()
      44 
      45 
      46 if __name__ == '__main__':
      47     sem = Semaphore(4)
      48     for i in range(10):
      49         Process(target=ktv, args=(i, sem)).start()
      50 
      51 '''
      52 最開始是4個同時進入,以後又人出,纔能有人進
      53 D:\install\Python36\python.exe D:/install/project/七、併發編程/四、信號量.py
      54 person 2 進入了ktv
      55 person 8 進入了ktv
      56 person 9 進入了ktv
      57 person 7 進入了ktv
      58 person 2 進出了ktv
      59 person 6 進入了ktv
      60 person 7 進出了ktv
      61 person 5 進入了ktv
      62 person 8 進出了ktv
      63 person 1 進入了ktv
      64 person 9 進出了ktv
      65 person 0 進入了ktv
      66 person 1 進出了ktv
      67 person 4 進入了ktv
      68 person 6 進出了ktv
      69 person 3 進入了ktv
      70 person 0 進出了ktv
      71 person 5 進出了ktv
      72 person 4 進出了ktv
      73 person 3 進出了ktv
      74 
      75 Process finished with exit code 0
      76 '''
      代碼演練-kvt的故事
    • 事件:multiprocessing.Eventpython

    • 定義:全局定義了一個‘flag’,若是標誌爲False,當程序執行event.wait方法時就會阻塞,若是爲True,那麼event.wait方法時便再也不阻塞。redis

    • 做用:主進程控制其餘線程的執行,實現兩個或多個線程間的交互
    • 使用:
      • 事件創立之初,默認是Faslse,即阻塞狀態
      • e = Event()            # 建立事件,默認False
      • print(e.is_set())     #  查看狀態
      • e.set()                   #   將標誌設置爲True
      • e.clear()                #   將標誌設置爲False
      • e.wait()                 #    等待,當標誌爲False,那麼阻塞,當標誌爲True,那麼非阻塞,wait什麼也不作直接pass
      • e.wait(timeout=10)     # 若是信號在阻塞10s以內變爲True,那麼就不繼續阻塞直接pass,若是就阻塞10s以後狀態仍是沒變,那麼繼續阻塞
      •  1 from multiprocessing import Process, Event
         2 import time, random
         3 
         4 
         5 def car(e, n):
         6     while True:
         7         if not e.is_set():  # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
         8             print('\033[31m紅燈亮\033[0m,car%s等着' % n)
         9             e.wait()    # 阻塞,等待is_set()的值變成True,模擬信號燈爲綠色
        10             print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
        11             time.sleep(random.randint(3, 6))
        12             if not e.is_set():   #若是is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
        13                 continue
        14             print('車開遠了,car', n)
        15             break
        16 
        17 
        18 def police_car(e, n):
        19     while True:
        20         if not e.is_set():# 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
        21             print('\033[31m紅燈亮\033[0m,car%s等着' % n)
        22             e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s以後沒有等到綠燈就闖紅燈走了
        23             if not e.is_set():
        24                 print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
        25             else:
        26                 print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
        27         break
        28 
        29 
        30 
        31 def traffic_lights(e, inverval):
        32     while True:
        33         time.sleep(inverval)
        34         if e.is_set():
        35             print('######', e.is_set())
        36             e.clear()  # ---->將is_set()的值設置爲False
        37         else:
        38             e.set()    # ---->將is_set()的值設置爲True
        39             print('***********',e.is_set())
        40 
        41 
        42 if __name__ == '__main__':
        43     e = Event()
        44     for i in range(10):
        45         p=Process(target=car,args=(e,i,))  # 建立是個進程控制10輛車
        46         p.start()
        47 
        48     for i in range(5):
        49         p = Process(target=police_car, args=(e, i,))  # 建立5個進程控制5輛警車
        50         p.start()
        51     t = Process(target=traffic_lights, args=(e, 10))  # 建立一個進程控制紅綠燈
        52     t.start()
        53 
        54     print('============》')
        代碼演練-紅綠燈的故事
    • 進程間通訊-隊列和管道

    • IPC 進程之間的通訊 multiprocessing.Queue/Pipe:進程之間的內存是不共享的,隔離的,但隊列、管道有使之通訊的功能。
    • 隊列 multiprocessing.Queue
    • from queue import Queue
      # 隊列  先進先出FIFO,有序
      # 應用:維護秩序的時候用的比較多,買票,搶票
      q = Queue(5)  # 設置隊列的長度,即元素個數,即只能放入5個元素
      ret = q.qsize()  # 得到當前隊列中的元素個數,此方法不許,在多進程中,此刻獲取結果時,也許其餘進程向裏面加入了元素
      q.put(1111)  # 向隊列中放入對象,若是隊列已滿,則阻塞等待,一直到空間可用爲止
      '''
      參數:
      item:項目、元素、對象
      block:默認True,隊列滿一直等待阻塞,False則爲不阻塞,滿則直接主動自定義報錯
      timeout:阻塞等待的時間,時間到了,還不能放,則報錯Queue.Empty異常
      '''
      q.put_nowait(2222)  # 放入元素,滿了,不等直接報錯
      q.get()  # 返回q即隊列中的元素,隊列中爲空,則阻塞一直等待有值爲止,通向可設置timeout
      q.get_nowait()  # 隊列爲空時,直接報錯
      q.empty()  # 判斷是否爲空,空則返回True,一樣在多進程中不許
      q.full()   # 判斷是否滿了,滿了則返回True,多進程中不許,主要是進程是異步操做
      數據類型中的隊列及隊列中的方法,在進程、線程中通用
    • 定義:先進先出、有序
    • 本質:管道 + 鎖
    • 做用:因爲先進先出的特色+進程通訊的功能+數據進程安全,常常用它來完成進程之間的通訊
    • # 例子1  一進程放   一進程取
      from multiprocessing import Queue, Process
      
      
      def con(q):
          print(q.get())  # 從隊列中拿,沒有直到等到有,因此那麼它比其餘進程快,最後也能拿到數據
      
      
      def pro(q):
          q.put(112)   # 向隊列中放入112
      
      
      if __name__ == '__main__':
          q = Queue()
          p = Process(target=con, args=(q,))
          p.start()
          p = Process(target=pro, args=(q,))
          p.start()
          print('我在主進程中……')
      
      '''
      看出隊列能夠實現進程間的通訊
      '''
      # 主放, 子取
      from multiprocessing import Queue, Process
      def f(q):
          print(q.get())
      
      if __name__ == '__main__':
          q = Queue()
          Process(target=f, args=(q,)).start() # create son_process
          q.put(666)
      '''看出主進程可和子進程通訊'''
      代碼演練-主子通訊/子子通訊
    • 生成者消費者模型:可解決大部分併發問題
    • 併發中的問題:
      • 生產數據快,消費數據慢,內存空間的浪費,【產生的數據不能丟只能放在內存中等着處理】 
      • 生產數據慢,消費數據快,效率低下 【老是要等着生產數據,啥也幹不了】
    • 做用:- 解決創造(生成)數據和處理(消費)數據的效率不平衡問題
    • 實現:將創造數據和處理數據放在不一樣的進程中,根據他們的效率來調整進程的個數
    •  1 import time
       2 import random
       3 from multiprocessing import Process, Queue
       4 
       5 
       6 def consumer(q,name):
       7     while 1:
       8         food = q.get()
       9         print('%s 吃了 %s ' % (name, food))
      10         time.sleep(random.random())
      11 
      12 
      13 def producer(q,name,food,n=10):
      14     for i in range(1, n):  # 定義生產10個食物
      15         time.sleep(random.random())  # 模擬生產慢,消費快
      16         fd = food + str(i)
      17         print('%s 生產了 %s' %(name,fd))
      18         q.put(fd)
      19 
      20 if __name__ == '__main__':
      21     q = Queue(10)
      22     for person in range(6):  # 定義消費者多
      23         Process(target=consumer, args=(q, 'person'+ str(person))).start()
      24     Process(target=producer, args=(q,'小強','米飯')).start()
      25     Process(target=producer, args=(q,'小東','麪條')).start()
      26     Process(target=producer, args=(q,'小hua','麪條')).start()
      27     Process(target=producer, args=(q,'小cai','麪條')).start()
      代碼演練-生產者生產完告終束,而消費者一直在get阻塞中,待解決
    • 問題:
      消費者一直在等着拿數據,生產者生產完了就結束了,生產者須要告訴消費者生產完了才合理,即向隊列中放入stop信號
    • 重點:消費者有多少個,就必需要發多少個stop信號
    • 解決:
      •   方案一:生產者子進程中發,缺陷:但生產者數必須和消費者數同樣,消費者進程才能所有關閉
      •  1 def consumer(q,name):
         2     while 1:
         3         food = q.get()
         4         if food == 'stop':break
         5         print('%s 吃了 %s ' % (name, food))
         6         time.sleep(random.random())
         7 
         8 
         9 def producer(q,name,food,n=10):
        10     for i in range(1, n):  # 定義生產10個食物
        11         time.sleep(random.random())  # 模擬生產慢,消費快
        12         fd = food + str(i)
        13         print('%s 生產了 %s' %(name,fd))
        14         q.put(fd)
        15     q.put('stop')
        16 
        17 if __name__ == '__main__':
        18     q = Queue(10)
        19     for person in range(4):  # 定義消費者多
        20         Process(target=consumer, args=(q, 'person'+ str(person))).start()
        21     Process(target=producer, args=(q,'小強','米飯')).start()
        22     Process(target=producer, args=(q,'小東','麪條')).start()
        23     Process(target=producer, args=(q,'小hua','麪條')).start()
        24     Process(target=producer, args=(q,'小cai','麪條')).start()
        25 
        26 #  生產者加入結束信號,消費者收到後就結束,不存在還有進程在使用數據,由於隊列是先進先出的
        27 
        28 # 且有多少個消費者就須要發多少個stop信號,否則就會致使有的的進程還在等待中
        代碼演練
      •   方案二:主進程中發,但必需要等生產者都結束,那麼就加入p.join() ,仍是得有多少個消費者,發多少個stop信號,沒法自動
      •  1 from multiprocessing import Queue, Process
         2 import random, time
         3 
         4 
         5 def consumer(q, name):
         6     while 1:
         7         food = q.get()
         8         if food == 'stop': break
         9         print('%s 吃了 %s ' % (name, food))
        10         time.sleep(random.random())
        11 
        12 
        13 def producer(q, name, food, n=10):
        14     for i in range(1, n):  # 定義生產10個食物
        15         time.sleep(random.random())  # 模擬生產慢,消費快
        16         fd = food + str(i)
        17         print('%s 生產了 %s' % (name, fd))
        18         q.put(fd)
        19 
        20 
        21 if __name__ == '__main__':
        22     q = Queue(10)
        23     for person in range(4):  # 定義消費者4個
        24         Process(target=consumer, args=(q, 'person'+ str(person))).start()
        25     p1 = Process(target=producer, args=(q,'小強','米飯'))
        26     p1.start()
        27     p2 = Process(target=producer, args=(q,'小東','麪條'))
        28     p2.start()
        29     p1.join()   # 保證p生產者結束
        30     p2.join()
        31     q.put('stop')   # 必須得發四個stop信號
        32     q.put('stop')
        33     q.put('stop')
        34     q.put('stop')
        代碼演練
      •       方案三:
      • JoinableQueue([maxsize])
        建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 
      • 1 JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:
        2 
        3 q.task_done() 
        4 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。
        5 
        6 q.join() 
        7 生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 
        8 下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
        JoinableQueue使用方法
         1 from multiprocessing import Process,JoinableQueue
         2 import time,random,os
         3 def consumer(q):
         4     while True:
         5         res=q.get()
         6         time.sleep(random.randint(1,3))
         7         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
         8         q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了
         9 
        10 def producer(name,q):
        11     for i in range(10):
        12         time.sleep(random.randint(1,3))
        13         res='%s%s' %(name,i)
        14         q.put(res)
        15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
        16     q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。
        17 
        18 
        19 if __name__ == '__main__':
        20     q=JoinableQueue()
        21     #生產者們:即廚師們
        22     p1=Process(target=producer,args=('包子',q))
        23     p2=Process(target=producer,args=('骨頭',q))
        24     p3=Process(target=producer,args=('泔水',q))
        25 
        26     #消費者們:即吃貨們
        27     c1=Process(target=consumer,args=(q,))
        28     c2=Process(target=consumer,args=(q,))
        29     c1.daemon=True
        30     c2.daemon=True
        31 
        32     #開始
        33     p_l=[p1,p2,p3,c1,c2]
        34     for p in p_l:
        35         p.start()
        36 
        37     p1.join()
        38     p2.join()
        39     p3.join()
        40     print('') 
        41     
        42     #主進程等--->p1,p2,p3等---->c1,c2
        43     #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
        44     #於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
        代碼演練
      • 消費者每消費處理完一個數據,就向生產者發送消息,經過q.task_done
      • 生產者每生產一個數據就在計數器+1,每接到消費者發得q.task_done,就-1,消費者消費完了,生產者計數也爲0了,故纔會結束,經過q.join()
      • 爲了讓做業完成,程序能關閉,等生產者結束,主進程也跟着關閉便可,即生成者.join()一下便可
      • 那麼就能夠把消費者設置成守護進程,主進程結束,守護進程也就結束了,這樣就ok了。
      • ==》消費者消費完畢
      • ==》生產者結束
      • ==》主進程代碼結束,從而守護進程進程結束
      • ==》消費者結束
      • 注:q.join() 只是保證生產者結束便可,那麼在子進程join和主進程中join就同樣了。
      • 注:棧 ==> 先進後出----算法算法

    • 管道:multiprocessing.Pipe
      數據庫

    • 定義:IPC通訊的一種機制,隊列就是基於管道來完成的通訊的,可是管道是原生的通訊方式,在進程之間會產生數據不安全的狀況,須要本身手動加鎖來處理,管道在數據傳輸過程當中,還涉及到一個端口管理,這個須要咱們在代碼中作處理才能使代碼更完善。
    • 管道使用中待解決的問題:
      • 一、多進程通訊數據不安全
      • 答:不安全是因多個進程可能會在同一刻同時去取同一個數據,也可能同一刻拿走一個數據,位置就空了,別的應該放在空位置上,
                而因同一刻時,另外一個進程會認爲該位置上有數據,就放在後面了,這就會致使數據異常,解決方法就是在拿數據前加鎖,拿完歸還鎖。django

      • 二、端口問題,生產者或消費者沒有使用的端口須要關閉,否則,消費者就會認爲還有數據要接收,就會一直recv中,所以就須要生成者關閉數據的輸出端,對應的消費者就須要關閉數據的輸入端,才能保證當消費者recv不到數據了經過報錯EOFerror而中止。
    •  1 from multiprocessing import Pipe, Process
       2 
       3 
       4 # 管道
       5 # 隊列是基於管道實現的
       6 # 隊列 進程間數據安全的
       7 # 管道 進程間數據不安全的
       8 # 隊列 = 管道 + 鎖
       9 
      10 # left, right = Pipe()
      11 # print(right.recv())
      12 # (<multiprocessing.connection.PipeConnection object at 0x000002817A7FB128>, <multiprocessing.connection.PipeConnection object at 0x000002817A65C128>)
      13 # 管道對象返回的是一個元組
      14 
      15 
      16 # 全部的IPC通訊都是經過socket實現的
      17 
      18 # 左邊放,右邊出,一樣能夠左收,右發,全雙工模式
      19 
      20 # 管道必須在建立進程前建立
      21 def consumer(left, right):
      22     left.close()  # 消費者用右邊接那麼就把左邊關閉
      23     while 1:
      24         try:
      25             print(right.recv())
      26         except EOFError:  # 再也接不到數據了,從而報錯,才能退出
      27             break
      28 
      29 
      30 if __name__ == '__main__':
      31     left, right = Pipe()
      32     p = Process(target=consumer, args=(left, right))
      33     p.start()
      34     right.close()  # 用右邊發送,那麼左邊就關閉
      35     for i in range(1, 11):
      36         left.send(3333)
      37     left.close()  # 不用了就關閉
      38 
      39 # EOF異常的觸發
      40     # 在這一個進程中 若是不在用這個端點了,應該close
      41     # 這一在recv的時候,若是其餘端點都被關閉了,就可以知道不會在有新的消息傳進來
      42     # 此時就不會在這裏阻塞等待,而是拋出一個EOFError
      43     # * close並非關閉了整個管道,而是修改了操做系統對管道端點的引用計數的處理
      代碼演練-相關問題注意
    • 進程間數據共享

    • 進程之間數據共享
      消息傳遞的併發是趨勢
      線程是經過線程集合,用消息隊列來交換數據
      進程間應儘可能避免通訊,由於可能不安全,想安全就必須加鎖,加鎖就會影響效率。
      redis分佈式、數據庫解決進程之間數據共享問題
      '''
      進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
      雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此
      '''
      # Manager模塊
      # 全部的數據類型 都可以進行數據共享
      # 一部分都是不加鎖 不支持數據進程安全
      # 不安全的解決辦法 加鎖
    •  1 from multiprocessing import Manager,Process,Lock
       2 def work(d,lock):
       3     with lock:
       4         d['count'] -= 1
       5 
       6 if __name__ == '__main__':
       7     lock = Lock()
       8     m = Manager()
       9     dic = m.dict({'count':100})
      10     p_l = []
      11     for i in range(100):  # 開了100進程
      12         p = Process(target=work, args=(dic, lock))
      13         p_l.append(p)
      14         p.start()
      15     for p in p_l:
      16         p.join()
      17     print(dic)
      18 '''
      19 結果:{'count':0}
      20 '''
      21 
      22 # with as 的機制
      23 # __enter__
      24 # __exit__
      代碼演練
    • 進程池:multiprocessing.Pool

    • 概念:
      • 幾個CPU就能同時運行幾個進程,並行, 進程的個數不是無限開啓的 會給操做系統調度增長負擔【開啓進程慢】
      • 且真正能被同時執行的進程最多也就和CPU個數相同等
      • 進程的開啓和銷燬都要消耗資源和時間
      • 進程池中的進程不會重複開啓和關閉,而是一直在那被使用,減小時間及操做系統調度的開銷
      • 進程池中的進程數保持一致,保證同一時間最多有固定數量的進程再運行,減小操做系統調度難度的同時實現併發
      • 若是任務數大於池子中進程數,就只能等着,有點像信號量,但信號量進程是新建的,而進程池中的進程一直在
    • 使用場景:
      • 面向高計算型的場景,沒有或比較少IO型的程序 採用多進程
      • 但願並行 最充分的使用CPU
    • 對象方法:
      • p = Pool(num)    建立進程池,指定池中進程數
      • p.apply(func,args=(i,) )  同步,會等func返回值後,阻塞,下一個任務才能繼續
      • p.apply_async(func,args=(j,)) 異步
      • p.close()  不是關閉進程池中的進程讓進程不工做了,而是關閉進程池,讓任務再也不提交,已經接到的任務繼續執行直至執行結束
      • p.join()   等待進程任務中的全部任務都執行完畢
    • 建立一個進程池
    •  1 import os
       2 import time
       3 from multiprocessing import Pool
       4 print(os.cpu_count())  # 獲取cpu個數
       5 
       6 
       7 def wahaha():
       8     time.sleep(1)
       9     print(os.getpid())
      10     return True
      11 
      12 
      13 if __name__ == '__main__':
      14     p = Pool(5)   # 進程池中進程數通常爲cpu個數或者cpu+1,不要超過10個
      15     for i in range(20):
      16         # p.apply(func=wahaha)     # 同步,通常不用,還不如一個進程去循環作,進程池有返回值,基於ipc通訊,本身能夠經過q來通訊
      17         p.apply_async(func=wahaha) # async   異步的
      18     p.close() # 關閉進程池,進程池中的進程不工做了,讓任務不能再繼續提交了,
      19     p.join()  # 等待這個池中提交的任務都執行完,就結束
      代碼演練
    • 異步提交,不獲取返回值編程

    • def wahaha():
          time.sleep(1)
          print(os.getpid())
      
      if __name__ == '__main__':
          p = Pool(5)  # CPU的個數 或者 +1
          ret_l = []
          for i in range(20):
             ret = p.apply_async(func = wahaha) # async  異步的
             ret_l.append(ret)
          p.close()  # 關閉 並非進程池中的進程不工做了
                     # 而是關閉了進程池,讓任務不能再繼續提交了
          p.join()   # 等待這個池中提交的任務都執行完
          # # 表示等待全部子進程中的代碼都執行完 主進程才結束
      代碼演練
    • 異步提交,得到返回值,等待全部任務都執行完畢以後再統一獲取結果json

    •  1 # 異步提交,獲取返回值,等待全部任務都執行完畢以後再統一獲取結果
       2 def wahaha():
       3     time.sleep(1)
       4     print(os.getpid())
       5     return True
       6 
       7 if __name__ == '__main__':
       8     p = Pool(5)  # CPU的個數 或者 +1
       9     ret_l = []
      10     for i in range(20):
      11        ret = p.apply_async(func = wahaha) # async  異步的
      12        ret_l.append(ret)
      13     p.close()  # 關閉 不是進程池中的進程不工做了
      14                # 而是關閉了進程池,讓任務不能再繼續提交了
      15     p.join()   # 等待這個池中提交的任務都執行完
      16     for ret in ret_l:
      17         print(ret.get())
      代碼演練
    • 異步提交,得到返回值,一個任務執行完畢以後就能夠獲取到一個結果(順序是按照提交任務的順序)【用在任務關聯不大的時候】flask

    •  1 def wahaha():
       2     time.sleep(1)
       3     print(os.getpid())
       4     return True
       5 
       6 if __name__ == '__main__':
       7     p = Pool(5)  # CPU的個數 或者 +1
       8     ret_l = []
       9     for i in range(20):
      10        ret = p.apply_async(func = wahaha) # async  異步的
      11        ret_l.append(ret)
      12     for ret in ret_l:
      13         print(ret.get())
      代碼演練,不用p.close()和p.join(),ret.get()會阻塞主進程按順序一個個獲取結果
    • 總結bootstrap

    •  1 # 異步的 apply_async
       2 # 1.若是是異步的提交任務,那麼任務提交以後進程池和主進程也異步了,
       3     #主進程不會自動等待進程池中的任務執行完畢
       4 # 2.若是須要主進程等待,須要p.join
       5     # 可是join的行爲是依賴close
       6 # 3.若是這個函數是有返回值的
       7     # 也能夠經過ret.get()來獲取返回值
       8     # 可是若是一邊提交一遍獲取返回值會讓程序變成同步的
       9     # 因此要想保留異步的效果,應該講返回對象保存在列表裏,全部任務提交完成以後再來取結果
      10     # 這種方式也能夠去掉join,來完成主進程的阻塞等待池中的任務執行完畢
      總結
    • 進程池解決原生socket,同一時刻只能和一個客戶端鏈接【這種方式的弊端是同時最多隻能和進程池中的數量相同,其它用戶等待】

    •  1 import socket
       2 from multiprocessing import Pool
       3 
       4 
       5 def talk(conn):
       6     try:
       7         while 1:
       8             msg = conn.recv(1024).decode('utf-8')
       9             print(msg)
      10             conn.send(b'hello')
      11     finally:
      12         conn.close()
      13 
      14 
      15 if __name__ == '__main__':
      16     sk = socket.socket()
      17     sk.bind(('127.0.0.1', 9999))
      18     sk.listen()
      19     pool = Pool(5)
      20     try:
      21         while 1:
      22             conn, addr = sk.accept()
      23             pool.apply_async(talk, args=(conn,))
      24     finally:
      25         conn.close()
      26         sk.close()
      server
    •  1 import socket
       2 
       3 ip_port = ('127.0.0.1', 9999)
       4 sk = socket.socket()
       5 sk.connect(ip_port)
       6 
       7 while 1:
       8     msg = input('>>>>:').strip()
       9     if len(msg) == 0: continue
      10     sk.send(msg.encode('utf-8'))
      11     content = sk.recv(1024).decode('utf-8')
      12     print(content)
      client
    • Pool中的回調函數callback

    • 定義:將一個進程的的執行結果的返回值,會當callback參數來執行callback函數,從而減小了ret.get()等I/O操做浪費的時間了。
    • 做用:進程池中任何一個任務一旦處理完了,就當即告知主進程,我已執行完畢,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數就是回掉函數。
    • 重點:回調函數是主進程中執行【若是有兩個任務,個人第二個任務在第一個任務執行完畢以後可以當即被主進程執行】
    •  1 import os
       2 import time
       3 import random
       4 from multiprocessing import Pool
       5 
       6 # 異步提交,獲取返回值,從頭至尾一個任務執行完畢以後就能夠獲取到一個結果
       7 def wahaha(num):
       8     time.sleep(random.random())
       9     print('pid : ',os.getpid(),num)
      10     return num
      11 
      12 def back(arg):
      13     print('call_back : ',os.getpid(),arg)
      14 
      15 if __name__ == '__main__':
      16     print('主進程',os.getpid())
      17     p = Pool(5)  # CPU的個數 或者 +1
      18     for i in range(20):
      19        ret = p.apply_async(func = wahaha,args=(i,),callback=back) # async  異步的
      20     p.close()
      21     p.join()
      22 
      23 # 回調函數 _ 在主進程中執行
      24 # 在發起任務的時候 指定callback參數
      25 # 在每一個進程執行完apply_async任務以後,返回值會直接做爲參數傳遞給callback的函數,執行callback函數中的代碼
      26 
      27 
      28 # 北京  30min   30min +5min   5min + 5min
      29 # 建設  20min   直接辦 + 5min  5min
      30 # 中國  1h      20min +5       25min + 5min
      31 # 農業  2h      55min + 5min   55min + 5min
      32 # 工商  15min   5min            15min + 5min
      33 
      34 # 2h10min
      35 # 2h05min
      代碼演練

3、python中的併發編程-線程

  • 1 程序:是指令的集合,它是進程運行的靜態描述文本
    2 進程:是程序的一次執行活動,是動態概念,計算機中最小資源分配單位
    3 線程:cpu調度的最小單位,從屬於進程,任務的實際執行者
    程序、進程、線程
  • 進程:計算機中最小的資源分配單位,在利用多個cpu執行的過程當中,對多個程序的資源進行管理和隔離
    • 優勢:實現併發,多進程異步執行多個任務,內存地址隔離,IPC通訊經過Queue,Pipe
    • 缺陷:創立、撤銷、切換都會有較大的時空開銷,過多的進程會形成操做系統調度的壓力,同一時刻,只能運行一個任務
  • 線程:cpu 調度的最小單位
  • 特色:一、輕型實體(包含程序、數據、tcb,基本不佔資源)
  •            二、獨立調度和分派的基本單位(線程與線程的切換很是快,開銷小)
  •            三、共享進程資源,同一進程內線程間可相互通訊
  •            四、併發執行
  • 進程與線程區別
    •   一、線程屬於進程、進程負責獲取操做兄臺那個分配的資源、線程負責任務執行
    •   二、每一個進程中至少有一個線程
    •   三、地址空間和其餘資源,多進程之間內存相互隔離,多線程間共享同一進程數據
    •   四、通訊,進程間不能自由通訊,通訊需藉助管道,隊列等工具,線程能夠直接讀寫,自由通訊,共用主進程Id
    •   五、調度和切換,多進程開啓、結束時間開銷大、切換效率低,多線程開銷小、切換效率高
    •   六、多線程操做系統中,進程不是一個可執行的實體
  • cpython 解釋器下有全局解釋器鎖
    •  在同一個進程中的多個線程在同一時刻只能有一個線程訪問cpu,致使多線程沒法並行處例任務,利用多核處理器
    • 注:程序計算的時候纔會用到cpu,sleep或I/O阻塞的時候是不會用到CPU的
    • jpython/pypy解釋器沒有全局解釋器鎖  
  • python線程管理:threading模塊 

  • 建立線程:共享主進程資源,故每一個線程的進程ID同樣,但各個線程有本身的id經過get_ident(),需導入類get_ident
    •  1 # 經過類Thread                                                                                                                                     
       2 import os                                                                                                                                       
       3 import time                                                                                                                                     
       4 from threading import Thread,get_ident                                                                                                          
       5 def func(name):                                                                                                                                 
       6     time.sleep(0.1)                                                                                                                             
       7     print('線程:%s,該線程的進程id爲:%s,線程id爲:%s' %(name,os.getpid(),get_ident()))   # get_ident()爲類get_ident中的方法,做用是獲取線程id                               
       8 for i in range(10):                                                                                                                             
       9     t = Thread(target=func, args=(i,))                                                                                                          
      10     t.start()                                                                                                                                   
      11                                                  
      12 》》》》結果:
      13 線程:3,該線程的進程id爲:14140,線程id爲:13352
      14 線程:0,該線程的進程id爲:14140,線程id爲:1672
      15 線程:1,該線程的進程id爲:14140,線程id爲:15808
      16 線程:2,該線程的進程id爲:14140,線程id爲:10136
      17 線程:7,該線程的進程id爲:14140,線程id爲:4508
      18 線程:6,該線程的進程id爲:14140,線程id爲:15068
      19 線程:4,該線程的進程id爲:14140,線程id爲:13532
      20 線程:5,該線程的進程id爲:14140,線程id爲:10836
      21 線程:9,該線程的進程id爲:14140,線程id爲:9520
      22 線程:8,該線程的進程id爲:14140,線程id爲:15568
      代碼演練-方式一:經過類Thread
    •  1 # 自定義類建立多線程   【繼承Thread類,同時重些run方法,如需添加本身的屬性或者說加入參數繼承父類init方法】
       2 import os
       3 from threading import Thread, get_ident
       4 
       5 
       6 class Mythread(Thread):
       7     def __init__(self, args):
       8         super().__init__()
       9         self.args = args
      10 
      11     def run(self):
      12         """
      13         執行函數
      14         :return:
      15         """
      16         print(self.args)  # 就能夠調用本身自定義的屬性了
      17         print('in thread 子線程Id:', get_ident(), '進程Id:', os.getpid())
      18 
      19 
      20 print("父進程python解釋器:", os.getppid())
      21 print('進程即執行py文件:', os.getpid())
      22 print('主線程:', get_ident())
      23 t_obj = Mythread('china')
      24 t_obj.start()
      25 '''
      26 結果:
      27 父進程python解釋器: 1168
      28 進程及執行py文件: 2724
      29 主線程: 7468
      30 china
      31 in thread 子線程Id: 8180 進程Id: 2724
      32 '''
      代碼演練-方式二:面向對象自定義類
  • 多線程與多進程效率對比:線程開閉切遠遠高於進程
    •  1 '''效率:多線程開閉切開銷遠遠小於進程隔了大幾百倍'''
       2 
       3 def func(a):
       4     a += 1
       5 
       6 
       7 if __name__ == '__main__':
       8     start = time.time()
       9     t_lis = []
      10     for i in range(50):
      11         t = Thread(target=func, args=(i,))
      12         t.start()
      13         t_lis.append(t)
      14     for t in t_lis:t.join()
      15     print('主線程')
      16     print('時間:%s' % str(time.time() - start))
      17 
      18     start = time.time()
      19     t_lis = []
      20     for i in range(50):
      21         t = Process(target=func, args=(i,))
      22         t.start()
      23         t_lis.append(t)
      24     for t in t_lis: t.join()
      25     print('主進程')
      26     print('時間:%s' % str(time.time() - start))
      27 
      28 '''
      29 效率測試:
      30 主線程
      31 時間:0.008229732513427734
      32 主進程
      33 時間:5.307320833206177
      34 '''
      代碼演練
  • 多線程數據共享:可以改變進程全局變量,不加鎖,數據不安全
    •  1 '''線程間數據共享'''
       2 from threading import Thread
       3 
       4 n = 100  # 全局變量,存在主進程內存空間中
       5 
       6 
       7 def func():
       8     global n
       9     n -= 1
      10     print(n)
      11 
      12 
      13 if __name__ == '__main__':
      14     t_li = []
      15     for i in range(100):
      16         t = Thread(target=func)  # 始終在同一個內存空間中
      17         t.start()
      18         t_li.append(t)
      19     for t in t_li:
      20         t.join()
      21     print("線程:", n)     
      22 '''
      23 結果:
      24 n = 0
      25 每一個線程中的n從99,開始遞減
      26 '''
      27 
      28     p_li = []
      29     for i in range(100):
      30         p = Process(target=func,)  # 分別建立100個獨立的內存空間,且相互獨立
      31         p.start()
      32         p_li.append(p)
      33     for p in p_li:
      34         p.join()
      35     print('進程:',n)
      36 '''
      37 結果:
      38 n = 100
      39 每一個進程中的n爲99
      40 '''
      代碼演練
  • 守護線程:隨主線程運行結束而結束,同進程不同,進程是主進程代碼結束   【線程不能手動關閉沒有terminate方法,只能經過守護線程來關閉,或者用模塊】
    •  1 '''守護線程等待主線程結束而結束,
       2 # 主線程結束,必須等全部非守護線程結束,才能結束
       3 # 主線程結束,進程就結束了,進程必須保證全部非守護線程結束才行'''
       4 import os
       5 import time
       6 from threading import Thread
       7 
       8 def f1():
       9     print(True)
      10     time.sleep(0.5)
      11     print(os.getpid())
      12 
      13 def f2():
      14     print('in f2 start')
      15     time.sleep(3)
      16     print('in f2 end')
      17     print(os.getpid())
      18 
      19 t = Thread(target=f1)
      20 t.setDaemon(True)
      21 t.start()
      22 
      23 t2 = Thread(target=f2)
      24 t2.start()
      25 print('主線程',os.getpid())
      26 
      27 '''
      28 True
      29 主線程 1440
      30 in f2 start
      31 1440
      32 in f2 end
      33 1440
      34 '''
      代碼演練
  • Thread類方法彙總 
    • t.is_alive()或者t.isAlive()       # 查看主線程是否存活
    • t.getName()                          #  放回線程名如: Thread-1   
    • t.setName()                          #  設置線程名
    • t.start()                                 # 建立線程,調用run方法,運行
    • t.join()                                  # 主線程等待t線程執行結束後才繼續,阻塞
    • threading.currentTread()     #  返回當前的線程變量,即對象:  <_MainThread(MainThread, started 4644)>
    • threading.enumerate()        # 返回正在運行的線程對象列表,列表中線程對象確定包括主線程+開啓的子線程
    • threading.activeCount()      # 返回正在運行的線程數,即    len(threading.enumerate() )
  • 鎖和GIL: 互斥鎖(也叫同步鎖)、遞歸鎖、全局解釋器鎖
    • 在多個進程或線程同時訪問一個數據的時候就會產生數據不安全的現象
    • 只要多進程/多線程用到全局變量,就必須加鎖,保證數據安全  
    • GIL鎖:是同一個進程裏的每個線程同一時間只能有一個線程訪問cpu,鎖的是線程,而不是具體的內存,但並不妨礙多個進程同時訪問數據,這將致使訪問同一數據不安全現象
    • 遞歸鎖:python支持同一進程/線程屢次請求同一資源,從而使資源能夠被屢次require,直到一個線程全部的require都被release,其餘線程才能得到資源。
    • 互斥鎖:同一時刻,只容許一個進程/線程訪問資源,訪問前加鎖require阻塞,訪問處理完畢後release,其餘進程/線程才能訪問,實現同步,保證數據安全。
    • 爲何要有GIL鎖?
    • 在開發python解釋器的時候能夠減小不少細粒度的鎖
    • 互斥鎖與遞歸鎖那個好?
      • 一、遞歸鎖,能快速解決死鎖問題,快速恢復服務
      • 二、但死鎖問題的出現,是程序的設計或邏輯的問題,還應進一步的排除和重構邏輯來保證使用互斥鎖也不會發生死鎖問題
    • 什麼是死鎖?
      • 白話:多把鎖同時應用在多個線程中
      • 兩個共享的數據資源,兩個進程或線程各拿到了一個資源,但未同時拿到兩個資源,都不釋放,這時就會產生死鎖現象,還有就是前面require了,後面忘記release了。
    • 如何解決死鎖?
      • 若是在服務階段 -> 遞歸鎖快速恢復服務 ->排查邏輯 -> 互斥鎖
      • 若是在測試階段 -> ->排查邏輯 -> 互斥鎖
    • 互斥鎖和遞歸鎖的區別?
      • 互斥鎖:就是在一個線程中不能連續屢次acquire
      • 遞歸鎖:能夠在同一線程中acquire任意次,但同時注意相應的release次數要與acquire次數相同  
    • from multiprocessing/threading import RLock  導入遞歸鎖
    • from multiprocessing/threading import Lock     導入互斥鎖
    • 場景:
      多我的圍着桌子吃一盤面,必須拿到叉子和麪才能吃
      若是一我的拿到了叉子,另外一我的拿到了面,就不能吃,就會致使僵直在一塊兒
      
      
      import time
      from threading import Thread,Lock
      lock = Lock()
      noodle_lock = Lock()
      fork_lock = Lock()
      def eat1(name):
          noodle_lock.acquire()
          print('%s拿到了面' % name)
          fork_lock.acquire()
          print('%s拿到了叉子' % name)
          print('%s在吃麪'%name)
          time.sleep(0.5)
          fork_lock.release()  # 0.01
          noodle_lock.release() # 0.01
      
      def eat2(name):
          fork_lock.acquire()  # 0.01
          print('%s拿到了叉子' % name) # 0.01
          noodle_lock.acquire()
          print('%s拿到了面' % name)
          print('%s在吃麪'%name)
          time.sleep(0.5)
          noodle_lock.release()
          fork_lock.release()
      
      eat_lst = ['alex','wusir','太白','yuan']
      for name in eat_lst:  # 8個子線程 7個線程 3個線程eat1,4個線程eat2
          Thread(target=eat1,args=(name,)).start()
          Thread(target=eat2,args=(name,)).start()
      
      結果:
      china拿到了面
      china拿到了叉子
      china在吃麪
      china拿到了叉子
      usa拿到了面    --------程序開始卡死
      科學家吃麪問題-互斥鎖   【終極緣由是兩把鎖,可能會致使不一樣的人各拿了一把鎖,從而致使程序卡主,發生死鎖現象】
    •  1 # 遞歸鎖解決死鎖問題
       2 import time
       3 from threading import Thread, RLock
       4 
       5 lock = RLock()
       6 
       7 
       8 def eat1(name):
       9     lock.acquire()
      10     print('%s拿到了面' % name)
      11     lock.acquire()
      12     print('%s拿到了叉子' % name)
      13     print('%s在吃麪' % name)
      14     time.sleep(0.5)
      15     lock.release()  # 0.01
      16     lock.release()  # 0.01
      17 
      18 
      19 def eat2(name):
      20     lock.acquire()  # 0.01
      21     print('%s拿到了叉子' % name)  # 0.01
      22     lock.acquire()
      23     print('%s拿到了面' % name)
      24     print('%s在吃麪' % name)
      25     time.sleep(0.5)
      26     lock.release()
      27     lock.release()
      28 
      29 
      30 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen']
      31 for name in eat_lst:  # 8個子線程 7個線程 3個線程eat1,4個線程eat2
      32     Thread(target=eat1, args=(name,)).start()
      33     Thread(target=eat2, args=(name,)).start()
      遞歸鎖解決-同一進程容許屢次require
    •  1 import time
       2 from threading import Thread,Lock
       3 lock = Lock()
       4 def eat1(name):
       5     lock.acquire()
       6     print('%s拿到了面' % name)
       7     print('%s拿到了叉子' % name)
       8     print('%s在吃麪'%name)
       9     time.sleep(0.5)
      10     lock.release() # 0.01
      11 
      12 def eat2(name):
      13     lock.acquire()  # 0.01
      14     print('%s拿到了叉子' % name) # 0.01
      15     print('%s拿到了面' % name)
      16     print('%s在吃麪'%name)
      17     time.sleep(0.5)
      18     lock.release()
      19 
      20 eat_lst = ['china', 'beijing', 'shanghai', 'shenzhen']
      21 for name in eat_lst:  # 8個子線程 7個線程 3個線程eat1,4個線程eat2
      22     Thread(target=eat1,args=(name,)).start()
      23     Thread(target=eat2,args=(name,)).start()
      24 
      25 
      26 》》》:
      27 china拿到了面
      28 china拿到了叉子
      29 china在吃麪
      30 china拿到了叉子
      31 china拿到了面
      32 china在吃麪
      33 beijing拿到了面
      34 beijing拿到了叉子
      35 beijing在吃麪
      36 beijing拿到了叉子
      37 beijing拿到了面
      38 beijing在吃麪
      39 shanghai拿到了面
      40 shanghai拿到了叉子
      41 shanghai在吃麪
      42 shanghai拿到了叉子
      43 shanghai拿到了面
      44 shanghai在吃麪
      45 shenzhen拿到了面
      46 shenzhen拿到了叉子
      47 shenzhen在吃麪
      48 shenzhen拿到了叉子
      49 shenzhen拿到了面
      50 shenzhen在吃麪
      互斥鎖解決-更改設計邏輯,同時拿到叉子和麪
  • 信號量:同進程用法同樣,池的效率高於信號量,不管進程,線程    【鎖 + 計數器】
    •  1 # 進程中建立信號量,與開啓進程池效率對比
       2 # 池效率高於信號量
       3 def ktv1(sem, i):
       4     sem.acquire()
       5     i += 1
       6     sem.release()
       7 
       8 
       9 def ktv2(i):
      10     i += 1
      11 
      12 
      13 # process
      14 if __name__ == '__main__':
      15     sem = Semaphore(5)  # 同時只執行5個任務
      16     start_time = time.time()
      17     p_list = []
      18     for i in range(20):  # 開啓20個任務
      19         p = Process(target=ktv1, args=(sem, i))  # 開啓進程20個
      20         p.start()
      21         p_list.append(p)
      22     for p in p_list: p.join()
      23     print('process_semaphore:', time.time() - start_time)
      24 
      25     pool = Pool(5)  # 開啓進程池,同一時間執行5個任務
      26     start_time1 = time.time()
      27     pool_list = []
      28     for i in range(20):  # 開啓20個任務
      29         ret = pool.apply_async(func=ktv2, args=(i,))  # 異步
      30         pool_list.append(ret)
      31     pool.close()  # 關閉進程池,再也不受理任務
      32     pool.join()  # 等待全部進程池中的任務結束
      33     print('process_pool:', time.time() - start_time1)
      34 '''
      35 process_semaphore: 2.2986388206481934
      36 process_pool: 0.5303816795349121
      37 '''
      38 
      39 ==========================================
      40 # thread 類中沒有線程池
      41 # 但concurrent.futures中有
      42 from threading import Thread,Semaphore,currentThread
      43 from concurrent.futures import ThreadPoolExecutor
      44 def f(sem,i):
      45     sem.acquire()
      46     i += 1
      47     # print("sem",currentThread().getName())  # 獲取線程名
      48     sem.release()
      49 
      50 def f2(i):
      51     i += 1
      52     # print("pool",currentThread().getName())
      53 
      54 start = time.time()
      55 t_sem = Semaphore(5)   # 線程信號量, 同時5個任務
      56 t_list = []
      57 for i in range(20):    # 20個任務,開啓20個線程
      58     t = Thread(target=f, args=(t_sem, i))
      59     t.start()
      60     t_list.append(t)
      61 for t in t_list:t.join()
      62 print("in thread sem:" , time.time() - start)
      63 
      64 start = time.time()
      65 t_pool = ThreadPoolExecutor(5)
      66 
      67 for i in range(20):
      68     ret = t_pool.submit(f2,i)
      69 t_pool.shutdown()
      70 end = time.time()
      71 print("in thread pool:", end- start)
      72 '''
      73 in thread: 0.00498652458190918
      74 in thread pool: 0.001001596450805664
      75 '''
      代碼演練    
  • 事件:event,同進程用法同樣
    •  1 '''事件:event,一個任務依賴另外一個任務的狀態才進行下一步
       2 wait  等待事件內部的信號變成True就不阻塞了
       3 set   將標誌改成True
       4 clear 改爲False
       5 is_set  查看標誌是否爲True
       6 '''
       7 # 數據庫鏈接
       8 import time
       9 import random
      10 from threading import Event,Thread
      11 
      12 
      13 def check(e):
      14     '''檢測是否可以連通數據庫,網絡'''
      15     print('正在檢測兩臺機器之間的網絡狀況……')
      16     time.sleep(random.randint(2,5))
      17     e.set()   # 改爲True,非阻塞
      18 
      19 
      20 def connet_db(e):
      21     print("status:", e.is_set())
      22     e.wait()
      23     print("status:", e.is_set())
      24     print('鏈接數據庫……')
      25     print('鏈接數據庫成功~~~')
      26 
      27 # e = Event()
      28 # Thread(target=connet_db, args=(e,)).start()
      29 # Thread(target=check, args=(e,)).start()
      30 '''
      31 status: False
      32 正在檢測兩臺機器之間的網絡狀況……
      33 status: True
      34 鏈接數據庫……
      35 鏈接數據庫成功~~~
      36 '''
      37 
      38 
      39 def check(e):
      40     '''檢測是否可以連通數據庫,網絡'''
      41     print('正在檢測兩臺機器之間的網絡狀況……')
      42     time.sleep(random.randint(2,5))
      43     e.set()   # 改爲True,非阻塞
      44 
      45 def connet_db(e):
      46     '''3次鏈接不上就退出'''
      47     n = 0
      48     while n < 3:
      49         if e.is_set():
      50             break  # 退出循環,執行鏈接庫
      51         else:
      52             e.wait(timeout=0.5)
      53             n += 1
      54     if n == 3:
      55         raise TimeoutError
      56     print('鏈接數據庫……')
      57     print('鏈接數據庫成功~~~')
      58 
      59 
      60 e = Event()
      61 Thread(target=connet_db, args=(e,)).start()
      62 Thread(target=check, args=(e,)).start()
      63 
      64 '''
      65 正在檢測兩臺機器之間的網絡狀況……
      66 Exception in thread Thread-1:
      67 Traceback (most recent call last):
      68   File "D:\install\Python36\lib\threading.py", line 916, in _bootstrap_inner
      69     self.run()
      70   File "D:\install\Python36\lib\threading.py", line 864, in run
      71     self._target(*self._args, **self._kwargs)
      72   File "D:/install/project/七、併發編程/十一、threading_信號量.py", line 140, in connet_db
      73     raise TimeoutError
      74 TimeoutError
      75 '''
      代碼演練-經過事件來實現檢測網絡的狀態,從而決定是否鏈接數據庫
  • 線程池:threading 中沒有數據池的類實現,數據池是另外一個包concurrent.futures中ThreadPoolExecutor實現,這個包一樣能夠啓用進程池
  • 隊列:數據安全,先進先出,自帶鎖
  •  1 # from multiprocessing import Queue,JoinableQueue  # 進程IPC隊列
     2 from queue import Queue  # 線程隊列  先進先出的
     3 from queue import LifoQueue  # 後進先出的
     4 #方法: put get put_nowait get_nowait full empty qsize
     5 # 隊列Queue
     6     # 先進先出
     7     # 自帶鎖 數據安全
     8 # 棧 LifoQueue
     9     # 後進先出
    10     # 自帶鎖 數據安全
    11 # lq = LifoQueue(5)
    幾種鎖
  • 條件:使得線程等待,只有知足某條件時,才釋放n個線程
    • 1 Python提供的Condition對象提供了對複雜線程同步問題的支持。
      2 Condition被稱爲條件變量,除了提供與Lock相似的acquire和release方法外,還提供了wait和notify方法。
      3 線程首先acquire一個條件變量,而後判斷一些條件。
      4 若是條件不知足則wait;
      5 若是條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
    •  1 from threading import Condition
       2 # acquire
       3 # release
       4 # wait    阻塞
       5 # notify  讓wait解除阻塞的工具
       6 # wait仍是notify在執行這兩個方法的先後 必須執行acquire和release
       7 from threading import Condition,Thread
       8 def func(con,i):
       9     con.acquire()
      10     # 判斷某條件
      11     con.wait()
      12     print('threading : ',i)
      13     con.release()
      14 
      15 con = Condition()
      16 for i in range(20):
      17     Thread(target=func,args=(con,i)).start()
      18 con.acquire()
      19 # 幫助wait的子線程處理某個數據直到知足條件
      20 con.notify_all()
      21 con.release()
      22 while True:
      23     num = int(input('num >>>'))
      24     con.acquire()
      25     con.notify(num)
      26     con.release()
      代碼演練
  • 定時器:阻塞  定時器,指定n秒後執行某個操做
    •  1 from threading import Timer
       2 
       3 
       4 def func():
       5     print('執行我啦')
       6 
       7 
       8 # interval 時間間隔
       9 Timer(0.2, func).start()  # 定時器
      10 # 建立線程的時候,就規定它多久以後去執行
      代碼演練 
  • 內置包:concurrent.futures  高度封裝,管理數據池【可建進程/線程池】!!!

  •  1 #1 介紹
     2 concurrent.futures模塊提供了高度封裝的異步調用接口  3 ThreadPoolExecutor:線程池,提供異步調用  4 ProcessPoolExecutor: 進程池,提供異步調用  5 Both implement the same interface, which is defined by the abstract Executor class.  6 
     7 #2 基本方法
     8 #submit(fn, *args, **kwargs)
     9 異步提交任務 10 
    11 #map(func, *iterables, timeout=None, chunksize=1) 
    12 取代for循環submit的操做 13 
    14 #shutdown(wait=True) 
    15 至關於進程池的pool.close()+pool.join()操做 16 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 17 wait=False,當即返回,並不會等待池內的任務執行完畢 18 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 19 submit和map必須在shutdown以前 20 
    21 #result(timeout=None)
    22 取得結果 23 
    24 #add_done_callback(fn)
    25 回調函數

     

  • 總結:
  • 一、併發:每一個線程共用進程ID(因此共享進程資源),每一個子線程擁有本身的pid,經過get_ident()【需導入類get_ident】,由於GIL的存在,多線程沒法並行
  • 二、數據共享:多線程之間數據共享,可改變主進程全局變量。
  • 三、效率:多線程開閉,切時間開銷遠遠小於多進程
  • 四、守護線程:主線程結束,守護線程結束,(主線程結束的條件是全部非守護線程結束)故,守護線程結束,意味着全部線程都結束了
  • 五、守護進程:主進程代碼結束,守護進程結束,此時非守護進程並不必定都結束了,但主進程會等全部非守護進程結束纔會結束,從而回收資源。
  • 六、爲何還要用GIL:GIL爲cpython解釋器獨有,歷史遺留,但開發python解釋器時可減小不少細粒度的鎖
  • 七、開啓多線程會出現主線程ID(執行整個文件代碼 get_ident()),進程ID,子線程ID
  • 八、線程開啓不用放在 if__name__== ‘__main__’ 下,由於共享同一個進程資源,進程開啓需求,由於每一個進程會另開新的內存空間,同時將主進程變量都導入執行。
  • 九、解決併發三個工具就能解決大部分問題:join()   同步控制,用戶獲取結果,鎖  保證數據安全,池   提升效率,解決併發
  • 十、進程池中的回調函數在主進程運行,線程池中的回調函數在子線程運行。
  • 進程/線程中各類對比*****

    • 進程/線程中的信號量和線程/進程池的區別?
      1. 信號量/進程池都是同一時刻容許固定數量的進程/線程訪問修改數據,實現併發效果。
      2. 信號量是有多少個任務,就開啓多少個進程/線程,進程/線程池是不管任務多少,都只開啓指定數量的進程。
      3. 信號量減小了操做系統進程/線程切換的負擔,但增長了進程/線程開啓,關閉的時間開銷,進程/線程池是進程/線程開啓後並不因任務結束而關閉,而是等待新任務,這樣就減小了進/線程開閉的時間開銷,也減少了操做系統的調度壓力。
      4. 從效率上看,進程池的效率要遠遠高於信號量開啓多進程的效率。
      5. 注:信號量100我的開100個任務,同時只能幹5個任務,進程池100個任務,5我的幹,同時也只能幹5個任務
    • 何時用多進程,何時用多線程?
      1. 開多進程/線程都是爲了實現高併發
      2. 當任務是高併發計算型的任務,爲了充分利用cpu,使用多進程較好
      3. 當任務爲網絡/文件/數據庫等IO操做時,使用多線程較好
      4. 當任務爲既有高併發計算型又有少許IO操做的,能夠啓用多進程和多線程一塊兒較好
    • 進程與線程的區別?
      1. 線程不能獨立存在,必須在一個進程裏
      2. 線程的開閉及切換的時間開銷遠遠小於進程
      3. 同一個進程間的多個線程數據共享,多進程間異步,數據隔離不共享
      4. cpython解釋器中全局解釋器鎖CIL的存在,致使同一進程多個線程同一時刻只能一個線程訪問cpu,從而不能充分利用多核處理器,而多進程就沒有這個限制

4、python中的併發編程-協程

    • 什麼是協程?
    • 是一種用戶態的輕量級線程,實現單線程下的併發,由用戶程序而非操做系統調度控制,在一個線程內的多個任務之間互相切換。
    • python中最簡單的切換以及狀態的保存,使用原生的yield就是協程最基本的體現。【yeild 只有程序之間的切換,沒有重利用任何IO操做的時間
    • 通常狀況下都是經過gevent這個模塊來完成協程的工做,它內部使用的協程機制是greenlnet,它可以幫助咱們規避IO操做,最大限度的利用cpu。
    • 協程比起線程的區別和優點?
    • 區別:
      • 一、線程的切換是由操做系統控制調度的,協程的切換是由用戶級別自行控制調度的。
      • 二、線程開閉,切換有時間開銷,協程切換,開閉的開銷更小,可經過gevent實現自動切換從而無時間開銷(由於協程是在同一線程下進行的,任務之間的切換是用戶級的)
      • 三、線程是在進程下開啓的,協程是在線程下開啓的
    • 優點:
      • 一、單線程內就就能夠實現併發的效果,最大限度的理由CPU【優勢】,協程程序級別的切換開銷更小,操做系統徹底感知不到,於是更加輕量級【優勢】。
      • 二、協程能在一條線程的基礎上,遇到IO在多個任務之間互相切換,節省了線程開啓的開銷,充分的利用了一條線程來提升cpu的工做效率。
      • 三、協程不存在數據不安全的問題(由於協程是遇IO切,同一刻,只有一個任務運行,同步效果)
      • 四、程序不會由於協程中某一個任務進入阻塞狀態而使整條線程阻塞。
    • 缺點:
      • 一、本質是單線程下,沒法利用多核
      • 二、協程一旦出現阻塞,將會阻塞整條線程
    • 特色:
      • 一、必須在只有一個單線程裏實現併發
      • 二、修改共享數據不須要枷鎖
      • 三、用戶程序裏本身保存多個控制流的上下文棧
      • 四、遇到IO就切換,原生協程自能手動切
      • 五、gevent模塊select機制能夠幫助實現監測IO,使一個協程遇到IO操做自動切換到其餘協程
      • 六、yield,greenlet都沒法實現監測IO,自動切換,遇到阻塞仍是在阻塞。 
相關文章
相關標籤/搜索