網絡編程——進程池

管道程序員

(1)安全

from multiprocessing import Process, Pipe
def func(con2):
    msg = con2.recv()
    print('>>>>>>',msg)

if __name__ == '__main__':
    con1,con2 = Pipe()
    p = Process(target=func,args=(con2,))
    p.start()
    con1.send(' 小鬼 !!!')

(2)多線程

def func (con1, con2):
    try :
        msg = con2.recv() # 若是主進程沒有發送消息,recv()就會阻塞住
        con2.send(' 哈哈哈 ')
        print('>>>>>>', msg)
        # 若是管道一端關閉了 , 那麼另一端在接收消息的時候會報錯
        msg2 = con2.recv()
    except EOFError:
        print(' 對方管道已關閉 ')
        con2.close()
if __name__ == '__main__':
    con1, con2 = Pipe()
    p = Process(target=func, args=(con1, con2,))
    p.start()
    con1.send(' 小鬼 !!!')
    con1.close()
    # try:
        # mag = con1.recv() # OSError: handle is closed
        # print('>>>>>>>>>',mag)
    # except OSError:
        # print(' 該管道已經關閉 ')
# 父進程中 con1 發消息 , 子進程必須是由 con2 接收
# 父進程中 con2 發消息 , 子進程必須是由 con1 接收
# 父進程中 con1 收消息 , 子進程必須是由 con2 發送
# 父進程中 con2 發消息 , 子進程必須是由 con1 發送

 

數據共享併發

(1)共享數據app

from multiprocessing import Process, Manager
def func (num):
    num[' 姓名 '] = ' 呂三兒 ' # 資源共享 , 在子進程中修改數據 , 父進程中獲得的也是修改事後的
if __name__ == '__main__':
    m = Manager()
    num = m.dict({' 姓名 ': ' 劉二 '}) # 能夠是列表 , 字典 , 同步鎖 , 遞歸鎖 , 命名空間 ,     事件 , 信號量 , 隊列等等
    p = Process(target=func, args=(num,))
    p.start()
    p.join()
    print(' 父進程中的 num 的值 ', num)
    

 

(2)數據共享是不安全的,須要加鎖 異步

from multiprocessing import Process, Manager, Lock
def func (dic, l):
    # l.acquire()
    # dic['count'] -= 1
    # l.release()
    with l: # 自動加鎖 , 等同於上面的代碼
    dic['count'] -= 1
if __name__ == '__main__':
    l = Lock()
    m = Manager()
    lst = []
    dic = m.dict({'count': 100})
    for i in range(20):
        p = Process(target=func, args=(dic, l))
        p.start()
        lst.append(p)
    for e in lst:
        e.join()
    print(' 主進程 ', dic)

 

三.進程池(重點)async

(1)map(func,可迭代對象) 函數

from multiprocessing import Pool
import time
def func (n):
    time.sleep(1)
    print(n)
if __name__ == '__main__':
    p = Pool(4)
    p.map(func, range(100))

 

(2)進程池和多進程的效率對比 學習

from multiprocessing import Pool, Process
import time


def func (n):
    for i in range(5):
    n = n + i


if __name__ == '__main__':
    s1 = time.time()
    p = Pool(4)
    p.map(func, range(100))
    print(' 進程池 ', time.time() - s1)
    s2 = time.time()
    lst = []
        
    for i in range(100):
        p = Process(target=func, args=(i,))
        p.start()
        lst.append(p)
    for e in lst:
        e.join()
    print(' 多進程 ', time.time() - s2
    

(3)同步執行ui

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    return i * i

if __name__ == '__main__':
    p = Pool(4)
    for i in range(100):
        res = p.apply(func, args=(i,)) # 同步執行 , 第一個是要執行的函數 , 第二個是可迭代對象型的參數 , 給任務函數傳的參數
        print(res) # 會等待你的任務返回結果

(4)異步執行

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    return i * i

if __name__ == '__main__':
    p = Pool(4)
    lst = []
    for i in range(100):
        res = p.apply_async(func, args=(i,)) 
# 異步執行 , 第一個是要執行的函數 , 第二個是可迭代對象型的參數 , 給任務函數傳的參數 lst.append(res)
# res 拿到的是對象 , 須要經過 get 方法拿到對象的值 , 而直接用 get 方法的話 ,get 只能一個一個拿去數據,因此會出現阻塞的狀態for e in lst: # 將對象添加到列表裏面 , 經過遍歷列表 , 能夠進行拿值操做 print(e.get())

(5)異步中的具體注意事項(closs和join)

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    print(i * i)

if __name__ == '__main__':
    p = Pool(4)
    for i in range(100):
        res = p.apply_async(func, args=(i,)) 
# 異步執行 , 第一個是要執行的函數 , 第二個是可迭代對象型的參數 , 給任務函數傳的參數 p.close() # 不是關閉進程池 , 而是不容許再有其餘任務進來 p.join() # 感知進程池中任務的方法 , 等待進程池中的全部進程執行完畢 print(' 主進程結束 ')

(6)異步中的回調函數的使用 

from multiprocessing import Pool
import time, os

def func1 (i):
    print('func1 的 pid', os.getpid())
    time.sleep(0.5)
    return i * i
def func2 (n):
    print('func2 的 pid', os.getpid()) # 回調函數是由主進程調用的 , 不是子進程
    print('func2', n)
if __name__ == '__main__':
    print(' 主進程的 pid', os.getpid())
    p = Pool(4)
    for i in range(100):
        res = p.apply_async(func1, args=(i,), # 異步執行 , 第一個是要執行的函數 , 第二個是可迭代對象型的參數 , 給任務函
數傳的參數 ,
        callback=func2) # 第三個是回調函數 , 負責接收子進程的運行結果 , 能夠爲其再進行操做
    p.close() # 不是關閉進程池 , 而是不容許再有其餘任務進來
    p.join() # 感知進程池中任務的方法 , 等待進程池中的全部進程執行完畢
    print(' 主進程結束 ')

(7)異步中的close,join和get方法的運用 

from multiprocessing import Pool
import time

def func (i):
    time.sleep(0.5)
    print(i)
    return i * i

if __name__ == '__main__':
    p = Pool(4)
    lst = []
    for i in range(10):
        res = p.apply_async(func, args=(i,)) # 異步執行 , 第一個是要執行的函數 , 第二個是可迭代對象型的參數 , 給任務函數
傳的參數
        lst.append(res) # res 拿到的是對象 , 須要經過 get 方法拿到對象的值 , 而直接用 get 方法的話 ,get 只能一個一個拿去數
據 , 因此會出現阻塞的狀態
        p.close() # 不是關閉進程池 , 而是不容許再有其餘任務進來
        p.join() # 感知進程池中任務的方法 , 等待進程池中的全部進程執行完畢
        for e in lst: # 將對象添加到列表裏面 , 經過遍歷列表 , 能夠進行拿值操做 , 當前面加了 close 和
join 的時候 , 進程池裏的所
有任務都已經執行完畢 , 因此結果就直接所有打印出來
            print(e.get())

四,初始線程

 

1.什麼是線程?

  指的是一條流水線的工做過程,關鍵的一句話:一個進程內最少自帶一個線程,其實進程根本不能執
行,進程不是執行單位,是資源的單位,分配資源的單位
  線程纔是執行單位
  進程:作手機屏幕的工做過程,剛纔講的
  咱們的py文件在執行的時候,若是你站在資源單位的角度來看,咱們稱爲一個主進程,若是站在代碼
執行的角度來看,它叫作主線程,只是一種形象的說法,其實整個代碼的執行過程成爲線程,也就是幹
這個活兒的自己稱爲線程,可是咱們後面學習的時候,咱們就稱爲線程去執行某個任務,其實那某個任
務的執行過程稱爲一個線程,一條流水線的執行過程爲線程

 

2.進程vs線程

  (1) 同一個進程內的多個線程是共享該進程的資源的,不一樣進程內的線程資源確定是隔離的
  (2) 建立線程的開銷比建立進程的開銷要小的多

3.併發三個任務:

  啓動三個進程:由於每一個進程中有一個線程,可是我一個進程中開啓三個線程就夠了
  同一個程序中的三個任務須要執行,你是用三個進程好,仍是三個線程好?
  例子:
    pycharm 三個任務:鍵盤輸入屏幕輸出自動保存到硬盤
    若是三個任務是同步的話,你鍵盤輸入的時候,屏幕看不到
    我們的pycharm是否是一邊輸入你邊看啊,就是將串行變爲了三個併發的任務
    解決方案:三個進程或者三個線程,哪一個方案可行。若是是三個進程,進程的資源是否是隔離的並
且開銷大,最致命的就是資源隔離,可是用戶輸入的數據還要給另一個進程發送過去,進程之間能直
接給數據嗎?你是否是copy一份給他或者通訊啊,可是數據是同一份,咱們有必要搞多個進程嗎,線程
是否是共享資源的,咱們是否是能夠使用多線程來搞,你線程1輸入的數據,線程2能不能看到,你之後
的場景仍是應用多線程多,並且起線程咱們說是否是很快啊,佔用資源也小,還能共享同一個進程的資
源,不須要將數據來回的copy!

 

4.建立線程的兩種方式

(1)基本建立,導入模塊

from threading import Thread
import time

def func (n):
    time.sleep(1)
    print(n)

if __name__ == '__main__':
    t = Thread(target=func, args=(1,))
    t.start()
    t.join()
    print('主線程')

(2)繼承Thread類

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def run (self):
        time.sleep(1)
        print('%s子線程' % self.name)

if __name__ == '__main__':
    t = MyThread('劉二')
    t.start()
    t.join()
    print('主線程結束')

 

進程池

 

  一個池子,裏邊有固定數量的進程,這些進程一直處於待命狀態,一旦有任務來,立刻就有進程去處理。

  由於在實際業務中,任務量是有多有少的,若是任務量特別的多,不可能要開對應那麼多的進程數

  開啓那麼多進程首先須要消耗大量的時間讓操做系統來爲你管理它,其次還須要小號大量時間讓cpu幫你調度它。

  進程池還會幫程序員去管理池中的進程。

  from multiprocessing impor Pool

  p = Pool(os.cpu_count()+1)   ps:通常進程池開本身內核數(os.cpu_count()+1)個

 

 

  進程池有三個方法:

    map(func.iterable)

    func:進程池中的進程執行的任務函數

    iterable: 可迭代對象,是把可迭代對象中的每一個元素一次傳給任務函數當參數

 

    apply(func.args = ()): 同步的效率,也就是說池中的進程一個一個的去執行任務

    func:進程池中的進程執行的任務函數

    args:可迭代對象型的參數,是傳給任務函數的參數

    同步處理任務時,不須要close和join

    同步處理任務時,進程池中的全部進程是普通進程(主進程須要等待其執行結果)

 

    apply_async(func,args= (), callback = None) :異步的效率,也就是說池中的進程一次性都去執行任務

    func: 進程池中的進程執行的任務函數

    args: 可迭代對象型的參數,是傳給任務函數的參數

    callback: 回調函數,就是說每當進程池中有進程處理完任務了,返回的結果能夠交回調函數,由回調函數進行進一步處理,回調函數只有異步纔有,同步是沒有的

    異步處理任務時,進程池中的全部進程是守護進程(主今晨代碼執行完畢守護進程就結束)

    異步處理任務是,必須加上close和join

 

  回調函數的使用:

    進程的任務函數的返回值,被當成回調函數的形參接受,以此進行進一步的處理操做

    回調函數是由主進程調用的,而不是子進程,子進程只負責把結果傳遞給回調函數

相關文章
相關標籤/搜索