Python Basics (34): Threads (Part 2)

1. Python Threads

1.1 The Global Interpreter Lock (GIL)

Execution of Python code is controlled by the Python virtual machine (also known as the interpreter main loop). Python was designed from the start so that only one thread executes in the main loop at a time. Although multiple threads can be "running" inside the Python interpreter, only one thread actually runs in the interpreter at any given moment.
Access to the Python virtual machine is controlled by the Global Interpreter Lock (GIL); it is this lock that guarantees only one thread runs at a time.

In a multithreaded environment, the Python virtual machine executes as follows:

Set the GIL.

Switch to a thread to run it.

Run a specified number of bytecode instructions, or run until the thread voluntarily gives up control (it can call time.sleep(0) to do so).

Put the thread back to sleep.

Unlock the GIL.

Repeat all of the above steps again.

When external code (such as a C/C++ extension function) is called, the GIL stays locked until that function returns (since no Python bytecode runs in the meantime, no thread switch takes place). Programmers writing extensions can release the GIL explicitly.
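
A minimal sketch of the practical consequence (N and the timings are illustrative; actual numbers depend on the machine): a CPU-bound task gains nothing from two threads, because only one thread executes bytecode at a time:

import time
from threading import Thread

def count(n):
    # pure CPU-bound work; the GIL keeps two threads from
    # executing this bytecode in parallel
    while n > 0:
        n -= 1

N = 10_000_000

start = time.time()
count(N); count(N)
print('sequential: %.2fs' % (time.time() - start))

start = time.time()
t1 = Thread(target=count, args=(N,))
t2 = Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print('two threads: %.2fs' % (time.time() - start)) #typically no faster than sequential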

1.2 Choosing a Python Threading Module

Python provides several modules for multithreaded programming, including thread, threading, and Queue. The thread and threading modules let programmers create and manage threads. The thread module provides basic support for threads and locks, while threading offers higher-level, more powerful thread management. The Queue module lets users create a queue data structure that can be shared between multiple threads. (In Python 3, thread was renamed _thread and Queue became queue.)

Avoid the thread module: the higher-level threading module is more advanced, supports threads more completely, and using attributes from the thread module may conflict with threading. Second, the low-level thread module has very few synchronization primitives (only one, in fact), while the threading module has many. Finally, with the thread module, when the main thread ends, all threads are forcibly terminated, with no warning and no proper cleanup; the threading module at least ensures that the process only exits after the important child threads have exited.

The thread module does not support daemon threads: when the main thread exits, all child threads are forcibly killed, whether or not they are still working. The threading module does support daemon threads. A daemon thread is typically a server waiting for client requests; if no client makes a request, it just sits and waits. Marking a thread as a daemon says that the thread is unimportant: when the process exits, there is no need to wait for that thread to finish first.

2. The threading Module

The multiprocessing module's interface is a complete imitation of the threading module's; the two are very similar at the usage level, so the shared parts will not be described in detail again.

2.1 Creating Threads: the threading.Thread Class

2.1.1 Creating a Thread

Method 1 for creating a thread:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('main thread')

Method 2 for creating a thread:

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('main thread')

2.1.2 Multithreading vs. Multiprocessing

Comparing pids:

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1: threads started under the main process all share the main process's pid
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('main thread/main process pid',os.getpid())

    #part2: every process that is started has its own, different pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('main thread/main process pid',os.getpid())

Comparing startup overhead:

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #start a thread under the main process
    t=Thread(target=work)
    t.start()
    print('main thread/main process')
    '''
    output (the thread starts quickly, so hello prints first):
    hello
    main thread/main process
    '''

    #start a child process under the main process
    t=Process(target=work)
    t.start()
    print('main thread/main process')
    '''
    output (the process is slower to start, so the main line prints first):
    main thread/main process
    hello
    '''

The question of shared in-memory data:

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('main',n) #the child process p did set its own global n to 0, but it changed only its own copy; the parent process's n is still 100

    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('main',n) #the result is 0, because threads within the same process share the process's data

Threads within the same process share that process's data.

2.1.3 Other Methods of the Thread Class

Methods of Thread instance objects:
  # is_alive(): return whether the thread is alive (the old isAlive() spelling was removed in Python 3.9).
  # getName(): return the thread's name (legacy; prefer the name attribute).
  # setName(): set the thread's name (legacy; prefer the name attribute).

Some functions provided by the threading module:
  # threading.currentThread(): return the current Thread object (legacy alias of threading.current_thread()).
  # threading.enumerate(): return a list of the threads currently running. "Running" means after start and before termination; threads not yet started or already finished are excluded.
  # threading.activeCount(): return the number of running threads (legacy alias of threading.active_count()); gives the same result as len(threading.enumerate()).

Code example:

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())

if __name__ == '__main__':
    #在主進程下開啓線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #the main thread
    print(threading.enumerate()) #two running threads, counting the main thread
    print(threading.active_count())
    print('main thread/main process')

    '''
    output:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    2
    main thread/main process
    Thread-1
    '''

The join method:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('main thread')
    print(t.is_alive())
    '''
    egon say hello
    main thread
    False
    '''

2.1.4 Daemon Threads

Whether for processes or threads, the same rule holds: the daemon X is destroyed once the main X has finished running. What must be stressed is that "finished running" is not the same as "being terminated".

For the main process, "finished running" means that the main process's code has finished executing.

For the main thread, "finished running" means that all the non-daemon threads in the main thread's process have finished; only then does the main thread count as finished.

The main process counts as finished as soon as its code ends (daemon processes are reclaimed at that moment), but it then waits for all non-daemon child processes to finish and reclaims their resources (otherwise zombie processes would result) before it actually exits.

The main thread counts as finished only after all the other non-daemon threads have finished (daemon threads are reclaimed at that moment). The end of the main thread means the end of the process, whereupon all the process's resources are reclaimed, so the process must make sure every non-daemon thread has finished before it can end.

Daemon thread example 1:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #must be set before t.start(); t.daemon = True is the modern spelling
    t.start()

    print('main thread')
    print(t.is_alive())
    '''
    main thread
    True
    '''

Daemon thread example 2:

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True #t1 is a daemon, but non-daemon t2 keeps the process alive about 3s, so t1 (1s of work) still finishes and prints end123
t1.start()
t2.start()
print("main-------")

2.2 Locks

2.2.1 Synchronization Locks

Multiple threads competing for a resource:

from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #the result may be 99: during the sleep every thread read n as 100, before any of them wrote back

對公共數據的操做:

import threading
R=threading.Lock()
R.acquire()
R.release()
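
Lock objects also support the context-manager protocol, so the acquire/release pair is normally written as a with block, which releases the lock even if the body raises an exception:

import threading

R=threading.Lock()
with R: #acquire() on entry, release() on exit, even on exception
    pass #operate on the shared data here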

Applying the synchronization lock:

from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #the result is guaranteed to be 0: execution went from concurrent to serial, sacrificing speed for data safety

The difference between a mutex and join:

#Without a lock: concurrent execution, fast, but the data is unsafe
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('main:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
main:0.5216062068939209 n:99
'''

#With a lock: the unlocked parts run concurrently, the locked part runs serially; slow, but the data is safe
from threading import current_thread,Thread,Lock
import os,time
def task():
    #the unlocked code runs concurrently
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #the locked code runs serially
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('main:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
main:53.294203758239746 n:0
'''

#Some readers may wonder: since locking makes the run serial anyway, why not just call join immediately after start and skip the lock? That gives a serial effect too.
#True: calling join right after start will certainly make the 100 tasks run serially, and the final n will certainly be 0, i.e. safe. The problem is:
#with start followed immediately by join, ALL of the code inside the task runs serially, whereas with a lock only the locked part, the part that modifies the shared data, is serial.
#Purely in terms of data safety both work, but locking is clearly more efficient.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1

if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('main:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
main:350.6937336921692 n:0 #a frightening amount of time
'''

2.2.2 Deadlock and Recursive Locks

Processes have deadlock and recursive locks too; they were left out of the process chapters, so both are covered here.

A deadlock is the situation in which two or more processes or threads wait on each other during execution because they are contending for resources; without outside intervention, none of them can make progress. The system is then said to be in a deadlocked state, or to have produced a deadlock, and these processes, forever waiting on one another, are called deadlocked processes. The following is a deadlock:

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire() #blocks forever: a plain Lock is not reentrant, even within the same thread
print(123)
mutexA.release()
mutexA.release()

The solution is a recursive lock. To support acquiring the same resource multiple times within a single thread, Python provides the reentrant lock, RLock.

Internally an RLock maintains a Lock and a counter; the counter records the number of acquire calls, which is what lets the resource be acquired multiple times. Only when all of one thread's acquires have been released can other threads acquire the resource. If the example above uses RLock in place of Lock, no deadlock occurs:

The recursive lock RLock:

from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire() #succeeds: the owning thread may re-acquire, the counter goes from 1 to 2
print(123)
mutexA.release()
mutexA.release()

A classic problem: the scientists eating noodles

The deadlock version:

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s grabbed the noodles'%name)
    fork_lock.acquire()
    print('%s grabbed the fork'%name)
    print('%s is eating noodles'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s grabbed the fork' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s grabbed the noodles' % name)
    print('%s is eating noodles' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

Solving the deadlock with a recursive lock:

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock() #a single RLock object bound to two names
def eat1(name):
    noodle_lock.acquire()
    print('%s grabbed the noodles'%name)
    fork_lock.acquire()
    print('%s grabbed the fork'%name)
    print('%s is eating noodles'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s grabbed the fork' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s grabbed the noodles' % name)
    print('%s is eating noodles' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

2.3 Thread Queues

The queue module: use import queue; the usage is the same as the process Queue.

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #first in, first out (FIFO)

First in, first out:

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
result (FIFO):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #last in, first out (LIFO)

Last in, first out:

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
result (LIFO):
third
second
first
'''

class queue.PriorityQueue(maxsize=0) #a queue whose stored items carry a priority

Priority queue:

import queue

q=queue.PriorityQueue()
#put takes a tuple whose first element is the priority (usually a number, though any mutually comparable values work); lower numbers mean higher priority
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
result (lower numbers mean higher priority; higher-priority items dequeue first):
(10, 'b')
(20, 'a')
(30, 'c')
'''
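
Entries are compared as whole tuples, so when two priorities are equal the comparison falls back to the second element, which must itself be comparable. A minimal sketch:

import queue

q=queue.PriorityQueue()
q.put((10,'b'))
q.put((10,'a')) #same priority: 'a' < 'b' breaks the tie
print(q.get()) #(10, 'a')
print(q.get()) #(10, 'b')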

2.4 The Python Standard Module concurrent.futures

#1 Introduction
The concurrent.futures module provides a highly encapsulated interface for asynchronous calls
ThreadPoolExecutor: a thread pool, providing asynchronous calls
ProcessPoolExecutor: a process pool, providing asynchronous calls
Both implement the same interface, which is defined by the abstract Executor class.

#2 Basic methods
#submit(fn, *args, **kwargs)
Submit a task asynchronously

#map(func, *iterables, timeout=None, chunksize=1)
Replaces a for loop over submit

#shutdown(wait=True)
The equivalent of the process pool's pool.close()+pool.join()
wait=True: block until every task in the pool has finished and its resources are reclaimed
wait=False: return immediately, without waiting for the pool's tasks to finish
Whatever the value of wait, the whole program still waits until all tasks have completed
submit and map must come before shutdown

#result(timeout=None)
Get the result

#add_done_callback(fn)
Attach a callback function

# done()
Check whether a given task has finished

# cancel()
Cancel a task (only possible while it has not yet started running)
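
A minimal sketch of these Future methods (the slow function and its values are made up for illustration):

from concurrent.futures import ThreadPoolExecutor
import time

def slow(x):
    time.sleep(1)
    return x*2

executor=ThreadPoolExecutor(max_workers=1)
f1=executor.submit(slow,1)
f2=executor.submit(slow,2) #queued behind f1, since there is only one worker

print(f2.cancel()) #True: f2 has not started yet, so it can still be cancelled
print(f1.done())   #False: f1 is still running
print(f1.result()) #2: blocks until f1 finishes
print(f1.done())   #True
executor.shutdown()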

ProcessPoolExecutor:

#Introduction
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.


#Usage
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True) #block until every submitted task has finished
    print('+++>')
    for future in futures:
        print(future.result())

ThreadPoolExecutor:

#Introduction
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#Usage
Same as for ProcessPoolExecutor

Usage of map:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map replaces the for+submit loop
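
map returns an iterator that yields each task's return value in the order the inputs were given, so the results can be consumed directly. A short self-contained sketch:

from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n**2

with ThreadPoolExecutor(max_workers=3) as executor: #the with block calls shutdown automatically
    for result in executor.map(square,range(1,12)):
        print(result) #squares in input order: 1, 4, 9, ...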

The callback function:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<process %s> get %s' %(os.getpid(),url))
    response=requests.get(url)
    if response.status_code == 200:
        return {'url':url,'text':response.text}

def parse_page(res):
    res=res.result()
    print('<process %s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=parse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page receives a future object obj; call obj.result() inside it to get the actual return value