032_進程2

守護進程html

  1)正常的子進程沒有執行完的時候主進程要一直等着python

  2)守護進程的進程的做用:
           守護進程會隨着主進程的代碼執行結束(此時主進程不必定結束)而結束。
  3)守護進程是否結束的判斷標準是 :
           主進程的代碼是否執行完。不是主進程是否結束。
  4)主進程的代碼執行完後會要等着全部子進程結束以後結束。
  5)守護進程不會等待其它子進程結束,和子進程無關。但若是想讓守護進程等待某個子程序結束後再結束,能夠在主程序後使用p.join()設置阻塞,即主程序會等待該子進程,而等待這個代碼是主進程的代碼,即主進程代碼沒有執行完,因此守護進程不會結束。
  6)守護進程 要在start以前設置
  7)守護進程中 不能再開啓子進程編程

1,開一個守護進程json

  1)父進程作本身的事,開一個報時器子進程,每過一秒通報一次。可是父進程的代碼執行完後子進程並不會結束。
  2)設置守護進程後,父進程的代碼執行完後,子進程也會結束。數組

import time
from multiprocessing import Process

def cal_time():        # 定義一個報時器
    while True:
        time.sleep(1)
        print('過去了1秒')

if __name__ == '__main__':
    p = Process(target=cal_time)
# 必定在開啓進程以前設置 # 將接下來開啓的一個進程設置成守護進程
#    p.daemon = True     
    p.start()
    for i in range(100):    # 10s
        time.sleep(0.1)
        print('*'*i)

2,使守護進程等待一個子進程結束以後在結束安全

import time
from multiprocessing import Process
def func():
    print('--'*10)
    time.sleep(15)
    print('--'*10)

def cal_time():
    while True:
        time.sleep(1)
        print('過去了1秒')

if __name__ == '__main__':
    p = Process(target=cal_time)
    p.daemon = True     # 必定在開啓進程以前設置
    p.start()
    p2 = Process(target=func)  # 15s
    p2.start()
    for i in range(100):    # 10s
        time.sleep(0.1)
        print('*'*i)
    p2.join()       # 使守護進程等待一個子進程結束以後在結束
# 此處阻塞,子進程p2不結束主程序代碼就不會執行完

process 中的部分方法網絡

3,process 中的方法  p.is_alive()  p.terminate()併發

  # p.is_alive()   # 是否活着 True表明進程還在 False表明進程不在了
  # p.terminate()  # 結束一個進程,可是這個進程不會馬上被殺;強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖app

import time
from multiprocessing import Process
def func():
    print('wahaha')
    time.sleep(5)
    print('qqxing')
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    print(p.is_alive())     # True
    time.sleep(0.1)
    p.terminate()           # 給操做系統發出結束進程的請求,# 是個異步操做,同步的會等在這。
    print(p.is_alive())     # True   #不會當即結束進程,會有一點延遲。
    time.sleep(1)
    print(p.is_alive())     # False  # 進程結束。
# 結果:
# True
# wahaha
# True
# False

process 中的部分屬性dom

 4,process 中的屬性

  # pid   查看這個進程 進程id
  # name  查看這個進程的名字

def func():
    print('wahaha')
    time.sleep(5)
    print('qqxing')
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    print(p.name,p.pid) 
    p.name = '哇哈哈哈'    # 改進程的名字
    print(p.name)

5,如何能在子進程裏查看該子進程的name,pid。

class MyProcess(Process):
    def run(self):
        print('wahaha',self.name,self.pid)
        time.sleep(5)
        print('qqxing',self.name,self.pid)
if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print(p.pid)

 

1)爲何須要鎖?
        由於實現了異步進程,各進程互不干擾,當多個進程同時訪問同一個文件時會出現問題。例如,文件中記錄有10張票,(多我的)多個進程同時訪問,拿到的記錄是同樣的,每一個進程拿到的都是10,每一個進程都執行的是10-1,而後有都寫入文件,文件記錄爲9張票。事實上,咱們知道,不止賣了一張票。爲防止此情況的出現,使用了鎖的方式。


2)什麼是鎖?
        一個文件設置一個鎖,當一個進程訪問文件時,其它進程就不能進入,當這個進程進行完該文件的操做。其它進程才能訪問。
       至關於,一個上鎖的房間,門外掛着一把鑰匙,要訪問的進程都去強這把鑰匙,搶到的進程拿着鑰匙進入房間並鎖上房間,處理完出來後在把鑰匙還掛在門外,其它進程在搶。


3)鎖 就是在併發編程中 保證數據安全

7,建立鎖

from multiprocessing import Lock
lock = Lock()   # 建立鎖對象
lock.acquire()  # 鎖   拿鑰匙
lock.acquire()  # 鎖   阻塞   當第一個拿鑰匙的釋放了鑰匙才能拿鑰匙

lock.release()  # 釋放鎖  還鑰匙

 7.1,多進程 實現 併發

import json 
import time
import random
from multiprocessing import Lock
from multiprocessing import Process

def search(i):
    with open('ticket') as f:
        print(i,json.load(f)['count'])

def get(i):
    with open('ticket') as f:
        ticket_num = json.load(f)['count']
    time.sleep(random.random())
    if ticket_num > 0:
        with open('ticket','w') as f:
            json.dump({'count':ticket_num-1},f)
        print('%s買到票了'%i)
    else:
        print('%s沒票了'%i)

def task(i,lock):
    search(i)   # 查看票
    lock.acquire()
    get(i)      # 搶票
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(20):  # 20我的同時搶票
        p = Process(target=task,args=(i,lock))
        p.start()

信號量

  就是一個上鎖的文件,外面掛了設定數量的鑰匙,即房間最多存在設定數量的人數。

from multiprocessing import Semaphore
sem = Semaphore(4)      # 設置爲四個
sem.acquire()  # 須要鑰匙
print(0)
sem.acquire()  # 須要鑰匙
print(1)
sem.acquire()  # 須要鑰匙
print(2)
sem.acquire()  # 須要鑰匙
print(3)
sem.release()
sem.acquire()  # 須要鑰匙
print(4)

8, 實例

import time
import random
from multiprocessing import Semaphore
from multiprocessing import Process
def sing(i,sem):
    sem.acquire()
    print('%s : 進入 ktv'%i)
    time.sleep(random.randint(1,10))
    print('%s : 出 ktv'%i)
    sem.release()
# 迷你唱吧  20我的,同一時間只能有4我的進去唱歌
if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(20):
        Process(target=sing,args=(i,sem)).start()

 

事件

 

9,

# recv    accept    input    sleep  同步阻塞
# lock鎖   是異步阻塞
# 事件——異步阻塞
# 事件是一個通知,標誌,能夠同時使全部進程 都陷入阻塞

from multiprocessing import Event   #事件
e = Event()  # 實例化一個事件  標誌       /交通訊號燈
e.wait()      # 剛實例化出來的一個事件對象,阻塞信號      /是紅燈
                  # 執行到wait,要先看燈,綠燈行紅燈停,若是在停的過程當中燈綠了,就變成非阻塞了
e.set()        # 將標誌變成非阻塞        /交通燈變綠
e.clear()     # 將標誌又變成阻塞      /交通燈變紅

e.is_set()    # 判斷是否阻塞     True就是綠燈 False就是紅燈

10,實現一個紅綠燈處隨機經過的車輛,每一個數表明一個車輛(進程)

import time
import random
for i in range(100):       # 一百輛車
    if i%random.randint(3,6) == 0 :      #隨機生成一個數,用車號除以生成數。
        time.sleep(random.randint(1,3))  #知足條件的車,等待隨機幾秒在經過
        print('發生等待:',end=' ')                              # 標識
    print('%s號車經過'%i) 

11,實例:紅綠燈

import time
import random
from multiprocessing import Process
from multiprocessing import Event
def traffic_light(e):
    while True:
        if e.is_set():
            time.sleep(3)
            print('紅燈亮')
            e.clear()      # 綠變紅
        else:
            time.sleep(3)
            print('綠燈亮')
            e.set()        # 紅變綠

def car(i,e):
    e.wait()  
    print('%s車經過'%i)

if __name__ == '__main__':
    e = Event()   # 立一個紅燈
    tra = Process(target=traffic_light,args=(e,))
    tra.start()   # 啓動一個進程來控制紅綠燈
    for i in range(100):
        if i%6 == 0 :
            time.sleep(random.randint(1,3))
        car_pro = Process(target=car, args=(i,e))
        car_pro.start()

 

進程間通訊——隊列

12,隊列

  1.進程之間通訊 可使用multiprocessing 的 Queue模塊
  2.隊列有兩種建立方式 第一種不傳參數 這個隊列就沒有長度限制 ;傳參數,建立一個有最大長度限制的隊列
  3.提供兩個重要方法;put()    get()
  4.qsize  可能出錯

from multiprocessing import Queue
q = Queue(3)    # 能夠設置長度
q.put(1)
q.put(2)
q.put(3)
q.put(4)

print(q.get())隊列
print(q.get())
print(q.get())
print(q.get())  # 若是隊列裏已經沒有值了 就會阻塞等待有一個值

13,進程間通訊

  一個進程往隊列裏放數據,一個進程能拿到隊列裏的數據

from multiprocessing import Process
from multiprocessing import Queue

def q_put(q):
    q.put('hello')

def q_get(q):
    print(q.get())

if __name__ =='__main__':
    q = Queue()
    p = Process(target=q_put,args=(q,))
    p.start()
    p1 = Process(target=q_get, args=(q,))
    p1.start()

14, 生產者消費者模型

  我要生產一個數據 而後 給一個函數 讓這個函數依賴這個數據進行運算  拿到結果  —— 同步過程

# 但生產者生成數據很快,而消費者處理數據很慢,可使用多個相同的進程處理生產的數據。

# 作包子 和 吃包子

import time
def producer(q):  # 生產者
    for i in  range(100):    # 生產一百個包子
        q.put('包子%s'%i)

def consumer(q): #  消費者
    for i in range(100):
        time.sleep(1)
        print(q.get())

if __name__ == '__main__':
    q = Queue(10)   # 托盤  # 限制隊列長度十,當隊列滿了,只有用一個才能在放一個新的。
		# 防止佔用內存
    p = Process(target=producer,args=(q,))
    p.start()
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.start()      #用兩個消費者進程,處理數據。
    c2.start()

15,解決 數據的供需不平衡

  # 同步 生產數據 使用數據
  # 異步 主進程 生產數據 子進程使用數據   —— 問題:500個子進程去處理數據 生產過慢
  # 異步 子進程 生產數據 子進程處理數據   —— 問題:不知道生產慢仍是處理慢

  15.1,

# 3個子進程 生產包子
# 2個子進程 吃包子
# 生產的快 吃的慢  包子溢出
# 生產的慢 吃得快  包子不夠吃
# 若是增長生產者 —— 讓生產變快
# 減小生產者 —— 找一個容器放 約束容器的容量

import time
import random
from multiprocessing import Queue
from multiprocessing import Process
def producer(q,food):
    for i in range(5):
        q.put('%s-%s'%(food,i))
        print('生產了%s'%food)
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
    q.put(None)   # 有幾個消費者進程,就要放入幾個
    q.put(None)
    q.put(None)

def consumer(q,name):
    while True:
        food = q.get()   # 生產者不生產仍是生產的慢
        if food == None:break
        print('%s 吃了 %s'%(name,food))

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,'包子'))
    p1.start()
    p2 = Process(target=producer, args=(q, '骨頭'))
    p2.start()
    c1 = Process(target=consumer, args=(q, 'alex'))
    c1.start()
    c2 = Process(target=consumer, args=(q, 'egon'))
    c2.start()
    c3 = Process(target=consumer, args=(q, 'jin'))
    c3.start()

# 隊列不會亂,由於一次出一個,先進先出。
# 如今 經過queue
# 生產者消費者模型
    #1.消費者要處理多少數據是不肯定的
    #2.因此只能用while循環來處理數據 ,可是while循環沒法結束
    #3.須要生產者發送信號
    #4.有多少個消費者 就須要發送多少個信號
    #5.可是發送的信號數量須要根據 生產者和消費者的數量進行計算,因此很是不方便

16,JoinableQueue   解決上面問題

  # 使用上面的方式建立的隊列對象,有join和task_done兩個方法。
  # q.task_done() 代表完成隊列中一個數據的處理。放到消費者代碼後,當消費者處理完這個數據後執行這個代碼。
  #  q.join() 放到生產者代碼後,與上一個方法搭配使用,檢查到隊列中的全部數據都處理完了(taskdone)不在阻塞。生產者代碼結束。
  # 在主進程後加上每一個生產者進程的阻塞,消費者沒處理完數據,生產者就不會結束,主進程中設置的生產者阻塞就不經過,即主進程代碼不會執行完。
  # 將每一個消費者進程設置成 守護進程,這樣當主進程代碼執行完,即生產者已經生產了全部數據並處理完畢。守護進程結束,即消費者進程結束。

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue

# def producer(q,food):
#     for i in range(5):
#         q.put('%s-%s'%(food,i))
#         print('生產了%s'%food)
#         # time.sleep(random.randint(1,3))
#         time.sleep(random.random())
#     q.join()  # 等待 消費者 把全部的數據都處理完
#
# def consumer(q,name):
#     while True:
#         food = q.get()   
#         print('%s 吃了 %s'%(name,food))
#         q.task_done()   # 這個數據處理完了
#
# if __name__ == '__main__':
#     q = JoinableQueue()
#     p1 = Process(target=producer,args=(q,'泔水'))
#     p1.start()
#     p2 = Process(target=producer, args=(q, '骨頭'))
#     p2.start()
#     c1 = Process(target=consumer, args=(q, 'alex'))
#     c1.daemon = True    # 將消費者設置成守護進程
#     c1.start()
#     c2 = Process(target=consumer, args=(q, 'egon'))
#     c2.daemon = True
#     c2.start()
#     c3 = Process(target=consumer, args=(q, 'jin'))
#     c3.daemon = True
#     c3.start()
#
#     p1.join()  # 等待p1執行完畢
#     p2.join()  # 等待p2執行完畢
#
# # 生產者生產的數據所有被消費 —— 生產者進程結束 —— 主進程代碼執行結束 —— 消費者守護進程結束

 

管道 pipe

17,pipe與兩個重要方法 send()  recv()

from multiprocessing import Process
from multiprocessing import Pipe
p1,p2 = Pipe()  #支持雙向通訊
p1.send('hello')
print(p2.recv())
p2.send('hi')
# p2.close()     # p2端關閉
print(p1.recv())
print(p1.recv())  # 若是p2端沒有關閉,會發生阻塞 
        # p2端關閉,管道內沒有數據可接收時,報錯 EOFError

18,進程中用

from multiprocessing import Process
from multiprocessing import Pipe
def func(son)
    print(son.recv())  # 接收到
    print(son,recv())  #接收不到報錯

if __name__ == '__main__':
    foo,son = Pipe()
    p = Process(target=func,args=(son,))
    p.start()
    foo.send('hello')
    foo.close()

19,下面的狀況關閉後沒有報錯而是阻塞

from multiprocessing import Process
from multiprocessing import Pipe
def func(p)
    foo,son = p   # 此處形成了一個管道兩端各有兩個口
#     foo.close()     # 關閉子進程的foo,recv沒有接收到會報錯。
	            # 可是子進程foo的關閉主進程沒關閉,還會阻塞。
    print(son.recv())  # 接收到
    print(son,recv())  # 阻塞

if __name__ == '__main__':
    foo,son = Pipe()
    p = Process(target=func,args=((foo,son),)) 
    p.start()
    son.close()
    foo.send('hello')
    foo.close()    # 關閉的是主進程的foo ,子進程的foo沒有關閉。

  # 因此主進程要及時關閉son ,子進程要關閉不用的foo
  # 管道不安全,若是向上面同樣,一個管道有多個發送端口有多個接收端口,當管道中有一個數據時,若是,兩個接收端同時去拿這各數據,會形成這兩個接收端都拿到了這個數據。

20,用管道也能實現生產者消費者模型

  管道+鎖

def func(p,l): foo, son = p foo.close() while True: try : l.acquire() print(son.recv())  # EOFError
 l.release() except EOFError: l.release() son.close() break

def func2(p): foo, son = p son.close() for i in range(10): foo.send(i) foo.close() if __name__ == '__main__': foo,son = Pipe() l = Lock() p = Process(target=func,args=((foo,son),l)) p1 = Process(target=func,args=((foo,son),l)) p2 = Process(target=func,args=((foo,son),l)) p.start() p1.start() p2.start() p3 = Process(target=func2, args=((foo, son),)).start() p4 = Process(target=func2, args=((foo, son),)).start() p5 = Process(target=func2, args=((foo, son),)).start() p6 = Process(target=func2, args=((foo, son),)).start() p7 = Process(target=func2, args=((foo, son),)).start() son.close() foo.close()
管道實現消費者模型

21,管道或隊列的先擇

  管道:多個機器多個進程間通訊

  

  隊列:用在同一臺機器的多個進程之間通訊

 

Manager模塊

22,Manager 模塊

# Pipe 管道 :  雙向通訊   數據不安全
# Queue 管道+鎖:  雙向通訊 數據安全
# JoinableQueue :數據安全
   # put 和 get的一個計數機制 ,每次get數據以後發送task_done,put端接收到計數-1,直到計數爲0就能感知到
# Manager是一個類 就提供了能夠進行數據共享的一個機制 提供了不少數據類型 dict list

if __name__ == '__main__':
    m = Manager()
    d = m.dict()          # 建立的字典進程間能共享的字典
    print(d)

  22.1,例

# Manager : dict list pipe ,並不提供數據安全的支持
# def func(dic):
#     print(dic)
#     # while True:
#     #     print(dic)
#     #     time.sleep(3)
# if __name__ == '__main__':
#     m = Manager()
#     d = m.dict({'count':0})
#     print(d)
#     # print(d)
#     # d['count'] = 0
#     # print(d)
#     # d = {}
#     p = Process(target=func,args=(d,))
#     p.start()
    # d['count'] = 0

  22.2,例

# from multiprocessing import Manager,Process,Lock
# def work(d,lock):
#     lock.acquire()
#     d['count'] -= 1
#     lock.release()
#
# if __name__ == '__main__':
#         lock= Lock()
#         m = Manager()
#         dic=m.dict({'count':100})   # 共享的數據
#         l = []
#         for i in range(100):
#             p=Process(target=work,args=(dic,lock))
#             p.start()
#             l.append(p)
#         [p.join() for p in l]
#         print(dic)

  

進程池

# 一個電腦四個核,無論開幾個進程一次最多同時處理四個進程。
# 有100 個任務,要處理,造一個池子, 放四個進程(或五個不限,但多了沒啥用)用來完成任務。
# 開啓進程關進程,也須要時間,開的進程 多了花費時間就多
# 開的進程再多,一次最多處理四個進程。而進度池,是開四個或五個進程,反覆使用完成任務。

概念介紹:

 

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

 

  參數:

numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組

  主要方法:

1) p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。

   '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
2) p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
   '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''   
3) p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
4) P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

  其它方法

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
1) obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
2) obj.ready():若是調用完成,返回True
3) obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
4) obj.wait([timeout]):等待結果變爲可用。
5) obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

 

23,建立進程池

  23.1,

import os
import time
from multiprocessing import Pool
def func(i):
    time.sleep(1)     # 結果是五個五個的出來,即同時作五個任務。
    print(i,os.getpid())  # 每五個進程的pid是同樣的。
# 注意其實輸出沒順序,各進程是異步的。

if __name__ == '__main__':
    p = Pool(5)          # 建立能容納5個進程的進程池對象  
    p.map(func,range(20))  # 向池中提交任務

  23.2,

import os
import time
from multiprocessing import Pool
def func(i):
    time.sleep(1)     # 結果是五個五個的出來,即同時作五個任務。
    print(i,os.getpid())  # 每五個進程的pid是同樣的。
# 注意其實輸出沒順序,各進程是異步的。

if __name__ == '__main__':
    p = Pool(5)          
    p.map(func,range(20))  
    p.close()   # 是不容許再向進程池中添加任務,不是關閉對象。
    p.join()    # 必須close以後在join
    print('====') # 加上上兩句,才保證任務處理完以後,在執行這一句。
	           # 由於是異步的,不加上兩句,這句可能會在,任務沒處理完就執行了。

  23.3,

import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i += 1

if __name__ == '__main__':
# 設置進度池,完成一百個任務(用時短,佔內存小)
# 只開可5個進程
    p = Pool(5)          # 建立了5個進程
    start = time.time()
    p.map(func,range(1000))   # target = func  args=next(iterable) 必須是可迭代的 
		              #要傳多個用元組 [(1,2,3),1,2,3,4]  
    p.close()  
    p.join()
    print(time.time() - start)
    start = time.time()

#  開一百個進程,完成一百個任務(用時長,佔內存多)
#  開了一百個進程
    l = []
    for i in range(1000):
        p = Process(target=func,args=(i,))  # 建立了一百個進程
        p.start()
        l.append(p)
    [i.join() for i in l]    # 等待全部進程結束
    print(time.time() - start)

24,進程池的  apply  方法

 

import time
from multiprocessing import Pool

def func(i):
    time.sleep(1)
    i += 1

if __name__ == '__main__':
    p = Pool(5)
    for i in range(20):
        # p.apply(func,args=(i,))     # apply是同步提交任務的機制,執行完一個任務才提交下一個任務。多進程等於沒開。
        p.apply_async(func,args=(i,))  # apply_async是異步提交任務的機制
    p.close()
    p.join()

 

回調函數

25,回調函數

  25.1,回調函數的使用

import os
from multiprocessing import Pool
def func(i):    # 多進程中的io多,
    print('子進程%s:%s'%(i,os.getpid()))   # 子進程pid
    return i*'*'

def call(arg):   # 回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值
    print('回調 :',os.getpid())       # 回調函數pid
    print(arg)

if __name__ == '__main__':
    print('---->',os.getpid())    # 主進程pid
    p = Pool(5)
    for i in range(10):
        p.apply_async(func,args=(i,),callback=call)   
	# callback設置回調函數,即執行完func後返回的值不會再返回到調用它的地方,而是放回給回調函數作參數。
    p.close()
    p.join()

  25.2,例子1

# 請求網頁
    # 網絡延時 IO操做
# 單進程
    # 10個頁面  同時訪問多個   —> 用多進程
    # 分析頁面:——>用回調函數

# from urllib.request import urlopen       # 這個模塊須要安裝
# import requests
# ret = requests.get('http://www.baidu.com')
# print(ret.text)   # 整個網頁的源代碼
# print(ret.status_code)  # 訪問網頁的狀態碼

  25.2,例子2

from urllib.request import urlopen
import requests
from multiprocessing import Pool

def get_url(url):
    ret = requests.get(url)
    return {'url':url,
            'status_code':ret.status_code,
            'content':ret.text}


def parser(dic):
    print(dic['url'],dic['status_code'],len(dic['content']))
    # 把分析結果寫到文件裏

if __name__ == '__main__':
    url_l = [
        'http://www.baidu.com',
        'http://www.sogou.com',
        'http://www.hao123.com',
        'http://www.yangxiaoer.cc',
        'http://www.python.org'
    ]
    p = Pool(4)
    for url in url_l:
        p.apply_async(get_url,args=(url,),callback=parser)
    p.close()
    p.join()

  25.3,爬蟲實例

 

import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')
    return pattern,response

def parse_page(info):
    pattern,page_content=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)
if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()
爬蟲實例

 

  25.4,若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中全部進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到全部結果
    print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理
無需回調函數

進程池的其餘實現方式:https://docs.python.org/dev/library/concurrent.futures.html

相關文章
相關標籤/搜索