線程池的原理:
線程池是預先建立線程的一種技術。線程池在尚未任務到來以前,
建立必定數量的線程,放入空閒隊列中。這些線程都是處於睡眠狀態,
即均爲啓動,不消耗CPU,而只是佔用較小的內存空間。當請求到來以後,
緩衝池給此次請求分配一個空閒線程,把請求傳入此線程中運行,進行處理。
當預先建立的線程都處於運行狀態,即預製線程不夠,線程池能夠自由建立必定數量的新線程,
用於處理更多的請求。當系統比較閒的時候,也能夠經過移除一部分一直處於停用狀態的線程。
進程間的通訊原理:
OS提供了溝通的媒介供進程之間「對話」用。既然要溝通,如同人類社會的溝通同樣,
溝通要付出時間和金錢,計算機中也同樣,必然有溝通須要付出的成本。
出於所解決問題的特性,OS提供了多種溝通的方式,每種方式的溝通成本也不盡相同,
使用成本和溝通效率也有所不一樣。咱們常常聽到的 管道、消息隊列、共享內存都是OS提供的供進程之間對話的方式。
Process(target, name, args, kwargs)
name:
給
進程取
名字
默認爲
Process-1,Process-2.....
p.name 查看進程名
args:
以
元組的形式
給target函數傳參
kwargs:
以
字典的形式
給對應鍵的值傳參
進程對象的其餘
經常使用屬性方法:
p.name p.start() p.join()
p.pid:
獲取建立進程的
pid號
p.is_alive():
判斷進程是處於alive狀態
p.daemon:
默認爲Flase 若是
設置爲True
主進程結束時
殺死全部子進程
daemon屬性
必定要在start()前設置
設置daemon
爲True 通常
不須要加join()
daemon不是真正意義上的守護進程
守護進程:
不受終端控制
後臺自動運行
生命週期長
多進程copy一個文件拆分爲兩個進行保存
import os
from multiprocessing import Process
from time import sleep
#獲取文件的大小
size = os.path.getsize("./timg.jpeg") # 獲取文件的字節數
# f = open("timg.jpeg",'rb')
#複製前半部分
def copy1(img):
f = open(img,'rb') # 二進制讀取要複製的文件
n = size // 2
fw = open('1.jpeg','wb') # 二進制建立文件
while True:
if n < 1024: # 判斷文件大小是否大於1024字節 若是小於則直接讀取寫入
data = f.read(n)
fw.write(data)
break
data = f.read(1024) # 不然每次循環讀取1024字節並寫入
fw.write(data)
n -= 1024
f.close()
fw.close()
#複製後半部分
def copy2(img):
f = open(img,'rb') # 讀取文件必需要每次讀取 若是在父進程中打開文件流對像
# 子進程會通同時調用一個文件流對像 因爲文件流對象特性會記錄遊標
# 如若先執行後半部複製這前半部會致使讀取不到數據
fw = open('2.jpeg','wb')
f.seek(size // 2,0)
while True:
data = f.read(1024)
if not data:
break
fw.write(data)
fw.close()
f.close()
p1 = Process(target = copy1,args = ('timg.jpeg',)) # 建立子進程並讓子進程分別同時複製
p2 = Process(target = copy2,args = ('timg.jpeg',))
p1.start()
p2.start()
p1.join()
p2.join()
os.path.getsize('./1.txt'):
讀取文件大小
注:
1.若是
多個子進程拷貝同一個
父進程的對象則多個
子進程
使用的是同一個對象(如文件隊形,套接字,隊列,管道。。。)
2.若是在
建立子進程後單首創建的對象,則多個
子進程各不相同
建立子自定義進程類
1.編寫類
繼承Process
2.在自定義類中
加載父類__init__以獲取父類屬性,
同時能夠
自定義新的屬性
3
.重寫run方法 在調用start時自動執行該方法
示例:
from multiprocessing import Process
import time
class ClockProcess(Process):
def __init__(self,value):
#調用父類init
super().__init__()
self.value = value
#重寫run方法
def run(self):
for i in range(5):
time.sleep(self.value)
print("The time is {}".format(time.ctime()))
p = ClockProcess(2)
#自動執行run
p.start()
p.join()
進程的缺點:
進程在
建立和銷燬的過程當中
消耗的
資源相對
較多
進程池技術:
產生緣由:
若是有大量的任務須要多進程完成,而調用週期比較短且須要頻繁建立
此時可能產生大量進程頻繁建立銷燬的狀況 消耗計算機資源較大
使用方法:
1.
建立進程池,
在池內放入適當數量的進程
2.
將事件封裝成函數。
放入到
進程池
3.事件不斷運行,
直到全部放入進程池事件運行完成
4.
關閉進程池,
回收進程
from multiprocessing import pool
pool(Process)
功能:
建立進程池對象
參數:進程數量
返回值:進程池對象
pool = pool()
pool.apply_async(fun, args, kwds)(異步執行)
功能:
將事件放入進程池內
參數:
fun:要執行的
函數
args:以
元組形式爲fun
傳參
kwds:以
字典形式爲fun
傳參
返回值:
返回一個事件對象,經過p.
get()函數能夠獲取fun的返回值
pool.close():
功能:
關閉進程池,沒法再加入新的事件,並等待已有事件結束執行
pool.join()
功能:
回收進程池
pool.apply(fun, args, kwds)(同步執行)
功能:將事件放入進程池內
參數:
fun:要執行的函數
args:以元組形式爲fun傳參
kwds:以字典形式爲fun傳參
沒有返回值
示例:
from multiprocessing import Pool
from time import sleep,ctime
def worker(msg):
sleep(2)
print(msg)
return ctime()
#建立進程池對象
pool = Pool(processes = 4)
result = []
for i in range(10):
msg = "hello %d"%i
#將事件放入進程池
r = pool.apply_async(func = worker,args = (msg,))
result.append(r)
#同步執行
# pool.apply(func = worker,args = (msg,))
#關閉進程池
pool.close()
#回收
pool.join()
#獲取事件函數返回值
for i in result:
print(i.get())
pool.map(func, iter)
功能:
將要執行的
事件放入進程池
參數:
func 要執行的
函數
iter
可迭代對象
示例:
from multiprocessing import Pool
import time
def fun(n):
time.sleep(1)
print("執行 pool map事件",n)
return n ** 2
pool = Pool(4)
#在進程池放入6個事件
r = pool.map(fun,range(6)) # map高階函數 fun和iter執行6次
print("返回值列表:",r)
pool.close()
pool.join()
進程間的通訊(IPC)
因爲進程
空間獨立,
資源沒法共享,
此時在
進程間通信就
須要專門的通信方法
通訊方法:
管道、消息隊列、共享內存
信號、信號量、套接字
管道通訊:
在內存中
開闢一塊內存空間,
造成管道結構
多個進程使用同一個管道,便可經過
對管道的
讀寫操做進行通信
multiprocessing --> Pipe
fd1,fd2 = Pipe(duplex=True)
功能:
建立管道
參數:
默認表示
雙向管道
若是設置爲
False則爲
單向管道
返回值:
倆個管道對象的,分別表示管道的兩端
若是是
雙向管道則都可讀寫
若是是
單向管道則
fd1只讀,
fd2只寫
fd.recv()
功能:從管道
讀取信息
返回值:讀取到的內容
當
管道爲空則阻塞
fd.send(data)
功能:
向管道寫入內容
參數:要寫入的內容
當
管道滿時會阻塞
可以
寫入幾乎全部
Python全部數據類型
隊列通訊:
在
內存中
開闢隊列結構空間,多個進程可見,
多個進程操做同一個隊列對象能夠
實現消息存取工做
在取出時
必須按照存入
順序取出(
先進先出)
q = Queue(maxsize=0)
功能:
建立隊列對象
參數:
maxsize 默認表示根據系統分配空間
儲存消息
若是
傳入一個正整數則
表示最多
存放多少條消息
返回值:隊列對象
q.put(data,[block,timeout])
功能:向隊列
存入消息
參數:
data:存入消息(
支持Python數據類型)
block:默認
True表示當隊
滿時阻塞
設置爲
False 則爲
非阻塞
timeout:當
block爲True是表示
超時檢測
data = q.get([block,timeout])
功能:取出消息
參數:
block:設置爲
True 當隊列爲
空時阻塞
設置爲
False表示
非阻塞
timeout:
當
block爲True是表示
超時檢測
q.full()
判斷隊列是否爲滿
q.empty() 判斷隊列
是否爲空
q.qsize()
獲取隊列中
消息的數量
q.close()
關閉隊列
共享內存通訊:
在
內存中開闢一段空間存儲數據對多個進程可見,
每次寫入共享內存中的內容
都會覆蓋以前內容
對內存的
讀操做不會改變內存中的內容
form multiprocessing import Value,Array
shm = Value(ctype,obj)
功能:
共享內存共享
空間
參數:
ctype:字符串 要轉換的c語言的數據類型
obj:共享內存的
初始數據
返回值:返回共享內存對象
shm.value:
表示共享內存
的值
示例:
from multiprocessing import Process,Value
import time
import random
#建立共享內存
money = Value('i',6000)
#存錢
def deposite():
for i in range(100):
time.sleep(0.05)
#對value的修改就是對共享內存的修改
money.value += random.randint(1,200)
#花銷
def withdraw():
for i in range(100):
time.sleep(0.04)
#對value的修改就是對共享內存的修改
money.value -= random.randint(1,200)
d = Process(target = deposite)
w = Process(target = withdraw)
d.start()
w.start()
d.join()
w.join()
print(money.value)
shm = Array(ctype,obj)
功能:
開闢共享內存
空間
參數:
ctype:要轉換的數據類型
obj:
要存入共享內容的的數據(
結構化數據)
列表、字符串
表示要存入得內容
要求
數據結構內的
類型相同
整數
表示要
開闢幾個單元的空間
返回值:
返回共享內存對象
可迭代對象
示例:
from multiprocessing import Process,Array
import time
#建立共享內存
shm = Array('c',b"hello") #字符類型要求是bytes
#開闢5個整形單元的共享內存空間
# shm = Array('i',5)
def fun():
for i in shm:
print(i)
shm[0] = b"H"
p = Process(target = fun)
p.start()
p.join()
print(shm.value) #從首地址打印字符串
# for i in shm:
# print(i)
三種進程間通訊區別:
管道通訊: 消息隊列: 共享內存:
開闢空間: 內存 內存 內存
讀寫方式: 兩端讀寫 先進先出 每次覆蓋上次內容
單向/雙向
效率: 通常 通常 較快
應用: 多用於父子進程 應用靈活普遍 複雜,須要同步互斥