對於CPU計算密集型的任務,python的多線程跟單線程沒什麼區別,甚至有可能會更慢,可是對於IO密集型的任務,好比http請求這類任務,python的多線程仍是有用處。在平常的使用中,常常會結合多線程和隊列一塊兒使用,好比,以爬取simpledestops 網站壁紙爲例:html
import os from datetime import datetime from queue import Queue from threading import Thread import requests requests.packages.urllib3.disable_warnings() from bs4 import BeautifulSoup import re if not os.path.exists('img'): os.mkdir('img') # 聲明一個隊列 Q = Queue() def producer(pages): for page in range(1,pages+1): # 提取每一頁的圖片 url 加入隊列 print("[-] 收集第 {} 頁".format(str(page))) url = "http://simpledesktops.com/browse/"+str(page)+"/" r = requests.get(url,verify=False) html = r.text soup = BeautifulSoup(html,'html.parser') try: imgs = soup.find_all('img') for img in imgs: img_url = img['src'] Q.put(img_url) except: pass def worker(i): # 取出隊列的值,按順序取,下載圖片 while not Q.empty(): img_url = Q.get() text = re.search('(http://static.simpledesktops.com/uploads/desktops/\d+/\d+/\d+/(.*?png)).*?png',img_url) new_img_url = text.group(1) r = requests.get(new_img_url,verify=False) path = "img/"+text.group(2) print("[-] 線程 {} 開始下載 {} 開始時間:{}".format(i,text.group(2),datetime.now())) with open(path,'wb') as f: f.write(r.content) Q.all_tasks_done if __name__ =="__main__": # 必定要將數據加入隊列,不然是啓動不了的,由於隊列爲空 producer(50) # 線程的聲明 ts = [Thread(target=worker,args=(i,)) for i in range(50)] for t in ts: t.start() for t in ts: t.join()
咱們使用start啓動多線程,使用 join 防止主線程退出的時候結束全部的線程,使用隊列有序的且併發的下載壁紙。 仔細觀察就會發現代碼其實有跡可循,更改其中的爬取內容的部分代碼後,咱們就能夠應用於爬取別的網站。python
# coding: utf-8 import threading,time import requests requests.packages.urllib3.disable_warnings() from datetime import datetime local_variable = threading.local() # 邏輯處理函數 def worker(): print("每一個線程啓動的時間: ",datetime.now()) time.sleep(10) url = local_variable.url r = requests.get(url,verify=False) print(r.url,datetime.strftime(datetime.now(),'%H:%M:%S'),threading.current_thread().name) # 線程處理函數 def process_thread(url): local_variable.url = url worker() if __name__ == "__main__": ts = [threading.Thread(target=process_thread,args=(url,))for url in ['https://www.baidu.com','https://www.google.com','https://www.bing.com']] for t in ts: t.start() for t in ts: t.join()
線程Thread-1 啓動的時間:2019-01-09 11:25:18.339631 線程Thread-2 啓動的時間:2019-01-09 11:25:18.340646 線程Thread-3 啓動的時間:2019-01-09 11:25:18.342635 https://www.baidu.com/ 11:25:28 Thread-1 https://cn.bing.com/ 11:25:29 Thread-3 https://www.google.com/ 11:25:29 Thread-2
python中使用 multiprocessing 來建立多進程,若是要建立多個子進程,則須要使用 進程池 Pool 來建立,一個簡單的例子:併發
from multiprocessing import Pool import os from datetime import datetime ''' @param {type} int @return: None ''' def print_num(i): print("進程{} 打印 {}".format(os.getpid(),i)) if __name__ == "__main__": p = Pool(4) for i in range(100): p.apply_async(print_num,args=(i,)) # 關閉進程池,再也不加入進程 p.close() # 防止主進程結束,子進程沒法繼續運行 p.join()
進程2624 打印 0 進程2625 打印 1 進程2626 打印 3 進程2627 打印 2 進程2624 打印 4 進程2625 打印 5 進程2626 打印 6 進程2627 打印 7 進程2624 打印 8 ...
from multiprocessing import Pool import threading import os,time import queue from datetime import datetime def producer(i): Q = queue.Queue() start = 25*(i-1) end = 100 * int(i / 4) for x in range(start,end): Q.put(x) return Q def process_thread(Q,j): while not Q.empty(): item = Q.get() print("進程{}: 線程{} 正在消耗:{} 時間:{}".format(os.getpid(),j,item,datetime.now())) Q.all_tasks_done def tasks(i): Q = producer(i) ts = [threading.Thread(target=process_thread,args=(Q,j)) for j in range(10)] for t in ts: t.start() for t in ts: t.join() if __name__ == "__main__": start = datetime.now() p = Pool(4) for i in range(1,5): print(i) p.apply_async(tasks,args=(i,)) p.close() p.join() end = datetime.now() waste = end-start print("一共花費了: {}".format(waste))
先將要處理的數據,填進隊列,而後建立4個進程,10個線程運行。 其輸出爲:分佈式
''' (venv) C:\project\libraries-python>python bulit-in-libraries\threading\multithreading.py 進程17020: 線程0 正在消耗:1 時間:2019-01-09 12:50:48.701523 進程17020: 線程1 正在消耗:2 時間:2019-01-09 12:50:48.703521 進程17020: 線程3 正在消耗:4 時間:2019-01-09 12:50:48.704365 進程17020: 線程2 正在消耗:3 時間:2019-01-09 12:50:48.704365 進程2804: 線程0 正在消耗:5 時間:2019-01-09 12:50:48.706349 進程2804: 線程1 正在消耗:6 時間:2019-01-09 12:50:48.707352 進程2804: 線程4 正在消耗:9 時間:2019-01-09 12:50:48.708355 進程2804: 線程3 正在消耗:8 時間:2019-01-09 12:50:48.708355 進程2804: 線程2 正在消耗:7 時間:2019-01-09 12:50:48.708355 進程16060: 線程0 正在消耗:10 時間:2019-01-09 12:50:48.728409 進程16060: 線程1 正在消耗:11 時間:2019-01-09 12:50:48.730413 進程16060: 線程4 正在消耗:14 時間:2019-01-09 12:50:48.732418 進程16060: 線程3 正在消耗:13 時間:2019-01-09 12:50:48.732418 進程16060: 線程2 正在消耗:12 時間:2019-01-09 12:50:48.732418 進程7588: 線程3 正在消耗:18 時間:2019-01-09 12:50:48.761808 進程7588: 線程4 正在消耗:19 時間:2019-01-09 12:50:48.761808 進程7588: 線程0 正在消耗:15 時間:2019-01-09 12:50:48.761808 進程7588: 線程1 正在消耗:16 時間:2019-01-09 12:50:48.761808 進程7588: 線程2 正在消耗:17 時間:2019-01-09 12:50:48.761808
在python中,若是要運行系統命令,會使用 subprocess 來運行,官方建議使用run 方法來運行系統命令,更高級的用法是直接使用其 Popen 接口。
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)
import subprocess subprocess.run(['ls','-al'])
在python3.7 以前,默認系統命令執行的結果(輸出/錯誤)不存在stdout/stderr 裏面,須要設置 capture_output=True,而在python3.6 版本,若是你須要使用執行的結果,你就須要設置 stdout. 以下所示
# python 3.6 >>> a = subprocess.run(['ls','-al'],stdout=subprocess.PIPE) >>> a.stdout # python3.7 >>> a = subprocess.run(['ls','-al'],capture_output=True) >>> a.stdout
因此能夠看出python3.7 又作了一層封裝,爲了讓你們使用更上一層的接口。能夠看一下幾個參數的含義爲:
args 列表,爲shell命令 shell boolean值, 設置後,args能夠直接接受shell命令 capture_output = True , 設置後,stdout/stderr會存儲值 check=True, 設置後,若是程序異常退出,會跑出一個CalledProcessError異常 cwd 是工做目錄,能夠爲str,或者path-like 類
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, encoding=None, errors=None)
p = subprocess.Popen(['ls','-al'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
其次,經過Popen.communicate() ,子進程能夠在啓動了之後,還能夠進行參數的輸入
import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('utf-8')) print('Exit code:', p.returncode) 其輸出: $ nslookup Server: Address: Non-authoritative answer: python.org mail exchanger = 50 mail.python.org. Authoritative answers can be found from: mail.python.org internet address = mail.python.org has AAAA address 2001:888:2000:d::a6 Exit code: 0
# master import random,time,queue from multiprocessing.managers import BaseManager task_queue = queue.Queue() result_queue = queue.Queue() class QueueManager(BaseManager): pass QueueManager.register('get_task_queue',callable=lambda:task_queue) QueueManager.register('get_result_queue',callable=lambda:result_queue) manager = QueueManager(address=('',5000),authkey=b'abc') manager.start() tasks = manager.get_task_queue() results = manager.get_result_queue() for i in range(10): n = random.randint(0,10000) print('put task {}'.format(n)) tasks.put(n) print('try get results...') for i in range(10): r = results.get(timeout=100) print('result:{}'.format(r)) manager.shutdown() print('master exit') # worker import time,sys,queue from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # master的主機地址 server_addr = '' print('connect to server...') m = QueueManager(address=(server_addr,5000),authkey=b'abc') m.connect() tasks = m.get_task_queue() results = m.get_result_queue() for i in range(10): try: n = tasks.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '{} * {} = {}'.format(n,n,n*n) time.sleep(1) results.put(r) except Queue.Empty: print('task queue is empty.') print('worker exit.')