併發編程

併發編程:

  • 程序:java

    • 程序就是一堆文件
  • 進程:python

    • 進程是分配資源的基本單位,爲線程提供資源,一個程序能夠開啓多個進程linux

  • 進程被誰運行:編程

    • CPU最終運行你的程序json

      • 操做系統調度做用,將你磁盤上的程序加載到內存,而後交給CPU處理,一個CPU在運行的一個程序,就是開啓了一個進程windows

  • 操做系統:

    • 操做系統定義:
      • 操做系統是存在於硬件與軟件之間,管理,協調,控制軟件與硬件的交互設計模式

    • 操做系統的做用:
      • 若是沒有操做系統,去寫一個程序,你要完成兩層功能:瀏覽器

        • 第一層:你要學會底層硬件:CPU,內存,磁盤是如何工做使用的
        • 第二層:去調度這些底層的硬件
      • 操做系統兩個做用:安全

        • 1,將一些複雜的硬件操做封裝成簡單的接口,便於使用

        • 2,操做系統能夠合理的調度分配多個進程與CPU的關係,讓其有序化

    • 操做系統(計算機)的發展史:
      • 第一代電子計算機:操做插線與你的程序結合

      • 第二代計算機:磁帶存儲,批處理系

      • 第三代計算機:集成電路,多道程序系統

知識點解析:

  • 多道技術解決的問題:

    • 多道技術是在不一樣任務間切換執行,因爲計算機切換速度很是快,用戶是無感狀態

    • 時間複用:
      • 利用閒置時間,進行復用,一個進程佔用cpu時間太長也會切換
    • 空間複用:
      • 一個內存能夠加載多個進程,提升內存的利用率
    • 數據隔離:
      • 解決軟件之間的隔離,互不影響

進程:

  • 程序就是一堆代碼

  • 進程是分配資源的基本單位,爲線程提供資源,一個程序能夠開啓多個進程

  • 概念:
    • 串行:全部的進程有CPU一個一個解決
    • 並行:多個CPU,真正的同時運行多個進程
    • 併發:單個CPU,同時執行多個進程(來回切換),看起來像是同時運行,空間複用
    • 阻塞:遇到IO(recv,input)纔會阻塞
    • 非阻塞:沒有IO
  • tail -f access.log |grep '404'
    執行程序tail,開啓一個子進程,執行程序grep,開啓另一個子進程,兩個進程之間基於管道'|'通信,將tail的結果做爲grep的輸入。
    進程grep在等待輸入(即I/O)時的狀態稱爲阻塞,此時grep命令都沒法運行

  • 進程的建立:
    • 什麼是開啓多個進程:socket:server,client兩個進程

    • python中,若是一次想開啓多個進程,必須是一個主進程,開啓多個子進程

    • linux,windows:有主進程開啓子進程

    • 相同點:主進程開啓子進程,兩個進程都有相互隔離的獨立空間,互不影響

    • 不一樣點:

      • linux:子進程空間的初始數據徹底是從主(父)進程copy一份

      • windows:子進程空間初始數據徹底是從主(父)進程copy一份,可是有所不一樣

建立進程的兩種方法:

  • 函數-建立進程:

  • #這樣的實例雖然建立了子進程,可是在生產環境中子進程結束的時間不定
    from multiprocessing import Process
    import time
    #當前py文件就是主進程,先運行主進程
    def task(name):
        print(f"{name}is running")
        time.sleep(3)  #阻塞
        print(f"{name}is done")
    
    if __name__ == '__main__':                  #windows開啓必須寫在mian下面
         p = Process(target=task,args=("海洋",)) #target要封裝的內容,對象args必定是個元祖形式
         p.start()      #子進程 通知操做系統在內存中開闢一個空間,將p這個進程放進去,讓cpu執行
         print("___主進程")

  • 類-建立進程(瞭解):

  • from multiprocessing import Process
    import time
    class MyProcess(Process):
    
        def __init__(self,name):
            super().__init__()      #放在最上面,必需要繼承父類init
            self.name = name
    
        def run(self):
            print(f"{self.name}is running")
            time.sleep(3)  #阻塞
            print(f"{self.name}is done")
    
    if __name__ == '__main__':
        p = MyProcess("海洋")
        p.start()
        print("====主進程")

進程PID:

  • tasklist | findstr pycharm win查看某個進程

  • import os print(os.getpid()) 查看當前的pid

  • import os print(os.getppid()) 查看父進程

進程之間數據隔離:

  • import time
    from  multiprocessing import  Process
    X = 1000
    
    def task():
        global x
        x = 2
    
    if __name__ == '__main__':
        p1 = Process(target = task,)
        p1.start()
        time.sleep(1)
        print(f"主進程{X}")
        print(f"主進程{X}")
    
    import time
    from  multiprocessing import  Process
    X = 256                                #知足小數據池
    
    def task():
       print(f"子進程{id(X)}")
    
    if __name__ == '__main__':
        print(f"主進程{id(X)}")
        p1 = Process(target = task,)
        p1.start()
        time.sleep(1)
        print()

join方法:

  • join 主進程等待子進程結束以後,在執行

  • join開啓一個進程:

    • from multiprocessing import Process
      import time
      
      def task(name):
          time.sleep(1)
          print(f"{name}is running")
      
      if __name__ == '__main__':
           p = Process(target=task,args=("海洋",))
           p.start()
           p.join()           #告知主進程,p進程結束以後,主進程在結束,join有些阻塞的意思
           print("___主進程")
      
      #      p1.start()
      #      p2.start()       #p1,p2,p3三個子進程前後運行順序不定,start只是通知一下操做系統
      #      p3.start()       #操做系統調用cpu先運行誰,誰先執行
  • join串行:

    • from multiprocessing import Process
      import time
      
      def task(name,sec):
          time.sleep(sec)
          print(f"{name}is running")
      
      if __name__ == '__main__':
           p1 = Process(target=task, args=("海洋",1))
           p2 = Process(target=task, args=("俊麗",2))
           p3 = Process(target=task ,args=("寶寶",3))
           start_time = time.time()
      
           p1.start()
           p1.join()
           p2.start()
           p2.join()
           p3.start()
           p3.join()
      
           print(f"主進程{time.time() - start_time}")
  • join併發:

    • from multiprocessing import Process
      import time
      
      def task(sec):
          time.sleep(sec)
          print(f"is running")
      
      if __name__ == '__main__':
           start_time = time.time()
           list = []
      
           for i in range(1,4):
                p = Process(target=task, args=(i,))
                p.start()
                list.append(p)
      
           for i in list:
                i.join()
      
           print(f"主進程{time.time() - start_time}")

進程對象的其餘屬性:

  • 屬性:

    • from multiprocessing import Process
      import time
      
      def task(name):
          print(f"{name}is running")
          time.sleep(3)
          print(f"{name}is done")
      
      if __name__ == '__main__':
           p = Process(target=task,args=("海洋",),name="俊麗")  #name給進程對象設置name屬性
           p.start()
           # print(p.pid)         #獲取到進程號
      
           time.sleep(1)          #睡一秒,子進程已經執行完成
           p.terminate()          #強制結束子進程,強制執行也會有執行時間
                                  #terminate跟start同樣工做原理,都要通知操做系統開啓子進程
                                  #內存終止或者開啓都要須要時間的
      
           time.sleep(1)          #睡一秒,讓terminate殺死
           print(p.is_alive())    #判斷子進程是否存活,只是查看內存中p子進程是否還運行
           print("主進程")
  • 殭屍進程:

    • init是全部進程的父進程:
      
      殭屍進程,殭屍是什麼,死而沒有消失
      
      主進程建立多個短暫週期的子進程,當子進程退出,是須要等待父進程處理,而父進程沒有及時對子進程回收,那麼子進程的進程符仍然保存在系統中,這種進程就是僵死進程
      
      什麼進程描述符:每個進程都有描述符,io請求,數據指針
      
      from multiprocessing import Process
      import time
      import os
      
      def task(name):
          print(f"{name}is running")
          print(f"子進程開始了:{os.getpid()}")
          time.sleep(50)
      
      
      if __name__ == '__main__':
          for i in range(100):
              p = Process(target=task, args=("海洋",))
              p.start()
              print(f"___主進程:{os.getpid()}")

  • 孤兒進程:

    • 孤兒進程:孤兒進程是由於主進程的退出,他下面的全部子進程都變成孤兒進程了,init會對孤兒進行回收,釋        放掉佔用系統的資源,這種回收也是爲了節省內存。
      
      孤兒進程無害,若是殭屍進程掛了,init會對孤兒進程回收,init是全部進程的祖進程,linux中爲1,0系統
  • 守護進程:

    • 將一個子進程設置成守護進程,當父進程結束,子進程必定會結束,避免孤兒進程產生,應爲回收機制

    • 父進程不能建立子進程

    • #守護進程會在主進程代碼執行結束後終止,守護進程內沒法在開啓子進程
      
      from multiprocessing import Process
      import time
      import os
      
      def task(name):
          print(f"{name}is running")
          print(f"子進程開始了:{os.getpid()}")
          time.sleep(50)
      
      if __name__ == '__main__':
           p = Process(target=task,args=("海洋",))
           p.daemon = True  #將p子進程設置成守護進程,守護子進程,只要主進程結束
                            #子進程不管執行與否都立刻結束,daemon,開啓在start上面
           p.start()
           print(f"___主進程:{os.getpid()}")

進程之間的通訊方式:

  • 第一種:基於文件+鎖的形式:效率低,麻煩

  • 第二種:基於隊列,推薦的使用形式

  • 第三種:基於管道,管道本身加鎖,底層可能會出現數據丟失損壞,隊列和管道都是將數據存放於內存中

互斥鎖:

  • 互斥鎖保證了每次只有一個線程進行寫入操做,只有當這個線程解鎖,在運行其餘資源,上鎖和解鎖都須要本身添加

  • 三臺電腦同時調用打印機去打印,開啓三個進程使用互斥鎖,實現公平搶佔資源

    • #上鎖:
      #必定要是同一把鎖:只能按照這個規律,上鎖一次,解鎖一次
      
      #互斥鎖與join區別:
      #共同點:都是完成了進程之間的串行
      #區別:join認爲控制進程的串行,互斥鎖是解決搶佔的資源,保證公平性
      
      from multiprocessing import Process
      from multiprocessing import Lock
      import time
      import os
      import random
      
      def task1(lock):
          print("test1")                     #驗證CPU遇到IO切換
          lock.acquire()
          print("task1 開始打印")
          time.sleep(random.randint(1,3))
          print("task1 打印完成")
          lock.release()
      
      def task2(lock):
          print("test2")
          lock.acquire()                      #上鎖
          print("task2 開始打印")
          time.sleep(random.randint(1,3))#阻塞,cpu切換任務,別的任務都在鎖,回來繼續執行這個程序
          print("task2 打印完成")
          lock.release()                      #解鎖
      
      def task3(lock):
          print("test2")
          lock.acquire()
          # lock.acquire()                    #死鎖錯誤示例
          print("task3 開始打印")
          time.sleep(random.randint(1,3))
          print("task3 打印完成")
          lock.release()
      
      if __name__ == '__main__':
           lock = Lock()                              #一把鎖
      
           p1 = Process(target=task1,args=(lock,))    #三個進程哪一個先到先執行
           p2 = Process(target=task2,args=(lock,))
           p3 = Process(target=task3,args=(lock,))
      
           p1.start()
           p2.start()
           p3.start()
  • 互斥鎖買票示例:

    • #買票系統:
      #買票以前先要查票,在你查票的同時,100我的也在查看此票
      #買票時,你要從服務端獲取到票數,票數>0 ,買票,而後服務端票數減一,中間有網絡延遲
      
      #多進程原則上是不能互相通訊的,他們在內存級別是有數據隔離,不表明磁盤上的數據隔離,他們能夠共同操做一個文件
      #多個進程搶佔同一個資源,要想公平按照順序,只能串行
      
      from multiprocessing import Process
      from multiprocessing import Lock
      import random
      import json
      import time
      import os
      
      def search():
          time.sleep(random.random())  #一秒以內
          with open("db.json", encoding="utf-8") as f1:
              dic = json.load(f1)
          print(f"剩餘票數{dic['count']}")
      
      def get():
          with open("db.json",encoding="utf-8") as f1:
              dic = json.load(f1)
          time.sleep(random.randint(1,3))  #時間延遲
      
          if dic['count'] > 0:
              dic['count'] -= 1
              with open("db.json",encoding="utf-8",mode="w") as f1:
                  json.dump(dic,f1)
              print(f'{os.getpid()}用戶購買成功')
          else:
              print("沒票了")
      
      def task(lock):
          search()
      
          lock.acquire()
          get()
          lock.release()
      
      if __name__ == '__main__':
          lock = Lock()
          for i in range(5):
              p = Process(target=task,args=(lock,))
              p.start()
      缺點:
          1.操做文件效率低
          2.本身加鎖很麻煩,很容易出現死鎖,遞歸鎖

隊列:

  • 進程之間的通訊最好的方式是基於隊列

  • 隊列是實現進程之間通訊的工具,存在內存中的一個容器,最大的特色是符合先進先出的原則

  • 隊列模式:
    • 多個進程搶佔一個資源:串行,有序以及數據安全,買票

    • 多個進程實現併發的效果:生產者消費模型

隊列參數:

  • from multiprocessing import Queue
    q = Queue(3)                #能夠設置元素個數,當數據已經達到上限,在插入夯住
    
    def func():
        print("in func")
    
    q.put("海洋")               #插入數據
    q.put({"count":1})
    q.put(func)
    q.put("333",block=False)    #默認爲True 當你插入的數據超過最大限度,默認阻塞
    # q.put(333,timeout=8)      #超過八秒在put不進數據,就會報錯
    
    print(q.get())
    print(q.get())
    ret = q.get()
    ret()
    # q.get()  #當你將數據取完,夯住,等待隊列put值,起另外一個進程往隊列中插入數據
    
    #q.put()
    #1,maxsize()    #數據量不易過大,精簡的重要數據
    #2,put bolck    #默認爲True阻塞 當你插入的數據超過最大限度,能夠設置報錯
    #3,put timeout  #延時報錯,超過三秒在put不進數據,就會報錯
    
    #get
    #2,get bolck    #取值爲空報錯
    #3,get timeout  #取值超過三秒報錯

搶售模型 (並行示例):

  • #小米:搶手機,預期發售10個,100人去搶
    from multiprocessing import Queue
    from multiprocessing import Process
    import os
    
    def task(q):
        try:
            q.put(f'{os.getpid()}')
        except Exception:
            return
    
    if __name__ == '__main__':
        q = Queue(10)             #建立隊列,能夠存放十我的
    
        for i in range(100):
            p = Process(target=task,args=(q,))
            p.start()
    
        for i in range(1,11):  #數量超過隊列會取
            print(f'排名第{i}的用戶:{q.get()}') #獲取隊列中的信息,先進來的先取出來
    
    #利用隊列進行進程之間的通訊:簡單,方便,不用本身手動加鎖,隊列自帶阻塞,可持續化取數據

生產者消費者模型(併發示例):

  • 利用隊列進行通訊,生產者生產數據,消費者獲取數據使用,平衡了生產力和消費力,生產者和消費者是一種解耦合性(經過容器解決),可持續化取數據

  • 模型,設計模式,歸一化設計,理論等等,教給你一個編程的思路,之後遇到相似的狀況,之後直接調用就便可

  • 生產者:生產數據的進程

  • 消費者:生產出來的數據進行處理

  • #吃包子:廚師生產包子,不可能直接給你喂到嘴裏,放在一個盆裏,消費者從盆中取出包子食用
    #三個主體:生產者(廚師),容器隊列(盤 緩衝區),消費者(人)
    
    #若是沒有容器,生產者與消費者強解耦性,不合理,因此咱們要有一個容器,緩衝區平衡了生產力與消費力
    
    # 生產者消費者多應用於併發:
    from multiprocessing import Queue
    from multiprocessing import Process
    import time
    import random
    
    def producer(name,q):
        for i in range(1,6):
            time.sleep(random.randint(1,3))
            res = f'{i}號包子'
            q.put(res)
            print(f'生產者{name}:生產了{res}')
    
    def consumer(name,q):
        while 1:
            try:
                time.sleep(random.randint(1, 3))
                ret = q.get(timeout = 5)           #五秒還吃不到退出
                print(f'消費者{name}:吃了{ret}')
            except Exception:
                return
    
    if __name__ == '__main__':
        q = Queue()    #盆
    
        p1 = Process(target=producer,args=("俊麗",q,))  #生產
        p2 = Process(target=consumer,args=("海洋",q,))  #消費
    
        p1.start()
        p2.start()

線程:

  • 進程:進程是分配資源的基本單位,內存中開闢空間,爲線程提供資源,一個程序能夠開啓多個進程

  • 線程:CPU調度的最小單位,執行單位,線程也被稱做爲輕量級的進程,動態的

    • 主線程是進程空間存活在內存中的一個必要條件

  • 開啓QQ:開啓一個進程,在內存中開闢空間加載數據,啓動一個線程執行代碼

  • 線程依賴進程的一個進程能夠包含多個線程,可是必定有一個主線程,線程纔是CPU執行的最小單元

  • 進程線程對比:

    • 1,開啓多進程開銷很是大,10-100倍,而開啓線程開銷很是小

    • 2.開啓多進程速度慢,開啓多線程速度快

    • 3.進程之間數據不共享,線程共享數據

  • 多線程應用場景:

    • 併發:一個CPU能夠來回切換(線程之間切換),多進程併發,多線程的併發

    • 多進程併發:開啓多個進程,併發的執行

    • 多線程併發:開啓線程,併發的執行

    • 若是遇到併發:多線程居多

開啓線程的兩種方式:

  • 線程絕對要比進程開啓速度快

  • 函數開啓:
    • #先打印海洋,線程要比進程速度快,若是是進程先打印主線程
      from threading import Thread
      
      def task(name):
          print(f'{name} is running')
      
      if __name__ == '__main__':
          t = Thread(target=task,args=("海洋",))
          t.start()
          print("主線程")
      
      #子進程睡眠3秒,先運行主進程
      from threading import Thread
      import time
      x = 1000
      
      def task():
          time.sleep(3)
          print('子線程....')
      
      def main():
          print('111')
          print('222')
          print('333')
      
      if __name__ == '__main__':
          t = Thread(target=task)
          t.start()
          main()
  • 類開啓:
    • from threading import Thread
      
      class MyThread(Thread):
          def __init__(self,name):
              super().__init__()
              self.name = name
      
          def run(self):
              print(f'{self.name} is running')
      
      if __name__ == '__main__':
          t = MyThread("海洋")
          t.start()
          print("主線程")
  • 線程pid:
    • #主線程和子線程pid同樣
      from threading import Thread
      import os
      
      def task():
          print(f'子線程:{os.getpid()}')
      
      if __name__ == '__main__':
          t = Thread(target=task,)
          t.start()
          print(f"主線程:{os.getpid()}")
  • 線程之間數據共享:
    • from threading import Thread
      x = 1000
      def task():
          global x
          x = 0
      
      if __name__ == '__main__':
          t = Thread(target=task, )
          t.start()
          t.join()  # 告知主線程,等待子線程運行完畢在執行
          print(f'主線程:{x}')

線程的方法:

  • from threading import Thread
    import threading
    import time
    
    def task(name):
        time.sleep(1)
        print(f'{name} is running')
    
    if __name__ == '__main__':
        for i in range(5):
            t = Thread(target=task,args=("海洋",))
            t.start()              #線程對象的方法
        # print(t.is_alive())     #判斷線程是否存活
    
        #threading模塊的方法
        print(threading.current_thread().name)  #返回線程對象.name
        print(threading.enumerate())            #返回列表,返回的是全部線程對象
        print(threading.active_count())         #獲取活躍的線程數量(包括主線程)
        print("主線程")

守護線程:

  • 守護線程必須等待主線程結束才結束,主線程必須等待全部的非守護線程結束才能結束,由於主線程的結束意味着進程的結束,這就是一個守護機制

  • 多線程是同一個空間,同一個進程,進程表明,空間,資源,靜態的:

  • from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.setDaemon(True) #必須在t.start()以前設置
        t.start()
    
        print('主線程')
        print(t.is_alive())   #判斷進程是否存在也是主線程
    
    from threading import Thread
    import time
    
    def foo():
        print(123)
        time.sleep(3)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(1)
        print("end456")
    
    if __name__ == '__main__':
    
        t1=Thread(target=foo)
        t2=Thread(target=bar)
    
        t1.daemon = True
        t1.start()
        t2.start()              #t2非守護線程,主線程等待子線程結束
        print("main-------")

線程互斥鎖:

  • join:
  • from threading import Thread
    import time
    x = 100
    
    def task(name):
        global x
        temp = x
        time.sleep(3)
        temp -= 1
        x = temp
    
    if __name__ == '__main__':
        t = Thread(target=task,args=("海洋",))
        t.start()
        t.join()
        print(f"主線程{x}")
    
    #多個線程搶佔一個資源
    from threading import Thread
    import time
    x = 100
    
    def task(name):
        global x
        temp = x
        time.sleep(3)
        temp -= 1
        x = temp
    
    if __name__ == '__main__':
        tl = []
        for i in range(100):
            t = Thread(target=task,args=("海洋",))
            tl.append(t)
            t.start()
    
        for i in tl:
            i.join()
    
        print(f"主進程{x}")   #多個線程搶佔一個資源

互斥鎖:

  • 全部線程串行執行,多個 線程共同搶佔一個數據,保證了數據安全:

  • from threading import Thread
    from threading import Lock
    import time
    x = 100
    
    def task(lock):
        lock.acquire()
        global x
        temp = x
        time.sleep(0.1)
        temp -= 1
        x = temp
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        tl = []
        for i in range(100):
            t = Thread(target=task,args=(lock,))
            tl.append(t)
            t.start()
    
        for i in tl:
            i.join()
    
        print(f"主線程{x}")   #多個線程搶佔一個資源,join讓主線程等待子線程執行完成在執行,結果0

線程死鎖現象:

  • 多個線程或者進程競爭資源,若是開啓的互斥鎖過多,遇到互相搶鎖形成互相等待狀況,程序夯住,

  • 還有一種是給同時給一個線程或者進程連續加鎖屢次,利用遞歸鎖解決Rlock

  • from threading import Thread
    from threading import Lock
    import time
    
    lock_A = Lock()
    lock_B = Lock()
    
    class Mtthread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
    
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
            lock_B.release()
    
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
    
            time.sleep(1)
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
            lock_A.release()
    
            lock_B.release()
    
    if __name__ == '__main__':
        t1 = Mtthread()
        t1.start()
    
        t2 = Mtthread()
        t2.start()
    
        t3 = Mtthread()
        t3.start()
        print(f"主進程")

遞歸鎖:

  • 遞歸鎖上有引用次數,每次引用計數+1,解鎖計數-1,只有計數爲0.在運行下個進程

  • #遞歸鎖:
    #遞歸鎖是一把鎖,鎖上有記錄,只要acquire一次,鎖上就計數一次,acquire2次就計數兩次
    #release 1次減一,只要遞歸鎖計數不爲0,其餘線程不能搶
    
    from threading import Thread
    from threading import RLock
    import time
    
    lock_A = lock_B = RLock()
    
    class Mtthread(Thread):
        def run(self):
            # lock_A.acquire()
            # lock_B.acquire()
            # print(111)
            # lock_A.release()
            # lock_B.release()
    
            self.f1()
            self.f2()
    
        def f1(self):
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
    
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
            lock_B.release()
    
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
    
            time.sleep(1)
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
            lock_A.release()
    
            lock_B.release()
    
    if __name__ == '__main__':
        t1 = Mtthread()
        t1.start()
        t2 = Mtthread()
        t2.start()
        t3 = Mtthread()
        t3.start()
        print(f"主進程")

信號量:

  • 信號量准許多個線程或者進程同時進入

  • from  threading import Thread
    from  threading import current_thread
    from  threading import Semaphore
    import time
    import random
    
    sm = Semaphore(4)
    
    def chi():
        sm.acquire()
        print(f"{current_thread().name}正在吃飯")
        time.sleep(random.randint(1,3))
        sm.release()
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=chi)
            t.start()

GIL鎖:

  • 全局解釋器鎖,就是一把互斥鎖,將併發變成串行,同一時刻只能有一個線程進入解釋器,自動加鎖和釋放鎖,犧牲效率保護python解釋器內部數據安全

  • 優勢:
    • 強行加鎖,保證解釋器裏面的數據安全

  • 缺點:
    • 多進程能夠利用多核,多進程的每一個進程裏面都有python解釋器程序

    • 單進程的多線程不能利用多核,python解釋器內部程序,不支持多線程同時解釋

  • 討論:
    • python-單核處理IO阻塞的多線程,java多核處理IO阻塞問題,效率差很少

    • 單核處理三個IO線程,多核處理三個IO線程,多核快些

  • 代碼的執行:
    • CPython獨有GIL鎖:
      • 將你的py文件當作實參傳送給解釋器傳換成c語言字節碼,在交給虛擬機轉換成010101機器碼,這些代碼都是線程執行,進程進行調度資源

    • lpython:交互式解釋器,能夠補全代碼

    • Jpython:java語言字節碼,剩下的同樣

    • pypy:動態編譯,JAT技術,執行效率要比Cpython塊,可是技術還有缺陷bug

驗證Python開發效率:

  • 單核CPU:
    • 一核,都是單進程多線程併發快,由於單核開啓多進程也是串行。

  • 多核CPU:
    • 計算密集型:

      • 多進程的並行比多線程的併發執行效率高不少(由於不一樣進程運行在不一樣核心上,並行執行)

    • IO密集型:

      • 多線程要比多進程處理速度快,由於進程開銷大,而線程處理其實也是串行,只不過處理速度比進程更快些,線程一次只能處理一個事情(空間複用)

      • 開啓150個進程(開銷大,速度慢),執行IO任務耗時長

      • 開啓150個線程(開銷小,速度快),執行IO任務耗時短

  • 若是你的任務是io密集型而且任務數量大,用單進程下的多線程處理阻塞效率高

  • 計算密集型:
    • from multiprocessing import Process
      from threading import Thread
      import time
      import os
      # print(os.cpu_count())
      
      def task1():
          res = 1
          for i in range(1, 100000000):
              res += i
      def task2():
          res = 1
          for i in range(1, 100000000):
              res += i
      def task3():
          res = 1
          for i in range(1, 100000000):
              res += i
      def task4():
          res = 1
          for i in range(1, 100000000):
              res += i
      
      if __name__ == '__main__':
          # 四個進程 四個cpu 並行 效率
          start_time = time.time()
          p1 = Process(target=task1)
          p2 = Process(target=task2)
          p3 = Process(target=task3)
          p4 = Process(target=task4)
      
          p1.start()
          p2.start()
          p3.start()
          p4.start()
      
          p1.join()
          p2.join()
          p3.join()
          p4.join()
          print(f'主: {time.time() - start_time}')   # 10.125909328460693
      
      # 一個進程 四個線程
      #     start_time = time.time()
      #     p1 = Thread(target=task1)
      #     p2 = Thread(target=task2)
      #     p3 = Thread(target=task3)
      #     p4 = Thread(target=task4)
      #
      #     p1.start()
      #     p2.start()
      #     p3.start()
      #     p4.start()
      #
      #     p1.join()
      #     p2.join()
      #     p3.join()
      #     p4.join()
      #     print(f'主: {time.time() - start_time}')  # 22.927688121795654
  • 計算IO密集型:

    • from multiprocessing import Process
      from threading import Thread
      import time
      import os
      # print(os.cpu_count())
      
      def task1():
          res = 1
          time.sleep(3)
      
      if __name__ == '__main__':
      # 開啓150個進程(開銷大,速度慢),執行IO任務, 耗時 8.382229089736938
      #     start_time = time.time()
      #     l1 = []
      #     for i in range(150):
      #         p = Process(target=task1)
      #         l1.append(p)
      #         p.start()
      #     for i in l1:
      #         i.join()
      #     print(f'主: {time.time() - start_time}')
      
      # 開啓150個線程(開銷小,速度快),執行IO任務, 耗時 3.0261728763580322
      #     start_time = time.time()
      #     l1 = []
      #     for i in range(150):
      #         p = Thread(target=task1)
      #         l1.append(p)
      #         p.start()
      #     for i in l1:
      #         i.join()
      #     print(f'主: {time.time() - start_time}')

GIL鎖和互斥鎖關係:

  • 線程計算密集型:
    • 當程序執行,開啓100個線程時,第一個線程先要拿到GIL鎖,而後拿到lock鎖,執行代碼,釋放lock鎖,最後釋放GIL鎖
  • 線程IO密集型:
    • 當程序執行,開啓100個線程時,第一個線程先要拿到GIL鎖,而後拿到lock鎖,遇到阻塞,CPU切走,GIL釋放,第一個線程掛起

    • 第二個線程執行,搶到GIL鎖,進入要搶lock,可是lock鎖還沒釋放,阻塞掛起

  • 本身加互斥鎖,必定要加在處理共享數據的地方,加的範圍不要擴大,範圍過大,影響併發

  • GIL鎖單進程的多線程不能利用多核,不能並行,可是能夠併發

  • 互斥鎖:
    • GIL自動上鎖解鎖,文件中的互斥鎖Lock,手動上鎖解鎖

    • GIL鎖,保護解釋器的數據安全,互斥鎖是保護的文件的數據安全

線程池:

  • 線程池在系統啓動時建立了大量的空閒線程,線程執行直接調用線程池中已經開啓好的空閒線程,當線程執行結束,該線程不會死亡,而是將線程變成空閒狀態,放回進程池。

  • 線程池提升效率,資源複用

  • 進程池:放置進程的一個容器

  • 線程池:放置線程的一個容器

  • 完成一個簡單的socket通訊,服務端必須與一個客戶端交流完畢,而且這個客戶端斷開鏈接以後,服務端才能接待下一個客戶:

  • #開啓進程池或者線程池:
    #線程池好仍是進程池好:io阻塞或者計算密集型
    from  concurrent.futures import ProcessPoolExecutor
    from  concurrent.futures import ThreadPoolExecutor
    import time
    import os
    import random
    
    def task(name):
        # print(name)
        print(f"{os.getpid()}準備接客")
        time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        # p = ProcessPoolExecutor(max_workers=5)  #限制進程數量,默認爲cpu個數
        p = ThreadPoolExecutor()                    #線程默認是CPU個數的五倍
    
        for i in range(23):
            p.submit(task,1)                      #給進程池放置任務啓動,1爲傳參

阻塞,非阻塞:

  • 程序運行中的狀態,阻塞,運行,就緒

  • 阻塞:當你程序遇到IO阻塞掛起,CPU切換,等到IO結束以後再執行

  • 非阻塞:程序沒有IO,或者遇到IO經過某種手段讓cpu去執行其餘任務,儘量的佔用CPU

同步:

  • 任務發出去以後等待,直到這個任務最終結束以後,給我一個返回值,發佈下一個任務

  • 同步示例:
  • from concurrent.futures import ProcessPoolExecutor
    import os
    import time
    import random
    
    def task():
        print(f"{os.getpid()}is running")
        time.sleep(1)
        return f'{os.getpid()} is finish'
    
    if __name__ == '__main__':
        p = ProcessPoolExecutor(4)
    
        for i in range(10):
            obj = p.submit(task,)
            print(obj.result())      #同步等待一個進程內容所有執行完成在執行下一個

異步:

  • 將任務發給進程,無論任務如何,直接運行下一個

  • 異步示例:
  • from concurrent.futures import ProcessPoolExecutor
    import os
    import time
    import random
    
    def task():
        print(f'{os.getpid()} is running')
        time.sleep(random.randint(0,2))
        return f'{os.getpid()} is finish'
    
    if __name__ == '__main__':
        p = ProcessPoolExecutor(4)
        obj_l1 = []
        for i in range(10):
            obj = p.submit(task,)   # 異步發出.
            obj_l1.append(obj)
    
        # time.sleep(3)
        p.shutdown(wait=True)
        # 1. 阻止在向進程池投放新任務,
        # 2. wait = True 十個任務是10,一個任務完成了-1,直至爲零.進行下一行.
        for i in obj_l1:
            print(i.result())

異步+回調機制:

  • 異步發佈任務,就無論任務結果
  • 回調:
    • 回調是你異步發佈任務執行完成後,將結果丟給回調函數add_done_callback,回調函數幫你分析結果,進程繼續完成下一個任務
    • 回調就是對特定的事件或者條件進行響應

  • 爬蟲:遊覽器作的事情很簡單:
    • 瀏覽器 封裝頭部,發送一個請求--->www.taobao.com ----> 服務器獲取到請求信息,分析正確--->給你返回一個文件,--->遊覽器將這個文件的代碼渲染,就成了你看的樣子

    • 爬蟲:利用reauests模塊功能模擬遊覽器封裝頭,給服務器發送一個請求,騙過服務器以後,服務器也會給你返回一個文件,爬蟲拿到文件,進行數據清洗獲取到你想要的信息

  • 爬蟲分兩步:
    • 第一步:爬取服務器端的文件(IO阻塞)

    • 第二部:拿到文件,進行數據分析(非IO,IO極少)

  • 錯誤版本示例:
    • import requests
      from concurrent.futures import ProcessPoolExecutor
      from multiprocessing import Process
      import time
      import random
      import os
      
      def get(url):
          response = requests.get(url)
          print(f'{os.getpid()} 正在爬取:{url}')
          time.sleep(random.randint(1,3))
          if response.status_code == 200:
              return response.text
      
      def parse(text):
          print(f'{os.getpid()} 分析結果:{len(text)}')
      
      if __name__ == '__main__':
          url_list = [
              'http://www.taobao.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.baidu.com',
              'https://www.cnblogs.com/jin-xin/articles/11232151.html',
              'https://www.cnblogs.com/jin-xin/articles/10078845.html',
              'http://www.sina.com.cn',
              'https://www.sohu.com',
              'https://www.youku.com',
          ]
          pool = ProcessPoolExecutor(4)
          obj_list = []
          for url in url_list:
              obj = pool.submit(get, url)
              obj_list.append(obj)
      
          pool.shutdown(wait=True)
      
          for obj in obj_list:          #抓取網頁是串行,輸出的結果
              parse(obj.result())
      
      #爬取一個網頁須要2s,併發爬取10個網頁:2.多s.
      #分析任務: 1s.    10s. 總共12.多秒.
      
      # 如今這個版本的過程:
      # 異步發出10個爬取網頁的任務,而後4個進程併發(並行)的先去完成4個爬取網頁的任務,而後誰先結束,誰進行下一個
      # 爬取任務,直至10個任務所有爬取成功.
      # 將10個爬取結果放在一個列表中,串行的分析.
      
      
      import requests
      from concurrent.futures import ProcessPoolExecutor
      from multiprocessing import Process
      import time
      import random
      import os
      
      def get(url):
          response = requests.get(url)
          print(f'{os.getpid()} 正在爬取:{url}')
          time.sleep(random.randint(1,3))
          if response.status_code == 200:
              parse(response.text)
      
      def parse(text):
          print(f'{os.getpid()} 分析結果:{len(text)}')
      
      if __name__ == '__main__':
          url_list = [
              'http://www.taobao.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.baidu.com',
              'https://www.cnblogs.com/jin-xin/articles/11232151.html',
              'https://www.cnblogs.com/jin-xin/articles/10078845.html',
              'http://www.sina.com.cn',
              'https://www.sohu.com',
              'https://www.youku.com',
          ]
          pool = ProcessPoolExecutor(4)
          for url in url_list:
              obj = pool.submit(get, url)
      
          # pool.shutdown(wait=True)
          print('主')
      #異步發出10個 爬取網頁+分析 的任務,而後4個進程併發(並行)的先去完成4個爬取網頁+分析 的任務,
      #而後誰先結束,誰進行下一個 爬取+分析 任務,直至10個爬取+分析 任務所有完成成功.
  • 正確版本示例:
    • import requests
      from concurrent.futures import ProcessPoolExecutor
      from multiprocessing import Process
      import time
      import random
      import os
      
      def get(url):
          response = requests.get(url)
          print(f'{os.getpid()} 正在爬取:{url}')
          if response.status_code == 200:
              return response.text
      
      def parse(obj):
          time.sleep(1)
          print(f'{os.getpid()} 分析結果:{len(obj.result())}')
      
      if __name__ == '__main__':
      
          url_list = [
              'http://www.taobao.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.baidu.com',
              'https://www.cnblogs.com/jin-xin/articles/11232151.html',
              'https://www.cnblogs.com/jin-xin/articles/10078845.html',
              'http://www.sina.com.cn',
              'https://www.sohu.com',
              'https://www.youku.com',
          ]
          start_time = time.time()
          pool = ProcessPoolExecutor(4)
          for url in url_list:
              obj = pool.submit(get, url)
              obj.add_done_callback(parse)
              # 增長一個回調函數
              # 如今的進程完成的仍是網絡爬取的任務,拿到了返回值以後,結果丟給回調函數add_done_callback,
              # 回調函數幫助你分析結果
              # 進程繼續完成下一個任務.
          pool.shutdown(wait=True)   #阻止發佈新的任務,代替join
      
          print(f'主: {time.time() - start_time}')
      
      # 回調函數是主進程幫助你實現的, 回調函數幫你進行分析任務. 明確了進程的任務: 只有一個網絡爬取.
      # 分析任務: 回調函數執行了.對函數之間解耦.
      
      # 極值狀況: 若是回調函數是IO任務,那麼因爲你的回調函數是主進程作的,因此有可能影響效率.
      
      # 回調不是萬能的,若是回調的任務是IO,
      # 那麼異步 + 回調機制 很差.此時若是你要效率只能犧牲開銷,再開一個線程進程池.

隊列模式:

  • FIFO 先進先出原則:
    • import queue
      q = queue.Queue(3)
      q.put(1)
      q.put(2)
      q.put('海洋')
      
      print(q.get())
      print(q.get())
      print(q.get())
  • LIFO 棧.-先進後出:
    • import queue
      q = queue.LifoQueue()
      q.put(1)
      q.put(3)
      q.put('海洋')
      
      print(q.get())
      print(q.get())
      print(q.get())
  • 優先級隊列:
    • # 須要元組的形式,(int,數據) int 表明優先級,數字越低,優先級越高.
      import queue
      q = queue.PriorityQueue(3)
      
      q.put((10, '垃圾消息'))
      q.put((-9, '緊急消息'))
      q.put((3, '通常消息'))
      
      print(q.get())
      print(q.get())
      print(q.get())

事件Event:

  • 併發的執行某個任務,多進程多線程,幾乎同時執行,一個線程執行到中間時,通知另外一個線程開始執行

  • import time
    from threading import Thread
    from threading import current_thread
    from threading import Event
    
    event = Event()  # 默認是False
    def task():
        print(f'{current_thread().name} 檢測服務器是否正常開啓....')
        time.sleep(3)   # 先運行task阻塞三秒,在將event修改成True
        event.set()     # 改爲了True
    
    def task1():
        print(f'{current_thread().name} 正在嘗試鏈接服務器')
        # event.wait()  # 輪詢檢測event是否爲True,當其爲True,繼續下一行代碼. 阻塞
        event.wait(1)
        # 設置超時時間,若是1s中之內,event改爲True,代碼繼續執行.
        # 設置超時時間,若是超過1s中,event沒作改變,代碼繼續執行.
        print(f'{current_thread().name} 鏈接成功')
    
    if __name__ == '__main__':
        t1 = Thread(target=task1,)
        t2 = Thread(target=task1,)
        t3 = Thread(target=task1,)
    
        t = Thread(target=task)
        t.start()
    
        t1.start()
        t2.start()
        t3.start()

協程:

  • 協程的本質也是一個線程,而使用協程目的是爲了減小系統開銷,協程是咱們經過程序來控制任務切換,協程速度比系統更快,最大限度的利用CPU,更加輕量級

  • 線程協程的區別:

    • 協程沒有鎖,協程又稱微線程
    • 線程和協程不一樣的是,線程是搶佔式調度切換,而協程是須要本身調度
    • 線程和進程,調度是CPU決定的,而協程就是上帝,在一個線程中規定某個代碼塊的執行順序

  • 1,協程切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,更加輕量級

  • 2.單線程內就能夠實現併發的效果,最大限度的利用CPU

  • 3.修改共享的數據不須要加鎖

  • 協程就像線程同樣也是在多任務間來回切換

  • 在其餘語言中,協程的意義不大,多線程便可以解決I/O問題,在python中有GIL鎖,在同一時間只有一個線程在工做,因此一個線程裏面IO操做特別多,協程比較適用

  • 串行:多個任務執行時,一個任務從開始執行,遇到IO等待,等待IO阻塞結束以後再執行下一個

  • 並行:多核多個線程或者進程同時執行,四個CPU同時執行四個任務

  • 併發:多個任務看起來是同時執行,CPU在多個任務之間來回切換,遇到IO阻塞,計算密集型執行時間過長

    • 併發本質:遇到IO阻塞,計算密集型執行時間過長,保持原來的狀態

  • 一個線程實現開發:

    • 多進程:操做系統控制,多個進程的多個任務切換 + 保持狀態

    • 多線程程:操做系統控制,多個線程的多個任務切換 + 保持狀態

    • 協程:程序控制一個線程的多個任務的切換以及保持狀態

      • 微併發,處理任務不宜過多

      • 協程他會調度CPU,若是協程管控的任務中,遇到阻塞,他會快速的(比操做系統快),切換到另外一個任務,而且能將上一個任務掛起(保持狀態),讓操做系統覺得CPU一直在工做

  • 串行和協程對比:
    • 密集型數據串行和協程對比,確定串行速度快,由於協程運行還要來回切換
    • import time
      def task1():
          res = 1
          for i in range(1,100000):
              res += i
      
      def task2():
          res = 1
          for i in range(1,100000):
              res -= i
      
      start_time = time.time()
      task1()
      task2()
      print(f'串行消耗時間:{time.time()-start_time}')  # 串行消耗時間:0.012489557266235352
      
      
      def task1():
          res = 1
          for i in range(1, 100000):
              res += i
              yield res
      
      def task2():
          g = task1()
          res = 1
          for i in range(1, 100000):
              res -= i
              next(g)
      
      start_time = time.time()
      task2()
      print(f'協程消耗時間:{time.time() - start_time}')  # 協程消耗時間:0.02991938591003418
  • 開啓協程:
    • 遇到gevent阻塞切換:
    • import gevent
      import time
      def eat(name):
          print('%s eat 1' %name)     # 1
          gevent.sleep(2)              #協程識別gevent,能夠進行IO切換
          # time.sleep(300)            #協程不識別切換不了,不可切換
          print('%s eat 2' %name)     # 4
      
      def play(name):
          print('%s play 1' %name)    # 2
          gevent.sleep(1)
          # time.sleep(3)
          print('%s play 2' %name)    # 3
      
      g1 = gevent.spawn(eat, '海洋')
      g2 = gevent.spawn(play, name='俊麗')   #協程異步發佈任務
      # g1.join()
      # g2.join()
      #或者gevent.joinall([g1,g2])
      gevent.joinall([g1,g2])                #主線程等待協程執行完畢
      print('主')                            #5
    • 全部IO阻塞均可以切換:
    • import threading
      from gevent import monkey
      monkey.patch_all()         # 將你代碼中的全部的IO都標識.
      
      import gevent              # 直接導入便可
      import time
      
      def eat():
          print(f'線程1:{threading.current_thread().getName()}')    # 1
          print('eat food 1')                                      # 2
          time.sleep(3)          # 加上mokey就可以識別到time模塊的sleep了
          print('eat food 2')                                      # 6
      
      def play():
          print(f'線程2:{threading.current_thread().getName()}')    # 3
          print('play 1')                                          # 4
          time.sleep(1)  
          # 來回切換,直到一個I/O的時間結束,這裏都是咱們個gevent作得,再也不是控制不了的操做系統了。
          print('play 2')                                          # 5
      
      g1=gevent.spawn(eat)
      g2=gevent.spawn(play)
      gevent.joinall([g1,g2])
      print(f'主:{threading.current_thread().getName()}')          # 7
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息