進程模塊multiprocessing

進程模塊multiprocessing

進程的建立 -用Process

  • 註冊進程Precess類安全

    #導入Process類
      from multiprocessing import Process
    
      #建立一個函數
      def fun():pass   
      #將這個函數的運行註冊進一個新的進程中
      p = Process(target=fun) #註冊進程,傳入的是地址
      p.start()   #開始這個進程

    進程號的查看 -os.getpid() -os.getppid()

  • 查看進程的進程號os.getpid()app

    import os
      from multiprocessing improt Process
      #建立一個函數,獲取這個進程的進程號
      def func():
          print('子進程:'os.getpid())
    
      p = Process(target=fun) #註冊進程
      p.start #開始進程
      print('父進程: ', os.getpid()) #打印主進程的進程號
    
      #結果
      子進程: 1234
      父進程: 6789
  • 查看父進程os.getppid()dom

    import os
      from multiprocessing improt Process
      #建立一個函數,獲取這個進程的進程號
      def func():
          print('子進程的父進程:'os.getpiid())
    
      p = Process(target=fun) #註冊進程
      p.start #開始進程
      print('父進程的父進程: ', os.getpiid())    #打印主進程的進程號
    
      #結果
      子進程的父進程: 6789   #當前進程 
      父進程的父進程: 1470   #pycharm運行的進程

    給進程傳入參數 -args=(tuple)

  • 新進程給函數傳參數函數

    '''註冊進程'''
      from multiprocessing import Process
      def fun(args):pass      #函數須要傳入一個參數 
    
      p = Process(target=fun, args=(canshu1,))    #給args傳入就行,可是傳入的是一個元祖   
    
      p.start()   #開始這個進程

進程的開始結束 -主進程和子進程的存活關係

  • 默認狀況下
    • 個進程互不干擾,主進程的結束不影響子進程的結束ui

      import time
        from multiprocessing import Process
        def fun():
            time.sleep(3)
            print('子進程結束')
        if __name__ == '__main__':  #啓動進程必須判斷這個
            p = Process(target=fun)
            p.start()
      
            print('主進程結束')
        #結果
        主進程結束
        子進程結束
  • join()待子進程結束後程序繼續進行操作系統

    import time
      from multiprocessing import Process
      def fun():
          print(1234)
          time.sleep(3)
      if __name__ == '__main__':    
          p = Process(target=fun)
          p.start()
          p.join()    #這裏是子進程的結束位置,父進程等待子進程結束後繼續運行
          print('您好')
      #結果
      1234
      您好  #停頓3秒後您好才被顯示出來

    開始多個子進程

  • 開始多個子進程code

    from multiprocessing import Process
      def fun(num):
          print('進程{}正在運行'.format(num))
    
      if __name__ == '__main__':  
          for i in range(3):
              p = Process(target=fun, i)
              p.start()
      #結果
      進程0正在運行
      進程1正在運行
      進程2正在運行
  • 等待所有子進程結束後運行下面的代碼orm

    from multiprocessing import Process
      def fun(num):
          print('進程{}正在運行'.format(num))
    
      if __name__ == '__main__':
          p_list = []
          for i in range(3):
              p = Process(target=fun, i)
              p_list.append(p)
              p.start()
          [p.join() for p in p_list]  #關鍵之處
          prnt('全部的子進程所有結束')
      #結果 #__結果的輸出進程不必定是按照順序輸出的__
      進程2正在運行
      進程0正在運行
      進程1正在運行
      全部的子進程所有結束

    利用類建立進程

    from multiprocessing import Process
      import os
      class MyProcess(Process):
          def __init__(self, arg1, arg2):
              super().__init__()  #必須繼承父類的__init__()
              self.arg1 = arg1
              self.arg2 = arg2
    
          def run(self):  #必須有一個run()方法
              print(os.getpid())
      if __name__ == '__main__':
          p = MyProcess()
          p.start()       #調用run()方法

多進程之間的數據隔離問題

  • 父進程和子進程之間的數據是隔離開的

守護進程

  • 子進程-->守護進程 -設置daemon = True
    • 守護進程:主進程結束,守護進程強制被結束
    • 守護進程結束看主進程是否結束,不看其餘子進程是否結束

進程類的其餘方法及其屬性

方法

  • p.is_alive() 檢測子進程是否存在
  • p.terminate() 結束一個子進程,並非當即結束,須要操做系統響應
  • p.start() 開始一個進程

屬性

  • p.name 查看進程的名字
  • p.pid 查看進程號
  • p.daemon = Ture 將進程轉換成守護進程

進程鎖 - lock模塊 -涉及數據安全問題

  • 給一段代碼加鎖以後只能被一個進程同時調用對象

    import lock
      lock = lock() 
    
      lock.acquire()  #鑰匙
      '''被加鎖的代碼塊'''
      lock.release()  #上交鑰匙

進階系列

進程的進階

信號量 -multiprocessing.Semaphore()

  • 限制進程訪問的代碼塊
  • 有多把鑰匙的房間繼承

    from multiprocessing import Semaphore
      from multiprocessing import Process
      from time import sleep
    
      def func(i, sem):
          sem.acquire()
          print('{}進去唱歌了'.format(i))
          sleep(1)
          print('{}唱歌結束出來了'.format(i))
          sem.release()
    
      if __name__ == '__main__':
          sem = Semaphore(4)
          for i in range(10):
              p = Process(target=func, args=(i, sem))
              p.start()
      #結果
      0進去唱歌了
      1進去唱歌了
      2進去唱歌了
      3進去唱歌了
      0唱歌結束出來了
      4進去唱歌了
      1唱歌結束出來了
      5進去唱歌了
      2唱歌結束出來了
      6進去唱歌了
      3唱歌結束出來了
      7進去唱歌了
      4唱歌結束出來了
      8進去唱歌了
      5唱歌結束出來了
      9進去唱歌了
      6唱歌結束出來了
      7唱歌結束出來了
      8唱歌結束出來了
      9唱歌結束出來了

事件 -multiprocessing.Event()

  • 一個信號可使得全部的進程進入阻塞狀態
  • 也能夠控制全部的進程接觸阻塞
  • 一個事件被建立出來默認是阻塞狀態

    #事件建立默認是阻塞的
      from multiprocessing import Event
    
      event = Event() #建立一個事件叫event
      print(event.is_set())   #檢查事件的阻塞,False爲阻塞,True爲非阻塞狀態
      event.wait()    #阻塞狀態會等待,非阻塞狀態會運行下面代碼
      print('這句話不能被打印')
      #結果
      False
      ————————————————————————————————————————————————
      #下面是經過設置改變阻塞狀態
    • .is_set()用來查看事件的阻塞狀態
  • .set().is_set()設置成True,變成非阻塞狀態

    #關閉事件的阻塞
      from multiprocessing import Event
    
      event = Event()
      event.set()
      event.wait()
      print(event.is_set())
      print('如今阻塞被關閉')
      #結果
      True
      如今阻塞被關閉
  • enent.clear()再次打開阻塞

    #再次打開阻塞
      from multiprocessing import Event
    
      event = Event()
      event.set()
      event.wait()
      print('正常打印消息')
    
      event.clear()   #開啓阻塞
      event.wait()
      print('這條消息將不被輸出')
      #結果
      正常打印消息

    紅綠燈案例

    from multiprocessing import Event, Process
      import time, random
    
      def light(e,):
          while 1:
              if e.is_set():
                  e.clear()  # 開啓阻塞
                  print('\033[31mO\033[0m')  # 紅燈
              else:
                  e.set()
                  print('\033[32mO\033[0m')  # 綠燈
    
              time.sleep(2)
    
    
      def car(e, i):
          if not e.is_set():
              print('{}正在等待'.format(i))
              e.wait()
          print('{}經過路口'.format(i))
    
      if __name__ == '__main__':
    
          e = Event()
          tra = Process(target=light, args=(e,))
          tra.start()
          i = 0
          while 1:
              p = Process(target=car, args=(e, i))
              p.start()
              time.sleep(random.random())
              i += 1
      #結果
      綠燈
      0經過路口
      1經過路口
      2經過路口
      3經過路口
      4經過路口
      紅燈
      5正在等待
      6正在等待
      7正在等待
      8正在等待
      綠燈
      5經過路口
      7經過路口
      8經過路口
      6經過路口
      9經過路口
      10經過路口
      11經過路口
      12經過路口
      紅燈
      13正在等待
      14正在等待
      15正在等待
      16正在等待

進程之間的通訊 -隊列和管道

進程間的通訊 -IPC

隊列

  • 先進先出
  • 滿了或者隊列空了都會發生阻塞
    • from multiprocessing import Queue 導入隊列
    • q = Queue() 建立隊列對象
    • .put() 放數據,滿了就阻塞
    • .get() 取數據,取空就阻塞
    • .full() 判斷隊列是否滿了
    • .enpty() 判斷隊列是否爲空
    • .put_nowait() 向隊列放元素,隊列滿了就報錯
    • .get_nowait() 取元素,取空就報錯

      from multiprocessing import Queue
      
        q = Queue(5)    #隊列容許放置5個元素,可選參數
        q.put(1)
        q.put(2)
        q.put(3)
        q.put(4)
        q.put(5)
        print(q.pull()) #判斷隊列是否滿了
        print(q.empty())    #判斷隊列是否空了
        for i in range(5):
            print(q.get())

利用隊列完成兩個進程的數據交換 -IPC

from multiprocessing import Queue, Process

def put_thing(q):
    for i in range(5):
        q.put(i)

def get_thing(q): 
    print(q.get())

if __name__ == '__main__':

    q = Queue()
    p_put = Process(target=put_thing, args=(q,))
    p_put.start()

    for i in range(5):
        p_get = Process(target=get_thing, args=(q,))
        p_get.start()

隊列模型 -生產者消費者模型

模型一 Queue模塊

  • 須要向消費者告知結束關鍵字

    from multiprocessing import Process, Queue
      import time
      import random
    
      def producer(q, name, food):
          for i in range(3):
              time.sleep(random.random())
              msg = '{}生產了第{}泡{}'.format(name, i, food)
              print(msg)
              q.put(i)
    
      def customer(q, name, food):
          while 1:
              i = q.get()
              print(i)
              if i == 'nihao':
                  print('getNoneR')
                  break
    
              time.sleep(random.random())
              print('{}消費了第{}泡{}'.format(name, i, food))
    
      if __name__ == '__main__':
          q = Queue()
          pname = 'Alex'
          cname = '你'
          food = '狗屎'
          produ_proce = Process(target=producer, args=(q, pname, food))
          cust_proce = Process(target=customer, args=(q, cname, food))
    
          produ_proce.start()
          cust_proce.start()
          produ_proce.join()
          q.put('nihao')  #生產者消費結束向隊列添加一個結束關鍵字

模型二 -改進版JoinableQueue()

  • 改進版結合守護進程使用
  • JoinableQueue()Queue()的區別
    • JoinableQueue()加入了兩個方法
      • .task_done() 消費者每消費一次就要運行一次,用來減去模塊中的計數
      • .join() 等待隊列中元素被取完,不然阻塞
    • from multiprocessing import Process, JoinableQueue
       import time
       import random
      
       def producer(q, name, food):
           for i in range(10):
               time.sleep(random.random())
               msg = '\033[31m{}生產了第{}泡{}\033[0m'.format(name, i, food)
               print(msg)
               q.put(i)
           q.join()    #判斷隊列是否被取完,取完後才執行,不然阻塞
      
       def customer(q, name, food):
           while 1:
               i = q.get()
               time.sleep(random.random())
               print('{}消費了第{}泡{}'.format(name, i, food))
               q.task_done()   #每取一個元素,讓隊列的計數器減1
      
       if __name__ == '__main__':
           q = JoinableQueue()
           pname = 'Alex'
           cname = '你'
           food = '狗屎'
           produ_proce = Process(target=producer, args=(q, pname, food))
           cust_proce = Process(target=customer, args=(q, cname, food))
           produ_proce.daemon = True   #將子進程設置成守護進程
           cust_proce.daemon = True    #將子進程設置成守護進程
      
           produ_proce.start()
           cust_proce.start()
      
           produ_proce.join()
相關文章
相關標籤/搜索