守護進程html
1)正常的子進程沒有執行完的時候主進程要一直等着python
2)守護進程的進程的做用:
守護進程會隨着主進程的代碼執行結束(此時主進程不必定結束)而結束。
3)守護進程是否結束的判斷標準是 :
主進程的代碼是否執行完。不是主進程是否結束。
4)主進程的代碼執行完後會要等着全部子進程結束以後結束。
5)守護進程不會等待其它子進程結束,和子進程無關。但若是想讓守護進程等待某個子程序結束後再結束,能夠在主程序後使用p.join()設置阻塞,即主程序會等待該子進程,而等待這個代碼是主進程的代碼,即主進程代碼沒有執行完,因此守護進程不會結束。
6)守護進程 要在start以前設置
7)守護進程中 不能再開啓子進程編程
1,開一個守護進程json
1)父進程作本身的事,開一個報時器子進程,每過一秒通報一次。可是父進程的代碼執行完後子進程並不會結束。
2)設置守護進程後,父進程的代碼執行完後,子進程也會結束。數組
import time
from multiprocessing import Process
def cal_time(): # 定義一個報時器
while True:
time.sleep(1)
print('過去了1秒')
if __name__ == '__main__':
p = Process(target=cal_time)
# 必定在開啓進程以前設置 # 將接下來開啓的一個進程設置成守護進程
# p.daemon = True
p.start()
for i in range(100): # 10s
time.sleep(0.1)
print('*'*i)
2,使守護進程等待一個子進程結束以後在結束安全
import time from multiprocessing import Process def func(): print('--'*10) time.sleep(15) print('--'*10) def cal_time(): while True: time.sleep(1) print('過去了1秒') if __name__ == '__main__': p = Process(target=cal_time) p.daemon = True # 必定在開啓進程以前設置 p.start() p2 = Process(target=func) # 15s p2.start() for i in range(100): # 10s time.sleep(0.1) print('*'*i) p2.join() # 使守護進程等待一個子進程結束以後在結束
# 此處阻塞,子進程p2不結束主程序代碼就不會執行完
process 中的部分方法網絡
3,process 中的方法 p.is_alive() p.terminate()併發
# p.is_alive() # 是否活着 True表明進程還在 False表明進程不在了
# p.terminate() # 結束一個進程,可是這個進程不會馬上被殺;強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖app
import time
from multiprocessing import Process
def func():
print('wahaha')
time.sleep(5)
print('qqxing')
if __name__ == '__main__':
p = Process(target=func)
p.start()
print(p.is_alive()) # True
time.sleep(0.1)
p.terminate() # 給操做系統發出結束進程的請求,# 是個異步操做,同步的會等在這。
print(p.is_alive()) # True #不會當即結束進程,會有一點延遲。
time.sleep(1)
print(p.is_alive()) # False # 進程結束。
# 結果:
# True
# wahaha
# True
# False
process 中的部分屬性dom
4,process 中的屬性
# pid 查看這個進程 進程id
# name 查看這個進程的名字
def func():
print('wahaha')
time.sleep(5)
print('qqxing')
if __name__ == '__main__':
p = Process(target=func)
p.start()
print(p.name,p.pid)
p.name = '哇哈哈哈' # 改進程的名字
print(p.name)
5,如何能在子進程裏查看該子進程的name,pid。
class MyProcess(Process):
def run(self):
print('wahaha',self.name,self.pid)
time.sleep(5)
print('qqxing',self.name,self.pid)
if __name__ == '__main__':
p = MyProcess()
p.start()
print(p.pid)
鎖
1)爲何須要鎖?
由於實現了異步進程,各進程互不干擾,當多個進程同時訪問同一個文件時會出現問題。例如,文件中記錄有10張票,(多我的)多個進程同時訪問,拿到的記錄是同樣的,每一個進程拿到的都是10,每一個進程都執行的是10-1,而後有都寫入文件,文件記錄爲9張票。事實上,咱們知道,不止賣了一張票。爲防止此情況的出現,使用了鎖的方式。
2)什麼是鎖?
一個文件設置一個鎖,當一個進程訪問文件時,其它進程就不能進入,當這個進程進行完該文件的操做。其它進程才能訪問。
至關於,一個上鎖的房間,門外掛着一把鑰匙,要訪問的進程都去強這把鑰匙,搶到的進程拿着鑰匙進入房間並鎖上房間,處理完出來後在把鑰匙還掛在門外,其它進程在搶。
3)鎖 就是在併發編程中 保證數據安全
7,建立鎖
from multiprocessing import Lock
lock = Lock() # 建立鎖對象
lock.acquire() # 鎖 拿鑰匙
lock.acquire() # 鎖 阻塞 當第一個拿鑰匙的釋放了鑰匙才能拿鑰匙
lock.release() # 釋放鎖 還鑰匙
7.1,多進程 實現 併發
import json
import time
import random
from multiprocessing import Lock
from multiprocessing import Process
def search(i):
with open('ticket') as f:
print(i,json.load(f)['count'])
def get(i):
with open('ticket') as f:
ticket_num = json.load(f)['count']
time.sleep(random.random())
if ticket_num > 0:
with open('ticket','w') as f:
json.dump({'count':ticket_num-1},f)
print('%s買到票了'%i)
else:
print('%s沒票了'%i)
def task(i,lock):
search(i) # 查看票
lock.acquire()
get(i) # 搶票
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(20): # 20我的同時搶票
p = Process(target=task,args=(i,lock))
p.start()
信號量
就是一個上鎖的文件,外面掛了設定數量的鑰匙,即房間最多存在設定數量的人數。
from multiprocessing import Semaphore
sem = Semaphore(4) # 設置爲四個
sem.acquire() # 須要鑰匙
print(0)
sem.acquire() # 須要鑰匙
print(1)
sem.acquire() # 須要鑰匙
print(2)
sem.acquire() # 須要鑰匙
print(3)
sem.release()
sem.acquire() # 須要鑰匙
print(4)
8, 實例
import time
import random
from multiprocessing import Semaphore
from multiprocessing import Process
def sing(i,sem):
sem.acquire()
print('%s : 進入 ktv'%i)
time.sleep(random.randint(1,10))
print('%s : 出 ktv'%i)
sem.release()
# 迷你唱吧 20我的,同一時間只能有4我的進去唱歌
if __name__ == '__main__':
sem = Semaphore(4)
for i in range(20):
Process(target=sing,args=(i,sem)).start()
事件
9,
# recv accept input sleep 同步阻塞
# lock鎖 是異步阻塞
# 事件——異步阻塞
# 事件是一個通知,標誌,能夠同時使全部進程 都陷入阻塞
from multiprocessing import Event #事件
e = Event() # 實例化一個事件 標誌 /交通訊號燈
e.wait() # 剛實例化出來的一個事件對象,阻塞信號 /是紅燈
# 執行到wait,要先看燈,綠燈行紅燈停,若是在停的過程當中燈綠了,就變成非阻塞了
e.set() # 將標誌變成非阻塞 /交通燈變綠
e.clear() # 將標誌又變成阻塞 /交通燈變紅
e.is_set() # 判斷是否阻塞 True就是綠燈 False就是紅燈
10,實現一個紅綠燈處隨機經過的車輛,每一個數表明一個車輛(進程)
import time
import random
for i in range(100): # 一百輛車
if i%random.randint(3,6) == 0 : #隨機生成一個數,用車號除以生成數。
time.sleep(random.randint(1,3)) #知足條件的車,等待隨機幾秒在經過
print('發生等待:',end=' ') # 標識
print('%s號車經過'%i)
11,實例:紅綠燈
import time
import random
from multiprocessing import Process
from multiprocessing import Event
def traffic_light(e):
while True:
if e.is_set():
time.sleep(3)
print('紅燈亮')
e.clear() # 綠變紅
else:
time.sleep(3)
print('綠燈亮')
e.set() # 紅變綠
def car(i,e):
e.wait()
print('%s車經過'%i)
if __name__ == '__main__':
e = Event() # 立一個紅燈
tra = Process(target=traffic_light,args=(e,))
tra.start() # 啓動一個進程來控制紅綠燈
for i in range(100):
if i%6 == 0 :
time.sleep(random.randint(1,3))
car_pro = Process(target=car, args=(i,e))
car_pro.start()
進程間通訊——隊列
12,隊列
1.進程之間通訊 可使用multiprocessing 的 Queue模塊
2.隊列有兩種建立方式 第一種不傳參數 這個隊列就沒有長度限制 ;傳參數,建立一個有最大長度限制的隊列
3.提供兩個重要方法;put() get()
4.qsize 可能出錯
from multiprocessing import Queue
q = Queue(3) # 能夠設置長度
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print(q.get())隊列
print(q.get())
print(q.get())
print(q.get()) # 若是隊列裏已經沒有值了 就會阻塞等待有一個值
13,進程間通訊
一個進程往隊列裏放數據,一個進程能拿到隊列裏的數據
from multiprocessing import Process
from multiprocessing import Queue
def q_put(q):
q.put('hello')
def q_get(q):
print(q.get())
if __name__ =='__main__':
q = Queue()
p = Process(target=q_put,args=(q,))
p.start()
p1 = Process(target=q_get, args=(q,))
p1.start()
14, 生產者消費者模型
我要生產一個數據 而後 給一個函數 讓這個函數依賴這個數據進行運算 拿到結果 —— 同步過程
# 但生產者生成數據很快,而消費者處理數據很慢,可使用多個相同的進程處理生產的數據。
# 作包子 和 吃包子
import time
def producer(q): # 生產者
for i in range(100): # 生產一百個包子
q.put('包子%s'%i)
def consumer(q): # 消費者
for i in range(100):
time.sleep(1)
print(q.get())
if __name__ == '__main__':
q = Queue(10) # 托盤 # 限制隊列長度十,當隊列滿了,只有用一個才能在放一個新的。
# 防止佔用內存
p = Process(target=producer,args=(q,))
p.start()
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.start() #用兩個消費者進程,處理數據。
c2.start()
15,解決 數據的供需不平衡
# 同步 生產數據 使用數據
# 異步 主進程 生產數據 子進程使用數據 —— 問題:500個子進程去處理數據 生產過慢
# 異步 子進程 生產數據 子進程處理數據 —— 問題:不知道生產慢仍是處理慢
15.1,
# 3個子進程 生產包子
# 2個子進程 吃包子
# 生產的快 吃的慢 包子溢出
# 生產的慢 吃得快 包子不夠吃
# 若是增長生產者 —— 讓生產變快
# 減小生產者 —— 找一個容器放 約束容器的容量
import time
import random
from multiprocessing import Queue
from multiprocessing import Process
def producer(q,food):
for i in range(5):
q.put('%s-%s'%(food,i))
print('生產了%s'%food)
# time.sleep(random.randint(1,3))
time.sleep(random.random())
q.put(None) # 有幾個消費者進程,就要放入幾個
q.put(None)
q.put(None)
def consumer(q,name):
while True:
food = q.get() # 生產者不生產仍是生產的慢
if food == None:break
print('%s 吃了 %s'%(name,food))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'包子'))
p1.start()
p2 = Process(target=producer, args=(q, '骨頭'))
p2.start()
c1 = Process(target=consumer, args=(q, 'alex'))
c1.start()
c2 = Process(target=consumer, args=(q, 'egon'))
c2.start()
c3 = Process(target=consumer, args=(q, 'jin'))
c3.start()
# 隊列不會亂,由於一次出一個,先進先出。
# 如今 經過queue
# 生產者消費者模型
#1.消費者要處理多少數據是不肯定的
#2.因此只能用while循環來處理數據 ,可是while循環沒法結束
#3.須要生產者發送信號
#4.有多少個消費者 就須要發送多少個信號
#5.可是發送的信號數量須要根據 生產者和消費者的數量進行計算,因此很是不方便
16,JoinableQueue 解決上面問題
# 使用上面的方式建立的隊列對象,有join和task_done兩個方法。
# q.task_done() 代表完成隊列中一個數據的處理。放到消費者代碼後,當消費者處理完這個數據後執行這個代碼。
# q.join() 放到生產者代碼後,與上一個方法搭配使用,檢查到隊列中的全部數據都處理完了(taskdone)不在阻塞。生產者代碼結束。
# 在主進程後加上每一個生產者進程的阻塞,消費者沒處理完數據,生產者就不會結束,主進程中設置的生產者阻塞就不經過,即主進程代碼不會執行完。
# 將每一個消費者進程設置成 守護進程,這樣當主進程代碼執行完,即生產者已經生產了全部數據並處理完畢。守護進程結束,即消費者進程結束。
import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue
# def producer(q,food):
# for i in range(5):
# q.put('%s-%s'%(food,i))
# print('生產了%s'%food)
# # time.sleep(random.randint(1,3))
# time.sleep(random.random())
# q.join() # 等待 消費者 把全部的數據都處理完
#
# def consumer(q,name):
# while True:
# food = q.get()
# print('%s 吃了 %s'%(name,food))
# q.task_done() # 這個數據處理完了
#
# if __name__ == '__main__':
# q = JoinableQueue()
# p1 = Process(target=producer,args=(q,'泔水'))
# p1.start()
# p2 = Process(target=producer, args=(q, '骨頭'))
# p2.start()
# c1 = Process(target=consumer, args=(q, 'alex'))
# c1.daemon = True # 將消費者設置成守護進程
# c1.start()
# c2 = Process(target=consumer, args=(q, 'egon'))
# c2.daemon = True
# c2.start()
# c3 = Process(target=consumer, args=(q, 'jin'))
# c3.daemon = True
# c3.start()
#
# p1.join() # 等待p1執行完畢
# p2.join() # 等待p2執行完畢
#
# # 生產者生產的數據所有被消費 —— 生產者進程結束 —— 主進程代碼執行結束 —— 消費者守護進程結束
管道 pipe
17,pipe與兩個重要方法 send() recv()
from multiprocessing import Process
from multiprocessing import Pipe
p1,p2 = Pipe() #支持雙向通訊
p1.send('hello')
print(p2.recv())
p2.send('hi')
# p2.close() # p2端關閉
print(p1.recv())
print(p1.recv()) # 若是p2端沒有關閉,會發生阻塞
# p2端關閉,管道內沒有數據可接收時,報錯 EOFError
18,進程中用
from multiprocessing import Process
from multiprocessing import Pipe
def func(son)
print(son.recv()) # 接收到
print(son,recv()) #接收不到報錯
if __name__ == '__main__':
foo,son = Pipe()
p = Process(target=func,args=(son,))
p.start()
foo.send('hello')
foo.close()
19,下面的狀況關閉後沒有報錯而是阻塞
from multiprocessing import Process
from multiprocessing import Pipe
def func(p)
foo,son = p # 此處形成了一個管道兩端各有兩個口
# foo.close() # 關閉子進程的foo,recv沒有接收到會報錯。
# 可是子進程foo的關閉主進程沒關閉,還會阻塞。
print(son.recv()) # 接收到
print(son,recv()) # 阻塞
if __name__ == '__main__':
foo,son = Pipe()
p = Process(target=func,args=((foo,son),))
p.start()
son.close()
foo.send('hello')
foo.close() # 關閉的是主進程的foo ,子進程的foo沒有關閉。
# 因此主進程要及時關閉son ,子進程要關閉不用的foo
# 管道不安全,若是向上面同樣,一個管道有多個發送端口有多個接收端口,當管道中有一個數據時,若是,兩個接收端同時去拿這各數據,會形成這兩個接收端都拿到了這個數據。
20,用管道也能實現生產者消費者模型
管道+鎖
def func(p,l): foo, son = p foo.close() while True: try : l.acquire() print(son.recv()) # EOFError
l.release() except EOFError: l.release() son.close() break
def func2(p): foo, son = p son.close() for i in range(10): foo.send(i) foo.close() if __name__ == '__main__': foo,son = Pipe() l = Lock() p = Process(target=func,args=((foo,son),l)) p1 = Process(target=func,args=((foo,son),l)) p2 = Process(target=func,args=((foo,son),l)) p.start() p1.start() p2.start() p3 = Process(target=func2, args=((foo, son),)).start() p4 = Process(target=func2, args=((foo, son),)).start() p5 = Process(target=func2, args=((foo, son),)).start() p6 = Process(target=func2, args=((foo, son),)).start() p7 = Process(target=func2, args=((foo, son),)).start() son.close() foo.close()
21,管道或隊列的先擇
管道:多個機器多個進程間通訊
隊列:用在同一臺機器的多個進程之間通訊
Manager模塊
22,Manager 模塊
# Pipe 管道 : 雙向通訊 數據不安全
# Queue 管道+鎖: 雙向通訊 數據安全
# JoinableQueue :數據安全
# put 和 get的一個計數機制 ,每次get數據以後發送task_done,put端接收到計數-1,直到計數爲0就能感知到
# Manager是一個類 就提供了能夠進行數據共享的一個機制 提供了不少數據類型 dict list
if __name__ == '__main__':
m = Manager()
d = m.dict() # 建立的字典進程間能共享的字典
print(d)
22.1,例
# Manager : dict list pipe ,並不提供數據安全的支持
# def func(dic):
# print(dic)
# # while True:
# # print(dic)
# # time.sleep(3)
# if __name__ == '__main__':
# m = Manager()
# d = m.dict({'count':0})
# print(d)
# # print(d)
# # d['count'] = 0
# # print(d)
# # d = {}
# p = Process(target=func,args=(d,))
# p.start()
# d['count'] = 0
22.2,例
# from multiprocessing import Manager,Process,Lock
# def work(d,lock):
# lock.acquire()
# d['count'] -= 1
# lock.release()
#
# if __name__ == '__main__':
# lock= Lock()
# m = Manager()
# dic=m.dict({'count':100}) # 共享的數據
# l = []
# for i in range(100):
# p=Process(target=work,args=(dic,lock))
# p.start()
# l.append(p)
# [p.join() for p in l]
# print(dic)
進程池
# 一個電腦四個核,無論開幾個進程一次最多同時處理四個進程。
# 有100 個任務,要處理,造一個池子, 放四個進程(或五個不限,但多了沒啥用)用來完成任務。
# 開啓進程關進程,也須要時間,開的進程 多了花費時間就多
# 開的進程再多,一次最多處理四個進程。而進度池,是開四個或五個進程,反覆使用完成任務。
概念介紹:
Pool([numprocess [,initializer [, initargs]]]):建立進程池
參數:
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None initargs:是要傳給initializer的參數組
主要方法:
1) p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
2) p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
3) p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
4) P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
其它方法:
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
1) obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
2) obj.ready():若是調用完成,返回True
3) obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
4) obj.wait([timeout]):等待結果變爲可用。
5) obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
23,建立進程池
23.1,
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(1) # 結果是五個五個的出來,即同時作五個任務。
print(i,os.getpid()) # 每五個進程的pid是同樣的。
# 注意其實輸出沒順序,各進程是異步的。
if __name__ == '__main__':
p = Pool(5) # 建立能容納5個進程的進程池對象
p.map(func,range(20)) # 向池中提交任務
23.2,
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(1) # 結果是五個五個的出來,即同時作五個任務。
print(i,os.getpid()) # 每五個進程的pid是同樣的。
# 注意其實輸出沒順序,各進程是異步的。
if __name__ == '__main__':
p = Pool(5)
p.map(func,range(20))
p.close() # 是不容許再向進程池中添加任務,不是關閉對象。
p.join() # 必須close以後在join
print('====') # 加上上兩句,才保證任務處理完以後,在執行這一句。
# 由於是異步的,不加上兩句,這句可能會在,任務沒處理完就執行了。
23.3,
import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
i += 1
if __name__ == '__main__':
# 設置進度池,完成一百個任務(用時短,佔內存小)
# 只開可5個進程
p = Pool(5) # 建立了5個進程
start = time.time()
p.map(func,range(1000)) # target = func args=next(iterable) 必須是可迭代的
#要傳多個用元組 [(1,2,3),1,2,3,4]
p.close()
p.join()
print(time.time() - start)
start = time.time()
# 開一百個進程,完成一百個任務(用時長,佔內存多)
# 開了一百個進程
l = []
for i in range(1000):
p = Process(target=func,args=(i,)) # 建立了一百個進程
p.start()
l.append(p)
[i.join() for i in l] # 等待全部進程結束
print(time.time() - start)
24,進程池的 apply 方法
import time
from multiprocessing import Pool
def func(i):
time.sleep(1)
i += 1
if __name__ == '__main__':
p = Pool(5)
for i in range(20):
# p.apply(func,args=(i,)) # apply是同步提交任務的機制,執行完一個任務才提交下一個任務。多進程等於沒開。
p.apply_async(func,args=(i,)) # apply_async是異步提交任務的機制
p.close()
p.join()
回調函數
25,回調函數
25.1,回調函數的使用
import os
from multiprocessing import Pool
def func(i): # 多進程中的io多,
print('子進程%s:%s'%(i,os.getpid())) # 子進程pid
return i*'*'
def call(arg): # 回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值
print('回調 :',os.getpid()) # 回調函數pid
print(arg)
if __name__ == '__main__':
print('---->',os.getpid()) # 主進程pid
p = Pool(5)
for i in range(10):
p.apply_async(func,args=(i,),callback=call)
# callback設置回調函數,即執行完func後返回的值不會再返回到調用它的地方,而是放回給回調函數作參數。
p.close()
p.join()
25.2,例子1
# 請求網頁
# 網絡延時 IO操做
# 單進程
# 10個頁面 同時訪問多個 —> 用多進程
# 分析頁面:——>用回調函數
# from urllib.request import urlopen # 這個模塊須要安裝
# import requests
# ret = requests.get('http://www.baidu.com')
# print(ret.text) # 整個網頁的源代碼
# print(ret.status_code) # 訪問網頁的狀態碼
25.2,例子2
from urllib.request import urlopen
import requests
from multiprocessing import Pool
def get_url(url):
ret = requests.get(url)
return {'url':url,
'status_code':ret.status_code,
'content':ret.text}
def parser(dic):
print(dic['url'],dic['status_code'],len(dic['content']))
# 把分析結果寫到文件裏
if __name__ == '__main__':
url_l = [
'http://www.baidu.com',
'http://www.sogou.com',
'http://www.hao123.com',
'http://www.yangxiaoer.cc',
'http://www.python.org'
]
p = Pool(4)
for url in url_l:
p.apply_async(get_url,args=(url,),callback=parser)
p.close()
p.join()
25.3,爬蟲實例
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()
25.4,若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中全部進程執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到全部結果 print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理
進程池的其餘實現方式:https://docs.python.org/dev/library/concurrent.futures.html