併發編程——多線程

本節導讀:html

  • 什麼是線程
  • 線程與進程的區別
  • 開啓線程的兩種方法
  • 多線程與多進程的區別
  • thread對象的其餘屬性
  • 守護線程
  • gil全局解釋器鎖
  • 死鎖現象與遞歸鎖
  • 信號量,event,定時器
  • 線程queue
  • 進程池與線程池

 

一 什麼是線程python

  線程顧名思義,就是一條流水線工做的過程(流水線的工做須要電源,電源就至關於cpu),而一條流水線必須屬於一個車間,一個車間的工做過程是一個進程,車間負責把資源整合到一塊兒,是一個資源單位,而一個車間內至少有一條流水線。mysql

因此,進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位。git

多線程(即多個控制線程)的概念是,在一個進程中存在多個線程,多個線程共享該進程的地址空間,至關於一個車間內有多條流水線,都共用一個車間的資源。例如,北京地鐵與上海地鐵是不一樣的進程,而北京地鐵裏的13號線是一個線程,北京地鐵全部的線路共享北京地鐵全部的資源,好比全部的乘客能夠被全部線路拉。github

 

二 線程與進程的區別web

  • 同一個進程內的多個線程共享該進程內的地址資源
  • 建立線程的開銷要遠小於建立進程的開銷(建立一個進程,就是建立一個車間,涉及到申請空間,並且在該空間內建至少一條流水線,但建立線程,就只是在一個車間內造一條流水線,無需申請空間,因此建立開銷小)

三 開啓線程的兩種方法sql

threading模塊介紹json

multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹服務器

方式一多線程

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主線程')
實例Thread對象

方式二

from threading import Thread
import time

class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主線程')
繼承Thread類

 

四 多線程與多進程的區別

開啓速度

在主進程下開啓線程,幾乎是t.start ()的同時就將線程開啓了,說明開銷極小

在主進程下開子進程,p.start ()將開啓進程的信號發給操做系統後,操做系統要申請內存空間,讓好拷貝父進程地址空間到子進程,開銷遠大於線程

pid

在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣

開多個進程,每一個進程都有不一樣的pid

數據共享

進程之間地址空間是隔離的

同一進程內開啓的多個線程是共享該進程地址空間的

 

五 thread對象的其餘屬性

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

 

六 守護線程

不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬

須要強調的是:運行完畢並不是終止運行

對主進程來講,運行完畢指的是主進程代碼運行完畢,

對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢,

主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,

主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。

 

七 gil全局解釋器鎖

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。>有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL

八 死鎖現象與遞歸鎖

死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

 

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
死鎖

遞歸鎖

 

遞歸鎖,死鎖的解決方案,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

 

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖,兩者的區別是:遞歸鎖能夠連續acquire屢次,而互斥鎖只能acquire一次

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
遞歸鎖

 

 

九 信號量,event,定時器

 

信號量

  信號量也是一把鎖,能夠指定信號量爲5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間能夠有5個任務拿到鎖去執行,若是說互斥鎖是合租房屋的人去搶一個廁所,那麼信號量就至關於一羣路人爭搶公共廁所,公共廁全部多個坑位,這意味着同一時間能夠有多我的上公共廁所,但公共廁所容納的人數是必定的,這即是信號量的大小

 

from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

#Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;計數器不能小於0;當計數器爲0時acquire()將阻塞線程直到其餘線程調用release()。
View Code

 

event

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

from threading import Event

event.isSet():返回event的狀態值;

event.wait():若是 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。

例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做

 

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連接超時')
        print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>連接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

定時器

定時器,指定n秒後執行某操做

 

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

十 線程queue

有三種不一樣的用法

class queue.Queue(maxsize=0) #隊列:先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
結果(先進先出):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #堆棧:last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
結果(後進先出):
third
second
first
'''

class queue.PriorityQueue(maxsize=0) #優先級隊列:存儲數據時可設置優先級的隊列

 

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())



'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

十一 進程池與線程池

在剛開始學多進程或多線程時,咱們火燒眉毛地基於多進程或多線程實現併發的套接字通訊,然而這種實現方式的致命缺陷是:服務的開啓的進程數或線程數都會隨着併發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,因而咱們必須對服務端開啓的進程數或線程數加以控制,讓機器在一個本身能夠承受的範圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質仍是基於多進程,只不過是對開啓進程的數目加上了限制

介紹

官網:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

基本方法

一、submit(fn, *args, **kwargs)
異步提交任務

二、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操做

三、shutdown(wait=True) 
至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前

四、result(timeout=None)
取得結果

五、add_done_callback(fn)
回調函數

進程池

用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
進程池
 

線程池

用法

把ProcessPoolExecutor換成ThreadPoolExecutor,其他用法所有相同

map方法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
map

回調函數

能夠爲進程池或線程池內的每一個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢後自動觸發,並接收任務的返回值看成參數,該函數稱爲回調函數

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
回調函數
相關文章
相關標籤/搜索