Python網絡編程(進程池、進程間的通訊)

 

線程池的原理:
       線程池是預先建立線程的一種技術。線程池在尚未任務到來以前,
       建立必定數量的線程,放入空閒隊列中。這些線程都是處於睡眠狀態,
       即均爲啓動,不消耗CPU,而只是佔用較小的內存空間。當請求到來以後,
       緩衝池給此次請求分配一個空閒線程,把請求傳入此線程中運行,進行處理。
       當預先建立的線程都處於運行狀態,即預製線程不夠,線程池能夠自由建立必定數量的新線程,
       用於處理更多的請求。當系統比較閒的時候,也能夠經過移除一部分一直處於停用狀態的線程。
 
進程間的通訊原理:
         OS提供了溝通的媒介供進程之間「對話」用。既然要溝通,如同人類社會的溝通同樣,
         溝通要付出時間和金錢,計算機中也同樣,必然有溝通須要付出的成本。
         出於所解決問題的特性,OS提供了多種溝通的方式,每種方式的溝通成本也不盡相同,
         使用成本和溝通效率也有所不一樣。咱們常常聽到的 管道、消息隊列、共享內存都是OS提供的供進程之間對話的方式。
 
 
Process(target, name, args, kwargs)
     name
       給 進程
        默認Process-1,Process-2..... 
        p.name 查看進程名
     args:
       以 元組的形式 給target函數傳參
     kwargs:
       以 字典的形式 給對應鍵的值傳參
 
進程對象的其餘 經常使用屬性方法
    p.name  p.start()   p.join()
    p.pid:
         獲取建立進程的 pid
     p.is_alive():
        判斷進程是處於alive狀態
     p.daemon:
        默認爲Flase 若是 設置爲True  主進程結束殺死全部子進程
daemon屬性 必定要在start()前設置
設置daemon 爲True 通常 不須要加join()
daemon不是真正意義上的守護進程
         守護進程
      不受終端控制
      後臺自動運行
      生命週期長
 
 
多進程copy一個文件拆分爲兩個進行保存
 

import os 
from multiprocessing import Process 
from time import sleep

#獲取文件的大小
size = os.path.getsize("./timg.jpeg")  # 獲取文件的字節數
# f = open("timg.jpeg",'rb')
#複製前半部分
def copy1(img):
    f = open(img,'rb')  # 二進制讀取要複製的文件
    n = size // 2
    fw = open('1.jpeg','wb')  # 二進制建立文件

    while True:
        if n < 1024:  # 判斷文件大小是否大於1024字節 若是小於則直接讀取寫入
            data = f.read(n)
            fw.write(data)
            break
        data = f.read(1024)  # 不然每次循環讀取1024字節並寫入
        fw.write(data)
        n -= 1024
    f.close()
    fw.close()

#複製後半部分
def copy2(img):
    f = open(img,'rb')  # 讀取文件必需要每次讀取 若是在父進程中打開文件流對像 
                        # 子進程會通同時調用一個文件流對像 因爲文件流對象特性會記錄遊標
                        # 如若先執行後半部複製這前半部會致使讀取不到數據
    fw = open('2.jpeg','wb')
    f.seek(size // 2,0)
    while True:
        data = f.read(1024)
        if not data:
            break 
        fw.write(data)
    fw.close()
    f.close()

p1 = Process(target = copy1,args = ('timg.jpeg',))  # 建立子進程並讓子進程分別同時複製
p2 = Process(target = copy2,args = ('timg.jpeg',))
p1.start()
p2.start()
p1.join()
p2.join()

 



os.path.getsize('./1.txt'):
    讀取文件大小
注:
    1.若是 多個子進程拷貝同一個 父進程的對象則多個 子進程
       使用的是同一個對象(如文件隊形,套接字,隊列,管道。。。)
    2.若是在 建立子進程後單首創建的對象,則多個 子進程各不相同
 
 
建立子自定義進程類
    1.編寫類 繼承Process
    2.在自定義類中 加載父類__init__以獲取父類屬性
      同時能夠 自定義新的屬性
    3 .重寫run方法 在調用start時自動執行該方法
示例:

from multiprocessing import Process 
import time 

class ClockProcess(Process):
    def __init__(self,value):
        #調用父類init
        super().__init__()
        self.value = value 
    #重寫run方法
    def run(self):
        for i in range(5):
            time.sleep(self.value)
            print("The time is {}".format(time.ctime()))

p = ClockProcess(2)
#自動執行run
p.start()

p.join()

 



 
進程的缺點:
    進程在 建立和銷燬的過程當中 消耗資源相對 較多
 
進程池技術:
     產生緣由:
        若是有大量的任務須要多進程完成,而調用週期比較短且須要頻繁建立
此時可能產生大量進程頻繁建立銷燬的狀況  消耗計算機資源較大
     使用方法:
        1. 建立進程池在池內放入適當數量的進程
2. 將事件封裝成函數放入進程池
3.事件不斷運行, 直到全部放入進程池事件運行完成
4. 關閉進程池回收進程
 
from multiprocessing import pool
     pool(Process)
       功能: 建立進程池對象
       參數:進程數量
       返回值:進程池對象
    pool = pool()
     pool.apply_async(fun, args, kwds)(異步執行)
       功能: 將事件放入進程池內
       參數:
           fun:要執行的 函數
   args:以 元組形式爲fun 傳參
   kwds:以 字典形式爲fun 傳參
       返回值:
          返回一個事件對象,經過p. get()函數能夠獲取fun的返回值
     pool.close():
        功能:
    關閉進程池,沒法再加入新的事件,並等待已有事件結束執行
     pool.join()
        功能: 回收進程池
     pool.apply(fun, args, kwds)(同步執行)
       功能:將事件放入進程池內
       參數:
          fun:要執行的函數
  args:以元組形式爲fun傳參
  kwds:以字典形式爲fun傳參
       沒有返回值
示例:

from multiprocessing import Pool 
from time import sleep,ctime 

def worker(msg):
    sleep(2)
    print(msg)
    return ctime()

#建立進程池對象
pool = Pool(processes = 4)

result = []
for i in range(10):
    msg = "hello %d"%i 
    #將事件放入進程池
    r = pool.apply_async(func = worker,args = (msg,))
    result.append(r)
    
    #同步執行
    # pool.apply(func = worker,args = (msg,))

#關閉進程池
pool.close()
#回收
pool.join()

#獲取事件函數返回值
for i in result:
    print(i.get())

 

 
     pool.map(func, iter)
        功能:
    要執行的 事件放入進程池
參數:
    func  要執行的 函數
    iter   可迭代對象
示例:

from multiprocessing import Pool
import time 

def fun(n):
    time.sleep(1)
    print("執行 pool map事件",n)
    return n ** 2 

pool = Pool(4)

#在進程池放入6個事件
r = pool.map(fun,range(6))  # map高階函數 fun和iter執行6次
print("返回值列表:",r)

pool.close()
pool.join()

 



 
進程間的通訊(IPC
     因爲進程 空間獨立資源沒法共享, 
    此時在 進程間通信須要專門的通信方法
     通訊方法:
       管道、消息隊列、共享內存
       信號、信號量、套接字
 
       管道通訊:
           在內存中 開闢一塊內存空間造成管道結構
    多個進程使用同一個管道,便可經過 對管道讀寫操做進行通信
    multiprocessing --> Pipe
   fd1,fd2 = Pipe(duplex=True)
       功能: 建立管道
       參數:
           默認表示 雙向管道
   若是設置爲 False則爲 單向管道
       返回值:
           倆個管道對象的,分別表示管道的兩端
   若是是 雙向管道則都可讀寫
   若是是 單向管道fd1只讀fd2只寫
    fd.recv()
        功能:從管道 讀取信息
返回值:讀取到的內容
   當 管道爲空則阻塞
    fd.send(data)
        功能: 向管道寫入內容
參數:要寫入的內容
   當 管道滿時會阻塞
    寫入幾乎全部 Python全部數據類型
 
隊列通訊:
   在 內存開闢隊列結構空間,多個進程可見,
    多個進程操做同一個隊列對象能夠 實現消息存取工做
   在取出時 必須按照存入 順序取出先進先出
    q = Queue(maxsize=0)
     功能:
 建立隊列對象
     參數:
          maxsize 默認表示根據系統分配空間 儲存消息
 若是 傳入一個正整數表示最多 存放多少條消息
     返回值:隊列對象
    q.put(data,[block,timeout])
      功能:向隊列 存入消息
      參數:
          data:存入消息( 支持Python數據類型
  block:默認 True表示當隊 滿時阻塞
         設置爲 False 則爲 非阻塞
  timeout:當 block爲True是表示 超時檢測
    data = q.get([block,timeout])
       功能:取出消息
       參數:
            block:設置爲 True 當隊列爲 空時阻塞
           設置爲 False表示 非阻塞
    timeout:
         當 block爲True是表示 超時檢測
 
    q.full()  判斷隊列是否爲滿
    q.empty()  判斷隊列 是否爲空
    q.qsize()   獲取隊列中 消息的數量
    q.close()   關閉隊列
 
        共享內存通訊:
    在 內存中開闢一段空間存儲數據對多個進程可見,
     每次寫入共享內存中的內容 都會覆蓋以前內容
    對內存的 讀操做不會改變內存中的內容
     form multiprocessing import Value,Array
     shm = Value(ctype,obj)
        功能: 共享內存共享 空間
參數:
     ctype字符串  要轉換的c語言的數據類型
    obj:共享內存的 初始數據
返回值:返回共享內存對象
     shm.value
         表示共享內存 的值
 
示例:

from  multiprocessing import Process,Value 
import time 
import random  

#建立共享內存
money = Value('i',6000)

#存錢
def deposite():
    for i in range(100):
        time.sleep(0.05)
        #對value的修改就是對共享內存的修改
        money.value += random.randint(1,200)
#花銷
def withdraw():
    for i in range(100):
        time.sleep(0.04)
        #對value的修改就是對共享內存的修改
        money.value -= random.randint(1,200)

d = Process(target = deposite)
w = Process(target = withdraw)

d.start()
w.start()

d.join()
w.join()
print(money.value)

 


 
 
     shm = Array(ctype,obj)
        功能:
     開闢共享內存 空間
參數:
     ctype:要轉換的數據類型
    obj:
        要存入共享內容的的數據( 結構化數據
列表、字符串  表示要存入得內容
要求 數據結構內類型相同
整數  表示開闢幾個單元的空間
返回值:
     返回共享內存對象   可迭代對象
 
示例:

from multiprocessing import Process,Array
import time 

#建立共享內存
shm = Array('c',b"hello") #字符類型要求是bytes

#開闢5個整形單元的共享內存空間
# shm = Array('i',5)

def fun():
    for i in shm:
        print(i)
    shm[0] = b"H"

p = Process(target = fun)
p.start()
p.join()

print(shm.value) #從首地址打印字符串
# for i in shm:
#     print(i)

 


 
 
 
三種進程間通訊區別:
                    管道通訊:                           消息隊列:                          共享內存:
開闢空間:     內存                                      內存                                    內存
讀寫方式: 兩端讀寫                               先進先出                        每次覆蓋上次內容
                  單向/雙向 
效率:            通常                                     通常                                     較快
應用:   多用於父子進程                        應用靈活普遍                   複雜,須要同步互斥
 
 
 
 
相關文章
相關標籤/搜索