python大佬養成計劃----線程與多線程

建立線程

一個進程必有一個線程,進程也可由多個線程組成,但有一個線程爲主線程。
若一個任務須要花10Mins,當只有一個線程時,花費10Mins,當有十個線程時,可能就花費1Mins,因此多線程能夠提高任務執行時間,提升工做效率。
python裏與線程有關的模塊:php

  • _thread 底層
  • threading

查看當前運行的線程個數:threading.current_thread()
查看當前線程信息:threading.active_count()python

import _thread
import threading

def job():

    print("當前線程個數:",threading.active_count())
    print("當前線程信息",threading.current_thread())

if __name__=='__main__':
    job()

圖片描述

_thread建立多線程

調用thread模塊中的start_new_thread()函數來產生新線程。
thread.start_new_thread(function,args = ())git

#_thread建立多線程
import _thread
import time

def job(name):
    print("name:%s,time:%s" %(name,time.ctime()))

if __name__=="__main__":
    # 建立多個線程, 可是沒有開始執行任務
    _thread.start_new_thread(job,('thread1',))
    _thread.start_new_thread(job,('thread2',))
    while True: #盲等待
        pass

圖片描述

threading經過實例化Thread類建立多線程

_thread模塊提供了低級別的、原始的線程以及一個簡單的鎖。
threading模塊是對_thread再封裝,對使用者更友好
經過實例化Thread對象建立線程,Thread的方法有:github

  • run() #Method representing the thread's activity.
  • start() #Start the thread's activity.
  • join() #Wait until the thread terminates.
  • is_alive() #Return whether the thread is alive.
import threading

def job(name):
    print("當前執行的任務名:",name)
    print("當前線程個數:",threading.active_count())
    print("當前線程信息:",threading.current_thread())

if __name__=="__main__":
    t1 = threading.Thread(target=job,name='thread1',args=('job1',))
    t2 = threading.Thread(target=job,name='thread2',args=('job2',))
    t1.start()  #Start the thread's activity.
    t2.start()

圖片描述

使用多線程與不使用多線程的對比

不使用多線程執行任務,程序會一直等待sleep時間過去,在執行下一條命令。json

#不使用多線程
import time

def music(name):
    for i in range(2):
        print("i am listening :",name)
        time.sleep(2)
def read(book):
    for i in range(2):
        print("i am reading :",book)
        time.sleep(1)
if __name__ == '__main__':
    start_time = time.time()
    music("空空如也")
    read('面紗')
    print("花費時間: %s" %(time.time()-start_time))

圖片描述

使用多線程執行任務,在遇到某一線程須要等待時,會執行其餘線程
Thread.join()會等待當前線程執行結束,再執行主線程。api

import threading
import time


def music(name):
    for i in range(2):
        print("i am listening :",name)
        time.sleep(2)
def read(book):
    for i in range(2):
        print("i am reading :",book)
        time.sleep(1)
if __name__=="__main__":
    start_time = time.time()
    t1 = threading.Thread(target=music,args=('空空如也',))
    t2 = threading.Thread(target=read,args=('面紗',))
    t1.start()
    t2.start()
    t1.join()   #等待線程執行結束,才執行主程序,防止主線程阻塞子線程
    t2.join()
    end_time = time.time()
    print("任務執行時間:",end_time-start_time)

圖片描述

守護線程setDeamon

當申明一個子線程爲守護線程時,主線程結束時,子線程也結束。
申明守護線程須要在開啓線程以前。網絡

import threading
import time

def music(name):
    for i in range(2):
        print("listening music :",name)
        time.sleep(4)

def code(pro):
    for i in range(2):
        print('i am coding :',pro)
        time.sleep(5)

if __name__=='__main__':
    st_time = time.time()
    t1 = threading.Thread(target=music,args=('hello',))
    t2 = threading.Thread(target=code,args=('mydiff',))
    #將線程申明爲守護線程,若是設置爲True,當主線程結束,子線程也結束
    #必須在啓動線程以前進行設置
    t1.setDaemon(True)
    t2.setDaemon(True)  #主線程執行結束以後,子線程還沒來得及執行結束,整個程序就退出了
    t1.start()
    t2.start()
    end_time = time.time()
    print('運行時間:',end_time-st_time)

圖片描述

線程同步

若是多個線程共同對某個數據修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要對多個線程進行同步。
使用Thread對象的Lock和Rlock能夠實現簡單的線程同步,這兩個對象都有acquire方法和release方法,對於那些須要每次只容許一個線程操做的數據,能夠將其操做放到acquire和release方法之間。多線程

import threading

def add(lock):
    #操做變量以前加鎖
    lock.acquire()
    global money
    for i in range(1389993):
        money+=1
    #變量操做完成以後,解鎖
    lock.release()

def reduce(lock):
    #操做變量以前加鎖
    lock.acquire()
    global money
    for i in range(4728937):
        money-=1
    #變量操做完成以後,解鎖
    lock.release()

if __name__=="__main__":
    money = 0
    lock = threading.Lock() #示例化一個鎖對象
    t1 = threading.Thread(target=add,args=(lock,))
    t2 = threading.Thread(target=reduce,args=(lock,))
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    print('最終金額爲:',money)

圖片描述

GIL全局解釋器鎖

Python 代碼的執行由 Python 虛擬機(也叫解釋器主循環)來控制。Python 在設計之初就考慮到要在主循環中,同時只有一個線程在執行,就像單 CPU 的系統中運行多個進程那樣,內存中能夠存放多個程序,但任意時刻,只有一個程序在 CPU 中運行。一樣地,雖然 Python 解釋器中能夠「運行」,多個線程,但在任意時刻,只有一個線程在解釋器中運行。app

對 Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。函數

執行過程:

1). 設置GIL
2). 切換到線程去運行對應的任務;
3). 運行
    - 執行完了
    - time.sleep()
    - 獲取其餘信息才能繼續執行, eg: 從網絡上獲取網頁信息等;
3. 把線程設置爲睡眠狀態
4. 解鎖GIL
5.再次重複執行上述內容;

生產者消費者模型

在工做中,某些模塊生成一些數據,由另外一些模塊負責處理。產生數據的模塊,就形象地稱爲生產者;而處理數據的模塊,就稱爲消費者。在生產者與消費者之間在加個緩衝區,咱們形象的稱之爲倉庫,生產者負責往倉庫了進商品,而消費者負責從倉庫裏拿商品,這就構成了生產者消費者模式。
這裏,咱們用生產者消費者模型來實現多線程的網址訪問,節省時間。

#多線程實現生產者消費者模型
#實現不一樣的網址或ip訪問
import threading
from urllib.request import urlopen


def create_data():
    with open('ips.txt','w') as f:
        f.write("www.baidu.com\n")
        f.write("www.163.com\n")
        for i in range(100):
            f.write('172.25.254.%s\n' %(i+1))
def creat_url(filename='ips.txt'):
    ports=[80,443]
    with open(filename) as f:
        ips = [url_info.strip() for url_info in f.readlines()]
    urls = ['http://%s:%s' %(ip,port) for ip in ips for port in ports]
    return urls

def job(url):
    try:
        urlObj = urlopen(url)
    except Exception as e :
        print('Warnning!!!    %s不可訪問' %(url))
    else:
        print("%s能夠訪問" %(url))

if __name__=="__main__":
    urls = creat_url()
    threads = []
    for url in urls:
        t = threading.Thread(target=job,args=(url,))
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]
    print("任務執行結束")

圖片描述

再封裝threading.Thread類

無參版

對threading.Thread類的再封裝,執行時無需傳遞參數

from threading import Thread
class IpThread(Thread):
    def __init__(self):
        super(IpThread, self).__init__()
# 將多線程須要執行的任務重寫到run方法中;
    def run(self):
        print("this is a JOB")
        print(type(self))

t = IpThread()
t.start()

圖片描述

含參版

實現訪問Ip地址

import json
from threading import Thread
from urllib.request import urlopen

class IpThread(Thread):
    #重寫構造方法,若是執行的任務須要傳遞參數,那將參數與self綁定
    def __init__(self,jobname,ip):
        super(IpThread, self).__init__()
        self.jobname = jobname
        self.ip = ip
    #將多線程須要執行的任務重寫到run方法中
    def run(self):
        print('this is a %s job' %(self.jobname))
        #須要有一個參數,傳遞ip
        url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (self.ip)
        try :
            # 根據url獲取網頁的內容, 而且解碼爲utf-8格式, 識別中文;
            result = urlopen(url).read().decode('utf-8')
        except Exception as e:
            print("訪問%s失敗" %(self.ip))
        else:
             # 將獲取的字符串類型轉換爲字典, 方便處理
            d = json.loads(result)['data']
            country = d['country']
            city = d['city']
        print("%s位於%s,城市爲%s" %(self.ip,country,city))

if __name__=="__main__":
    ips = ['172.25.254.22','8.8.8.8','89.31.136.0']
    threads = []
    for ip in ips :
        t = IpThread(jobname='Clawer',ip=ip)
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]
    print("程序執行結束")

圖片描述

線程池

線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。線程池線程都是後臺線程。每一個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。

from concurrent.futures import ThreadPoolExecutor

import time
#須要執行的任務
def job():
    print("morning sheen")
    return 'new day'

if __name__=='__main__':
    #示例化對象,線程池裏最多有10個線程
    pool = ThreadPoolExecutor(max_workers=10)
    #往線程池裏扔須要執行的任務,返回一個對象 _base.Future()示例化出來的
    f1 = pool.submit(job)
    f2 = pool.submit(job)
    #判斷任務是否執行結束
    print(f1.done())
    time.sleep(1)
    print(f2.done())    #判斷是否釋放了線程
    #獲取執行任務的結果
    print(f1.result())
    print(f2.result())

圖片描述

線程池循環執行任務

線程池執行任務方式

concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數,另外一種是map()函數,二者的主要區別在於:

  • map能夠保證輸出的順序, submit輸出的順序是亂的
  • 若是你要提交的任務的函數是同樣的,就能夠簡化成map。可是假如提交的任務函數是不同的,或者執行的過程之可能出現異常(使用map執行過程當中發現問題會直接拋出錯誤)就要用到submit()
  • submit和map的參數是不一樣的,submit每次都須要提交一個目標函數和對應的參數,map只須要提交一次目標函數,目標函數的參數放在一個迭代器(列表,字典)裏就能夠。
from urllib.error import HTTPError
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor,as_completed
import time

URLS = ['http://httpbin.org', 'http://example.com/',
        'https://api.github.com/']*3
def get_page(url,timeout = 0.3):   #爬取網頁信息
    try:
        content = urlopen(url).read()
        return {'url':url, 'len':len(content)}
    except HTTPError as e:
        return {'url':url, 'len':0}

# 方法1: submit提交任務
start_time = time.time()
pool = ThreadPoolExecutor(max_workers=20)
#submit返回的是Future對象,對於Future對象能夠簡單地理解爲一個在將來完成的操做
futuresObj = [pool.submit(get_page, url) for url in URLS]
# # 注意: 傳遞的是包含futures對象的序列, as_complete返回已經執行完任務的future對象,
# # 直到全部的future對應的任務執行完成, 循環結束;
for finish_fs in as_completed(futuresObj):
    print(finish_fs.result() )
#submit返回值Future的方法result(self, timeout=None)
"""Return the result of the call that the future represents.
Args:
    timeout: The number of seconds to wait for the result if the future
        isn't done. If None, then there is no limit on the wait time.
Returns:
    The result of the call that the future represents."""
print("執行時間:%s" %(time.time()-start_time))

# 方法2:經過map方式執行
start2_time = time.time()
pool2 = ThreadPoolExecutor(max_workers=20)
for res in pool2.map(get_page, URLS):
    print(res)
print("執行時間:%s" %(time.time()-start2_time))

圖片描述

相關文章
相關標籤/搜索