7-併發編程

多線程

簡單示例

對於CPU計算密集型的任務,python的多線程跟單線程沒什麼區別,甚至有可能會更慢,可是對於IO密集型的任務,好比http請求這類任務,python的多線程仍是有用處。在平常的使用中,常常會結合多線程和隊列一塊兒使用,好比,以爬取simpledestops 網站壁紙爲例:html

import os 
from datetime import datetime 
from queue import Queue
from threading import Thread
import requests
requests.packages.urllib3.disable_warnings()

from bs4 import BeautifulSoup
import re

if not os.path.exists('img'):
    os.mkdir('img')

# 聲明一個隊列
Q = Queue()

def producer(pages):
    for page in range(1,pages+1):
        # 提取每一頁的圖片 url 加入隊列
        print("[-] 收集第 {} 頁".format(str(page)))
        url = "http://simpledesktops.com/browse/"+str(page)+"/"
        r = requests.get(url,verify=False)
        html = r.text
        soup = BeautifulSoup(html,'html.parser')
        try:
            imgs = soup.find_all('img')
            for img in imgs:
                img_url = img['src']
                Q.put(img_url)
        except:
            pass

def worker(i):
   # 取出隊列的值,按順序取,下載圖片
    while not Q.empty():
        img_url = Q.get()
        text = re.search('(http://static.simpledesktops.com/uploads/desktops/\d+/\d+/\d+/(.*?png)).*?png',img_url)
        new_img_url = text.group(1)

        r = requests.get(new_img_url,verify=False)
        path = "img/"+text.group(2)
        print("[-] 線程 {} 開始下載 {} 開始時間:{}".format(i,text.group(2),datetime.now()))

        with open(path,'wb') as f:
            f.write(r.content)
    
    Q.all_tasks_done


if __name__ =="__main__":
    # 必定要將數據加入隊列,不然是啓動不了的,由於隊列爲空 
    producer(50)
    # 線程的聲明
    ts = [Thread(target=worker,args=(i,)) for i in range(50)]
    for t in ts:
        t.start()

    for t in ts:
        t.join()

咱們使用start啓動多線程,使用 join 防止主線程退出的時候結束全部的線程,使用隊列有序的且併發的下載壁紙。 仔細觀察就會發現代碼其實有跡可循,更改其中的爬取內容的部分代碼後,咱們就能夠應用於爬取別的網站。python

ThreadLocal

按照道理來講,多線程中,每一個線程的處理邏輯應該是相同的,可是其處理的數據,卻不必定是相同的,若是數據是全局的,那麼咱們就須要加鎖,防止數據混亂,這樣一來就會麻煩不少,因此線程處理的數據最好是局部的、其餘線程不能干擾的。shell

代碼示例:session

# coding: utf-8 

import threading,time
import requests
requests.packages.urllib3.disable_warnings()
from datetime import datetime 

local_variable = threading.local()

# 邏輯處理函數
def worker():
    print("每一個線程啓動的時間: ",datetime.now())
    time.sleep(10)
    url = local_variable.url
    r = requests.get(url,verify=False)
    print(r.url,datetime.strftime(datetime.now(),'%H:%M:%S'),threading.current_thread().name)

# 線程處理函數
def process_thread(url):
    local_variable.url = url
    worker()


if __name__ == "__main__":
    ts = [threading.Thread(target=process_thread,args=(url,))for url in ['https://www.baidu.com','https://www.google.com','https://www.bing.com']]
    for t in ts:
        t.start()

    for t in ts:

        t.join()

輸出:多線程

線程Thread-1 啓動的時間:2019-01-09 11:25:18.339631
線程Thread-2 啓動的時間:2019-01-09 11:25:18.340646
線程Thread-3 啓動的時間:2019-01-09 11:25:18.342635
https://www.baidu.com/ 11:25:28 Thread-1
https://cn.bing.com/ 11:25:29 Thread-3
https://www.google.com/ 11:25:29 Thread-2

多進程

進程池

python中使用 multiprocessing 來建立多進程,若是要建立多個子進程,則須要使用 進程池 Pool 來建立,一個簡單的例子:併發

from multiprocessing import Pool
import os 
from datetime import datetime 


'''
@param {type} int
@return: None
'''
def print_num(i):
    print("進程{} 打印 {}".format(os.getpid(),i))


if __name__ == "__main__":
    p = Pool(4)
    for i in range(100):
        p.apply_async(print_num,args=(i,))
    # 關閉進程池,再也不加入進程
    p.close()
    # 防止主進程結束,子進程沒法繼續運行
    p.join()

輸出:app

進程2624 打印 0
進程2625 打印 1
進程2626 打印 3
進程2627 打印 2
進程2624 打印 4
進程2625 打印 5
進程2626 打印 6
進程2627 打印 7
進程2624 打印 8
...

進程能夠實現並行運行代碼,可是一旦進程太多,CPU運行不過來也是須要進行等待,用了多進程之後,就能夠不使用隊列了,也能夠實現多線程的效果dom

除此以外,還能夠多進程和多線程結合起來使用,一個簡單的例子async

from multiprocessing import Pool
import threading
import os,time 
import queue 
from datetime import datetime 

def producer(i):
    Q = queue.Queue()
    start = 25*(i-1)
    end = 100 * int(i / 4)

    for x in range(start,end):
        Q.put(x)
  
    
    return Q

def process_thread(Q,j):
    while not Q.empty():
        item = Q.get()
        print("進程{}: 線程{} 正在消耗:{} 時間:{}".format(os.getpid(),j,item,datetime.now()))

    Q.all_tasks_done


def tasks(i):
    Q = producer(i)
    ts = [threading.Thread(target=process_thread,args=(Q,j)) for j in range(10)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()


if __name__ == "__main__":
    start = datetime.now()
    p = Pool(4)
    for i in range(1,5):
        print(i)
        p.apply_async(tasks,args=(i,))
    p.close()
    p.join()
    end = datetime.now()
    waste = end-start
    print("一共花費了: {}".format(waste))

先將要處理的數據,填進隊列,而後建立4個進程,10個線程運行。 其輸出爲:分佈式

'''
(venv) C:\project\libraries-python>python bulit-in-libraries\threading\multithreading.py
進程17020: 線程0 正在消耗:1 時間:2019-01-09 12:50:48.701523
進程17020: 線程1 正在消耗:2 時間:2019-01-09 12:50:48.703521
進程17020: 線程3 正在消耗:4 時間:2019-01-09 12:50:48.704365
進程17020: 線程2 正在消耗:3 時間:2019-01-09 12:50:48.704365

進程2804: 線程0 正在消耗:5 時間:2019-01-09 12:50:48.706349
進程2804: 線程1 正在消耗:6 時間:2019-01-09 12:50:48.707352
進程2804: 線程4 正在消耗:9 時間:2019-01-09 12:50:48.708355
進程2804: 線程3 正在消耗:8 時間:2019-01-09 12:50:48.708355
進程2804: 線程2 正在消耗:7 時間:2019-01-09 12:50:48.708355

進程16060: 線程0 正在消耗:10 時間:2019-01-09 12:50:48.728409
進程16060: 線程1 正在消耗:11 時間:2019-01-09 12:50:48.730413
進程16060: 線程4 正在消耗:14 時間:2019-01-09 12:50:48.732418
進程16060: 線程3 正在消耗:13 時間:2019-01-09 12:50:48.732418
進程16060: 線程2 正在消耗:12 時間:2019-01-09 12:50:48.732418

進程7588: 線程3 正在消耗:18 時間:2019-01-09 12:50:48.761808
進程7588: 線程4 正在消耗:19 時間:2019-01-09 12:50:48.761808
進程7588: 線程0 正在消耗:15 時間:2019-01-09 12:50:48.761808
進程7588: 線程1 正在消耗:16 時間:2019-01-09 12:50:48.761808
進程7588: 線程2 正在消耗:17 時間:2019-01-09 12:50:48.761808

後來實驗了打印出10萬個數,4個進程,每一個進程400個線程,花費了39秒。而400個線程,只花費了17秒。因此有時候,也並非多就是好。進程線程切換都須要使用必定的時間。

子進程

在python中,若是要運行系統命令,會使用 subprocess 來運行,官方建議使用run 方法來運行系統命令,更高級的用法是直接使用其 Popen 接口。
其函數格式爲:

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None)

能夠看幾個簡單的例子:

直接使用

import subprocess
subprocess.run(['ls','-al'])

在python3.7 以前,默認系統命令執行的結果(輸出/錯誤)不存在stdout/stderr 裏面,須要設置 capture_output=True,而在python3.6 版本,若是你須要使用執行的結果,你就須要設置 stdout. 以下所示

# python 3.6
>>> a = subprocess.run(['ls','-al'],stdout=subprocess.PIPE)
>>> a.stdout

# python3.7 
>>> a = subprocess.run(['ls','-al'],capture_output=True)
>>> a.stdout

因此能夠看出python3.7 又作了一層封裝,爲了讓你們使用更上一層的接口。能夠看一下幾個參數的含義爲:

args 列表,爲shell命令
shell boolean值, 設置後,args能夠直接接受shell命令
capture_output = True , 設置後,stdout/stderr會存儲值
check=True, 設置後,若是程序異常退出,會跑出一個CalledProcessError異常
cwd 是工做目錄,能夠爲str,或者path-like 類

高級使用

Popen的構造函數:

class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, encoding=None, errors=None)

一個簡單的例子

p = subprocess.Popen(['ls','-al'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

其次,經過Popen.communicate() ,子進程能夠在啓動了之後,還能夠進行參數的輸入

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
其輸出:
$ nslookup
Server:        192.168.19.4
Address:    192.168.19.4#53

Non-authoritative answer:
python.org    mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
mail.python.org    internet address = 82.94.164.166
mail.python.org    has AAAA address 2001:888:2000:d::a6

Exit code: 0

分佈式多進程

python的分佈式接口簡單,使用起來也十分簡單,能夠參考廖雪峯的教程,須要的時候,修改代碼,便可完成屬於本身的分佈式程序

這裏貼出代碼:

# master
import random,time,queue
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue',callable=lambda:task_queue)
QueueManager.register('get_result_queue',callable=lambda:result_queue)

manager = QueueManager(address=('',5000),authkey=b'abc')
manager.start()

tasks = manager.get_task_queue()
results = manager.get_result_queue()

for i in range(10):
    n = random.randint(0,10000)
    print('put task {}'.format(n))
    tasks.put(n)

print('try get results...')
for i in range(10):
    r = results.get(timeout=100)
    print('result:{}'.format(r))

manager.shutdown()
print('master exit')

# worker
import time,sys,queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass


QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# master的主機地址
server_addr = '127.0.0.1'
print('connect to server...')
m = QueueManager(address=(server_addr,5000),authkey=b'abc')
m.connect()

tasks = m.get_task_queue()
results = m.get_result_queue()

for i in range(10):
    try:
        n = tasks.get(timeout=1)
        print('run task %d * %d...' % (n, n))

        r = '{} * {} = {}'.format(n,n,n*n)
        time.sleep(1)
        results.put(r)
    except Queue.Empty:
        print('task queue is empty.')

print('worker exit.')

參考

https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000
https://docs.python.org/3.6/library/subprocess.html
https://docs.python.org/3.7/library/subprocess.html

相關文章
相關標籤/搜索