併發編程 進程基礎

操做系統

  • 多道 、分時、實時html

  • 同步異步python

    • 同步:一件事情完成後再作另外一件事
    • 異步:同時作多件事
  • 阻塞和非阻塞編程

    • 阻塞:recv,accept,recvfrom
      • 會讓整個進程進入阻塞隊列
    • 非阻塞:進程只會在就緒和 運行狀態中切換
  • 進程三狀態:就緒 運行 阻塞json

  • 併發並行安全

    • 併發是包含並行的
    • 併發:宏觀上多個程序同時運行,實際是同一時間只運運行了一次
    • 並行:微觀上多個程序同時運行
  • 子進程和主進程併發

    • pid ppid
  • 多併發的tcp服務端app

    • import socket
      from multiprocessing import Process
      def communicate(conn):
          while True:
              conn.send("hello".encode("utf-8"))
              print(conn.recv(1024))
      if __name__ == '__main__':
          sk = socket.socket()
          sk.bind(('127.0.0.1',9001))
          sk.listen()
          while True:
              conn,addr = sk.accept()
              Process(target=communicate,args=(conn,)).start()
      import socket
      sk = socket.socket()
      sk.connect(('127.0.0.1',9001))
      while True:
          print(sk.recv(1024))
          mv = input(">>>>>>>>>>:").strip()
          sk.send(mv.encode("utf-8"))
  • 進程是操做系統中最小的資源分配單位dom

  • 進程異步

    • multiprocessing
    • multiprocessing.Process
    • 如何開啓一個子進程
  • Process 開啓子進程

    • 第二種開啓子進程的方式socket

      • def func(index):
            time.sleep(random.random())
            print('第%s個郵件已經發送完畢'%index)
        if __name__ == '__main__':
            p_lst = []
            for i in range(10):
                p = Process(target=func,args=(i,))
                p.start()
                p_lst.append(p)
            for p in p_lst:
                p.join()
            print('所有發送完畢')
    • join控制子進程

      • #子進程同步,執行完畢後才執行主程序後面的程序
        # import time
        # from multiprocessing import Process
        # def f(name):
        #     print("hello",name)
        #     time.sleep(1)
        # if __name__ == '__main__':
        #     for i in range(5):
        #         p = Process(target=f,args=(i,))
        #         p.start()
        #         p.join()      #阻塞,
        #     print("主進程執行")
        
        #子程序異步執行,執行完了阻塞結束
        import time
        from multiprocessing import Process
        def f(name):
            print("hello",name)
            time.sleep(1)
        if __name__ == '__main__':
            p_list = []
            for i in range(10):
                p = Process(target=f,args=(i,))
                p.start()
                p_list.append(p)
            for i in p_list:
                i.join()
            print("主進程執行完畢")
    • 守護進程 daemon

      • 守護進程會隨着主進程代碼執行完畢而結束

      • 守護進程內沒法再開啓子進程,不然會拋出異常

      • 注意:進程之間是相互獨立的,主進程代碼運行結束,守護進程也會隨即終止

      • import time
        from multiprocessing import Process
        def func1():
            count = 1
            while True:
                time.sleep(0.5)
                print(count*"*")
                count += 1
        def func2():
            print("func strat")
            time.sleep(5)
            print("func2 end")
        if __name__ == '__main__':
            p1 = Process(target=func1)
            p1.daemon = True      #定義爲守護進程
            p1.start()          #執行
            Process(target=func2).start()
            time.sleep(3)
            print("主進程")
        #輸出
        # func strat
        # *
        # **
        # ***
        # ****
        # *****
        # 主進程
        # func2 end

        若是主進程執行完畢那麼守護進程也會結束,可是其餘子進程若是沒執行完還會繼續執行

    • 做業:在進程之間保證數據安全性

    • from multiprocessing import Process,Lock

    • lock= Lock()實例對象

    • lock.acquire() 取鑰匙開門

    • lock.release() 關門放鑰匙

    • 例題 模擬搶票

    • import time
      import json
      from multiprocessing import Process,Lock
      def search(person):         #查票
          with open("ticket") as f:   #文件中保存着一個字典{"count":4}
              dic = json.load(f)   #讀出文件中的字典
          time.sleep(0.2)
          print("%s查詢餘票"%person,dic["count"])
      def get_ticket(person):         #搶票
          with open("ticket") as f:
              dic = json.load(f)
          time.sleep(0.2)             #模擬延遲
          if dic["count"] >0:
              print("%s買到票了"%person)
              dic["count"] -= 1
              time.sleep(0.2)
              with open("ticket","w") as f:
                  json.dump(dic,f)    #寫回文件
          else:
              print("%s沒買到票"%person)
      def ticket(person,lock):
          search(person)
          lock.acquire()      #開門,一次只能進一個
          get_ticket(person)
          lock.release()      #關門
      if __name__ == '__main__':
          lock = Lock()
          for i in range(10):
              p = Process(target=ticket,args=("person%s"%i,lock))
              p.start()

      爲了保證數據的安全,在異步的狀況下,多個進程又可能同時修改同一份數據的時候,須要給這個數據上鎖

    • 加鎖的做用

      • 下降了程序的效率,讓原來可以同時執行的代碼編程順序執行了,異步變同步的過程,保證了數據的安全
  • 同步控制

    • import time
      from multiprocessing import Process,Lock
      def func(num,lock):
          time.sleep(1)
          print("異步執行",num)
          lock.acquire()
          time.sleep(0.5)
          print("同步執行",num)
          lock.release()      #同步執行是依次執行,間隔0.5秒
      if __name__ == '__main__':
          lock = Lock()
          for i in range(10):
              p = Process(target=func,args=(i,lock))
              p.start()
  • 信號量 機制:計數器+鎖實現的 Semaphore

    • 主程序控制必定數量的子程序同時執行,這些數量的子程序執行完一個就會有下一個子程序補充進來

    • import time
      import random
      from multiprocessing import Process,Semaphore
      def ktv(person,sem):
          sem.acquire()       #進
          print("%s走進KTV"%person)
          time.sleep(random.randint(1,3))     #隨機延遲一到三秒
          print("%s走出ktv"%person)
          sem.release()       #出
      if __name__ == '__main__':
          sem = Semaphore(4)      #信號量爲4,默認爲1
          for i in range(10):
              Process(target=ktv,args=(i,sem)).start()
  • 事件 Event

    • 阻塞事件 wait() 方法

      • wait 是否阻塞是看event對象你不的一個屬性
    • 控制這個屬性的值

      • set()將這個屬性的值改爲True

      • clear() 將這個屬性的值改爲False

      • is_set() 判斷當前屬性是否爲True

      • #模擬紅綠燈,只有所有車經過後才中止
        import time
        import random
        from multiprocessing import Process,Event
        def traffic_light(e):
            print("紅燈亮")
            while True:
                if e.is_set():
                    time.sleep(2)
                    print("紅燈亮")
                    e.clear()
                else:
                    time.sleep(2)
                    print("綠燈亮")
                    e.set()
        def car(e,i):
            if not e.is_set():
                print("car%s在等待"%i)
                e.wait()
            print("car%s經過了"%i)
        if __name__ == '__main__':
            e = Event()
            p = Process(target=traffic_light,args=(e,))
            p.daemon =True    #變成守護進程
            p.start()
            p_list = []
            for i in range(10):
                time.sleep(random.randrange(0,3,2))
                p = Process(target=car,args=(e,i))
                p.start()
                p_list.append(p)
            for p in p_list:p.join()
  • 進程之間的通訊(IPC)
    • 多個進程之間有一些固定的通訊內容

    • socket給予文件家族通訊

    • 進程之間雖然內存不共享,可是能夠通訊,

    • 進程隊列 Queue
      • 進程之間數據是安全的,先進先出
    • 隊列是基於管道 + 鎖 實現的

    • 管道(Pipe)是基於socket,pickle實現的

    • def consume(q):
          print('son-->',q.get())
          q.put('abc')
      if __name__ == '__main__':
          q = Queue()
          p = Process(target=consume,args=(q,))
          p.start()
          q.put({'123':123})
          p.join()
          print('Foo-->',q.get())
    • 簡單的生產消費模型

      def consume(q):
          print('son-->',q.get())
          q.put('abc')
      if __name__ == '__main__':
          q = Queue()
          p = Process(target=consume,args=(q,))
          p.start()
          q.put({'123':123})
          p.join()
          print('Foo-->',q.get())
    • 相同的原理 JoinableQueue

      • task_done 通知隊列已經有一個數據被處理了

      • q.join() 阻塞直到放入隊列中全部的數據都被處理掉(有多少個數據就接受到多少taskdone)

      • import time
        import random
        from multiprocessing import Process,JoinableQueue
        def consumer(q,name):
            while True:
                food = q.get()
                time.sleep(random.uniform(0.3,0.8))
                print("%s吃了一個%s"%(name,food))
                q.task_done()
        def producer(q,name,food):
            for i in range(10):
                time.sleep(random.uniform(0.3,0.8))
                print("%s生產了%s%s"%(name,food,i))
                q.put(food+str(i))
        if __name__ == '__main__':
            jq = JoinableQueue()
            c1 = Process(target=consumer,args=(jq,"alex"))
            c1.daemon = True
            p1 = Process(target=producer,args=(jq,"libai","包子"))
            c1.start()
            p1.start()
            p1.join()
            jq.join()
  • 管道 進程之間數據不安全 且存取數據複雜

  • 進程池(Pool) multiprocessing起進程池

    • 進程池開啓的個數:默認是CPU的個數

    • 開啓過多的進程並不能提升你的效率,反而會下降效率

    • 計算密集型 充分佔用CPU 多進程能夠充分利用多核 適合開啓多進程,可是不適合開啓不少多進程

    • IO密集型 大部分時間都在阻塞隊列,而不是在運行狀態 根本不太適合開啓多進程

    • 提交任務:

      • 1.同步提交 apply

        • 返回值:子進程對應函數的返回值

        • 一個一個順序執行的,並無任何的併發效果

        • # import os
          # import time
          # from multiprocessing import Process,Pool
          # def task(num):
          #     time.sleep(0.5)
          #     print("%s: %s"%(num,os.getpid()))
          #     return num ** 2
          # if __name__ == '__main__':
          #     p = Pool(4)
          #     for i in range(20):
          #         res = p.apply(task,args=(i,)) #apply   提交任務方法,同步提交
          #         print("--->",res)
          #四個任務依次執行,輪換
      • 2.異步提交 apply_async

        • 沒有返回值,要想全部任務可以順利的執行完畢
          • p.close()
          • p.join() 必須先close在join,阻塞直到進程池中全部任務都執行完畢
        • 有返回值的狀況下
          • res.get() #get不能再提交任務以後馬上執行,應該是先提交全部的任務再經過get獲取結果
- ```Python
    import os
    import time
    from multiprocessing import Pool
    def task(num):
        time.sleep(1)
        print("%s: %s"%(num,os.getpid()))
        return num **2
    if __name__ == '__main__':
        p = Pool(4)
        for i in range(20):
            res = p.apply_async(task,args=(i,))     #apply_async   異步提交
        p.close()
        p.join()
    #輸出結果同時四個認識執行
    ```

- 3.map()方法

  - 異步提交的簡化版本
  - 自帶close和join方法
  - 直接拿到返回值的可迭代對象
  - 循環能夠拿到返回值
  • 數據共享 Manager

    • 把全部實現了數據共享的比較便捷的類都從新又封裝了一遍,而且在原有的multiprocessing基礎上

    • 支持的數據類型有限

    • list dict都不是安全的數據,你須要本身加鎖來保證數據的安全

    • from multiprocessing import Manager,Process,Lock
      def work(d,lock):
          with lock:
              d["count"] -= 1
      if __name__ == '__main__':
          lock = Lock()
          with Manager() as m:            #使用以後數據就會變成共享
              dic = m.dict({"count":100})
              p_l = []
              for i in range(100):
                  p = Process(target=work,args=(dic,lock))
                  p_l.append(p)
                  p.start()
              for p in p_l:
                  p.join()
              print(dic)
  • 進程池-----回調函數

    • 當func執行完畢後執行callback函數

    • func的返回值做爲callback的參數

    • 回調函數是在主進程實現的

    • 子進程有大量的計算要去作,回調函數等待結果作簡單處理

    • import os
      from multiprocessing import Pool
      def func(i):
          print("第一個任務",os.getpid())      
          return "*"*i
      def call_back(res):
          print("回調函數",os.getpid())       ##pid號爲11420  與主進程pid號相同
          print("res---->",res)
      if __name__ == '__main__':
          p = Pool()
          print("主進程",os.getpid())        #pid爲11420   說明回調函數是主進程實現的
          p.apply_async(func,args=(1,),callback=call_back)
          p.close()
          p.join()
      #基於多進程的共享數據的小爬蟲
      import re
      from urllib.request import urlopen
      url_lst = [
          'http://www.baidu.com',
          'http://www.sohu.com',
          'http://www.sogou.com',
          'http://www.4399.com',
          'http://www.cnblogs.com',
      ]
      from multiprocessing import Pool
      def get_url(url):
          response = urlopen(url)
          ret = re.search("www\.(.*?)\.com",url)
      
          print("%s finished"%ret.group(1),ret.group())
          return ret.group(1),response.read()
      
      def call(content):
          url,con = content
          with open(url+".html","wb") as f:
              f.write(con)
      
      if __name__ == '__main__':
          p = Pool()
          for url in url_lst:
              p.apply_async(get_url,args=(url,),callback=call)
          p.close()
          p.join()
相關文章
相關標籤/搜索