「Python 面試」第三次更新

圖片描述

閱讀本文大約須要 10 分鐘。

14.說一下進程、線程、以及多任務(多進程、多線程和協程)

  • 進程html

    • 概念
      一個程序對應一個進程,這個進程被叫作主進程,而一個主進程下面還有許多子進程。
    • 實現方式python

      • fork()
        示例:程序員

        import os
                 
                 
        print('current_pid :%d' % os.getpid())
             
        res = os.fork()
             
        # 子進程返回的是 0
        if res == 0:
        print('res: %d' % res)
        print('sub_pid: %d' % os.getpid())
             
        # 主進程返回的是子進程的 pid
        else:
            print('main_pid: %d' % os.getpid())
            print('res:%d' % res)
             
        # 結果爲
        current_pid :12775
        main_pid: 12775
        res:12776
        res: 0
        sub_pid: 12776multiprocessing.Process
      • multiprocessing.Process
        示例:算法

        from multiprocessing import Process
        import os, time
             
             
        print('man_process pid : %d' % os.getpid())
             
        class NewProcess(Process):
            def __init__(self):
                Process.__init__(self)
             
            def run(self):
                time.sleep(3)
                print('%d process was runing' % os.getpid())
             
        np = NewProcess()
        np.start()
             
        # 結果爲
        man_process pid : 7846
        7847 process was runing
      • multiprocessing.Pool安全

        • 同步(apply)多線程

          示例:併發

          from multiprocessing import Pool
          import time, os, random
               
               
          print('main_process pid: %d' % os.getpid())
               
          def run():
              time.sleep(random.random())  # random.random() 隨機生成一個小於 1 的浮點數
              print('%d process was runing' % os.getpid())
               
          p = Pool(3)
               
          for i in range(4):
              p.apply(run, args=())
               
          p.close()
          print('waiting for sub_process')
               
          while True:
              # 獲取 Pool 中剩餘的進程數量
              count = len(p._cache)
              if count != 0:
                  print('there was %d sub_process' % count)
                  time.sleep(random.random())
              else:
                  break
                       
          print('sub_process has done')
               
          # 結果爲
          main_process pid: 4295
          4297 process was runing
          4296 process was runing
          4298 process was runing
          4297 process was runing
          wating for sub_process
          sub_process has done
        • 異步(apply_async)
          示例:app

          from multiprocessing import Pool
          import time, os, random
                    
                    
          print('main_process pid: %d' % os.getpid())
                    
          def run():
              # random.random() 隨機生成一個小於 1 的浮點數
              time.sleep(random.random())  
              print('%d process was runing' % os.getpid())
             
          p = Pool(3)
                    
          for i in range(4):
              p.apply_async(run, args=())
                    
              p.close()
                    
          while True:
              # 獲取 Pool 中剩餘的進程數量
              count = len(p._cache)
              if count != 0:
                  print('there was %d sub_process' % count)
                  time.sleep(random.random())
              else:
                  break
                            
          print('wiating for sub_process..')
          p.join()
                    
          print('sub_process has done')
                    
          # 結果爲
          main_process pid: 4342
          wiating for sub_process..
          there was 4 sub_process
          4344 process was runing
          there was 3 sub_process
          4345 process was runing
          4344 process was runing
          4343 process was runing
          sub_process has done
    • 優缺點dom

      • fork()是計算機最底層的進程實現方式,一個fork()方法建立出來的進程有兩個:主進程、子進程。fork()建立出來的進程,主進程不會等待子進程。
      • multiprocessing模塊經過將fork方法封裝成一個Process類,該類有一個start()方法,當調用該方法時,會自動調用run()方法,開啓一個進程。而且由Process建立出來的進程,可使用join()方法,使得主進程堵塞,被迫等待子進程。
      • multiprocess下另外一種開啓進程的方式是經過Pool進程池來實現。進程池能夠開啓多個進程來執行多個任務,可是進程數最大不會超過系統 CPU 核數。一樣的,由Pool建立出來的進程,主進程也不會等待子進程,經過join()方法能夠迫使主進程等待子進程,或者使用apply()同步的方式。
    • 進程通訊
      進程之間的通訊能夠經過隊列(Queue)來進行,多個進程一部分向隊列裏寫入數據,一部分從隊列裏讀取數據,從而完成多進程之間的通訊問題。
      示例:異步

      from multiprocessing import Process, Queue
      import random, time, os
        
        
      def write(q):
          if not q.full():
              for i in range(4):
                 q.put(i)
                 print('%d was writing data[%d] to queue' % (os.getpid(), i))
                    time.sleep(random.random())
          else:
              print('queue is full')
        
      def read(q):
          # 等待隊列被寫入數據
          time.sleep(random.random())
          while True:
              if not q.empty():
                  data = q.get()
                  print('%d was reading data{%d} from queue' % (os.getpid(), data))
              else:
                  print('queue is empty')
                  break
            
      # 建立通訊隊列,進程之間,全局變量不共享
      q = Queue()
      pw = Process(target=write, args=(q,))
      pr = Process(target=read, args=(q,))
            
      pw.start()
      pr.start()
            
      pw.join()
      pr.join()
      print('end')
            
      # 結果爲
      4640 was writing data[0] to queue
      4640 was writing data[1] to queue
      4640 was writing data[2] to queue
      4641 was reading data{0} from queue
      4641 was reading data{1} from queue
      4641 was reading data{2} from queue
      queue is empty
      4640 was writing data[3] to queue
      end

      因爲進程的執行順序問題,形成了 pr 先於 pw 執行,因此 pr 未讀取到數據,pr 進程任務結束,堵塞解開,主進程繼續向下運行,最後 pw 任務結束。

    • 進程通訊改良
      示例:

      from multiprocessing import Process, Queue
      import random, time, os
          
          
      def write(q):
          if not q.full():
              for i in range(4):
                  q.put(i)
                  print('%d was writing data[%d] to queue' % (os.getpid(), i))
                        time.sleep(random.random())
          else:
              print('queue is full')
          
          def read(q):
              # 等待隊列被寫入數據
              time.sleep(random.random())
              while True:
                  data = q.get()
                  print('%d was reading data{%d} from queue' % (os.getpid(), data))
          
      # 建立通訊隊列,進程之間,沒有全局變量共享之說
      q = Queue()
      pw = Process(target=write, args=(q,))
      pr = Process(target=read, args=(q,))
          
      pw.start()
      pr.start()
          
      pw.join()
      # pr 進程馬上結束
      pr.terminate()
      print('end')
          
      # 結果爲
      12898 was writing data[0] to queue
      12898 was writing data[1] to queue
      12898 was writing data[2] to queue
      12899 was reading data{0} from queue
      12899 was reading data{1} from queue
      12899 was reading data{2} from queue
      12898 was writing data[3] to queue
      12899 was reading data{3} from queue
      end
  • 線程

    • 概念
      線程是進程下的一部分,進程下負責執行代碼程序的就是線程,一個進程下會有不少個線程。一樣的,一個主線程下面也有不少子線程。

      另外,Python 中的線程依據的是 Java 中的線程模型,若是有興趣的同窗能夠研究一下。

    • 實現方式

      示例:

      import threading, time
        
        
      def run():
          time.sleep(1)
          # currentThread() 返回的是當前的線程對象信息
          print('%s was runing' % threading.currentThread())
          print('current thread\'name: %s' % threading.currentThread().getName())
        
      # 建立一個線程
      t = threading.Thread(target=run, args=())
        
      # 啓動線程
      t.start()
        
      # get_ident 返回的是當前線程對象所在的內存地址(id),該地址是惟一能夠驗證線程的數據
      # 也可以使用 currentThread().getName() 來簡單的區分線程
      print('current thread\'name: %s' % threading.currentThread().getName())
      print('main_thread tid: %s' % threading.get_ident())
        
      # 結果爲
      current thread'name: MainThread
      main_thread tid: 140427132020480
      <Thread(Thread-1, started 140427100555008)> was runing
      current thread'name: Thread-1
    • 線程通訊

      • 通訊隊列
        通訊隊列做爲相對來講最爲安全的線程通訊手段,其中Queue模塊自身擁有全部所需的鎖,這使得通訊隊列中的對象能夠安全的在多線程之間共享。

        這裏用常見的「生產者-消費者模型」來介紹。

        示例:

        import threading, queue, time, random
            
        flag = object()
            
        def producter(q):
            for i in range(4):
                q.put(i)
            print('%s put data{%d} in queue' % (threading.currentThread().getName(), i))
            time.sleep(random.random())
            q.put(flag)
            
        def consumer(q):
            time.sleep(random.random())
            while True:
                res = q.get()
                if res == flag:
                    q.put(flag)
                    break
                else:
                    print('%s get data{%d} from queue' % (threading.currentThread().getName(), res))
            
        # 建立隊列
        q = queue.Queue()
            
        # 建立線程
        pro = threading.Thread(target=producter, args=(q,))
        con = threading.Thread(target=consumer, args=(q,))
            
        pro.start()
        con.start()
            
        # 結果爲
        Thread-1 put data{0} in queue
        Thread-1 put data{1} in queue
        Thread-2 get data{0} from queue
        Thread-2 get data{1} from queue
        Thread-1 put data{2} in queue
        Thread-2 get data{2} from queue
        Thread-1 put data{3} in queue
        Thread-2 get data{3} from queue
        end

        這裏有一個細節。在多線程下,當生產者任務完成以後,向隊列queue裏添加了一個特殊對象(終止信號)flag,這樣當消費者從queue中取出任務時,當取到flag時,意味着全部任務被取出,並再次將flag添加至queue中,這樣其餘線程中的消費者在接收到這個終止信號後,也會得知當前生產者任務已經所有發佈。

      • 輪詢
        經過爲數據操做添加while循環判斷,迫使線程被迫等待操做。(爲了優化等待時間,應在最核心的位置添加判斷條件)

        示例:

        import threading
                
                
        class NewThread(threading.Thread):
            flag = 0
            g_num = 0
                
            def __init__(self):
                 super().__init__()
                
            def run(self):
                print('%s was runing' % threading.currentThread().getName())
                if self.name == 'Thread-1':
                    self.add_num()
                    NewThread.flag = 1
                else:
                    # 輪詢
                    # Thread-2 被迫等待 Thread-1 完成任務以後才能執行
                    while True:
                        if NewThread.flag:
                            self.add_num()
                            break
                
            @classmethod
            def add_num(cls):
                global g_num
                for i in range(1000000):
                    cls.g_num += 1
                print('on the %s, g_num: %d' % (threading.currentThread().getName(), cls.g_num))
                
        t1 = NewThread()
        t2 = NewThread()
                
        t1.start()
        t2.start()
                
        # 結果爲
        Thread-1 was runing
        Thread-2 was runing
        on the Thread-1, g_num: 1000000
        on the Thread-2, g_num: 2000000
      • 互斥鎖
        互斥鎖是專門爲了針對線程安全而設計的一種結構,鎖能夠強制線程排序,保護線程安全,可是加鎖、解鎖會消耗系統 CPU 資源。
      • 互斥鎖優化

        示例:

        import threading
              
              
        class NewThread(threading.Thread):
            g_num = 0
            # 生成鎖對象
            lock = threading.Lock()
              
            def __init__(self):
                 super().__init__()
              
                 def run(self):
                       # 判斷當前線程是否上鎖,若未上鎖,則一直嘗試上鎖(acquire)直至成功
                     with NewThread.lock:
                         print('%s was runing' % self.name)
                         self.add_num()
              
                 @classmethod
                 def add_num(cls):
                     for i in range(1000000):
                         cls.g_num += 1
                     print('on the %s g_num: %d' % (threading.currentThread().getName(), cls.g_num))
              
        t1 = NewThread()
        t2 = NewThread()
              
        t1.start()
        t2.start()
              
        # 結果爲
        Thread-1 was runing
        on the Thread-1 g_num: 1000000
        Thread-2 was runing
        on the Thread-2 g_num: 2000000
      • 死鎖問題
        當多線程下出現多個鎖,判斷條件又是另外一個線程裏的鎖時,就會出現一種狀況:當另外一個線程任務執行時間過長,或是線程結束,未解鎖。當前線程因爲遲遲沒法上鎖,程序始終阻塞,此時就會陷入死鎖問題。
      • 死鎖問題解決

        • 設置超時時間threading.Lock().acquire(timeout=3)只要在上鎖時設置超時時間timeout=,只要超過期間,線程就會再也不等待是否解鎖,而是直接運行。可是這種方式很危險,可能會帶來大量的等待時間。
        • 爲每一個鎖添加一個特殊編號,多線程在獲取鎖的時候嚴格按照該編號的升序方式來獲取,至關於爲線程排序,這樣就避免了多線程由於資源爭搶,而陷入死鎖的可能。
        • 銀行家算法
  • 進程與線程的區別

    • 線程和進程的執行順序都是同樣的,都是由操做系統的調度算法決定,不是根據程序的編寫順序來決定。
    • 進程是資源分配的單位,而線程是 CPU 調度的單位。
    • 進程在主程序結束後,程序立馬結束,須要手動利用join()方法使得主程序發生堵塞,來等待子進程。而主線程的任務結束後,程序會等待子線程結束纔會結束。故不須要特地使用join()方法來使主線程等待子線程。
    • 多進程適合 CPU 密集型,多線程適合 I/O 密集型。
  • 協程

    • 概念
      線程下的一種,也叫微線程,單線程自身控制切換任務時機,達到多任務的效果。避免了因爲系統在處理多進程或者多線程時,切換任務時須要的等待時間。這一點很像操做系統裏的中斷。
    • 實現方式

      • 生成器(yield)
        生成器相關內容可看問題 13。

        這裏以一個簡單的「生產者-消費者模型」來解釋如何使用生成器實現協程。

        示例:

        import threading
             
             
        def producter(c):
            next(c)
            n = 4
            print('%s was running' % threading.currentThread().getName())
            
            while n:
                print('product data: %d' % n)
                res = c.send(n)
                print(res)
                n -= 1
            print('sale out')
             
             
        def consumer():
            res = ''
             
            print('%s was running' % threading.currentThread().getName())
            while True:
                n = yield res
             
                print('consume data: %d' % n)
                res = '200 OK'
             
        print('%s was running' % threading.currentThread().getName())
        c = consumer()
             
        producter(c)
             
        # 結果爲
        MainThread was running
        MainThread was running
        MainThread was running
        product data: 4
        consume data: 4
        200 OK
        product data: 3
        consume data: 3
        200 OK
        product data: 2
        consume data: 2
        200 OK
        product data: 1
        consume data: 1
        200 OK
        sale out

        能夠看到,生產者事先不知道消費者具體要消費多少數據,生產者只是一直在生產。而消費者則是利用生成器的中斷特性,consumer函數中,程序每一次循環遇到yield關鍵字就會停下,等待producter函數啓動生成器,再繼續下一次循環。

        在這中間只有一個線程在運行,任務的切換時機由程序員本身控制,避免了因爲多線程之間的切換消耗,這樣就簡單實現了協程。

      • 異步 I/O(asyncio)
        因爲生成器在將來的 Python 3.10 版本中將不在支持協程,而是推薦使用asyncio庫,該庫適用於高併發。

        本身目前不會,就不瞎 BB 了,具體可看文檔。

        asyncio 中文文檔

未寫完,下次更新補上

相關文章
相關標籤/搜索