Python併發編程-線程

  Python做爲一種解釋型語言,因爲使用了全局解釋鎖(GIL)的緣由,其代碼不能同時在多核CPU上併發的運行。這也致使在Python中使用多線程編程並不能實現併發,咱們得使用其餘的方法在Python中實現併發編程。編程

1、全局解釋鎖(GIL)

  Python中不能經過使用多線程實現併發編程主要是由於全局解釋鎖的機制,因此首先解釋一下全局解釋鎖的概念。安全

     首先,咱們知道C++和Java是編譯型語言,而Python則是一種解釋型語言。對於Python程序來講,它是直接被輸入到解釋器中直接運行的。解釋器在程序執行以前對其並不瞭解;它所知道的只是Python的規則,以及在執行過程當中怎樣去動態的應用這些規則。它也有一些優化,可是這基本上只是另外一個級別的優化。因爲解釋器無法很好的對程序進行推導,Python的大部分優化實際上是解釋器自身的優化。更快的解釋器天然意味着程序的運行也能「免費」的更快。也就是說,解釋器優化後,Python程序不用作修改就能夠享受優化後的好處。網絡

      爲了利用多核系統,Python必須支持多線程運行。但做爲解釋型語言,Python的解釋器須要作到既安全又高效。解釋器要注意避免在不一樣的線程操做內部共享的數據,同時還要保證在管理用戶線程時保證老是有最大化的計算資源。爲了保證不一樣線程同時訪問數據時的安全性,Python使用了全局解釋器鎖(GIL)的機制。從名字上咱們很容易明白,它是一個加在解釋器上的全局(從解釋器的角度看)鎖(從互斥或者相似角度看)。這種方式固然很安全,但它也意味着:對於任何Python程序,無論有多少的處理器,任什麼時候候都老是隻有一個線程在執行。即:只有得到了全局解釋器鎖的線程才能操做Python對象或者調用Python/C API函數。多線程

      因此,在Python中」不要使用多線程,請使用多進程」。具體來講,若是你的代碼是IO密集型的,使用多線程或者多進程都是能夠的,多進程比線程更易用,可是會消耗更多的內存;若是你的代碼是CPU密集型的,多進程(multiprocessing模塊)就明顯是更好的選擇——特別是所使用的機器是多核或多CPU的時候。併發

      另外,Python的官方實現CPython帶有GIL,但並非全部的Python實現版本都是這樣的。IronPython,Jython,還有使用.NET框架實現的Python就沒有GIL。因此若是你不能忍受GIL,也能夠嘗試用一下其餘實現版本的Python。app

  

 

  若是是一個計算型的任務,GIL就會讓多線程變慢。咱們舉個計算斐波那契數列的例子:框架

import time
import threading

def text(name):
    def profile(func):
        def wrapper(*args,**kwargs):
            start = time.time()
            res = func(*args,**kwargs)
            end = time.time()
            print('{} cost:{}'.format(name,end-start))
            return res
        return wrapper
    return profile


def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


@text('nothread')
def nothread():
    fib(35)
    fib(35)


@text('hasthread')
def hasthread():
    for i in range(2):
        t = threading.Thread(target=fib,args=(35,))
        t.start()
    main_thread = threading.current_thread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        t.join()

nothread()
hasthread()

##輸出結果###
nothread cost:6.141353607177734
hasthread cost:6.15336275100708
View Code

  這種狀況還不如不用多線程!dom

  GIL是必須的,這是Python設計的問題:Python解釋器是非線程安全的。這意味着當從線程內嘗試安全的訪問Python對象的時候將有一個全局的強制鎖。 在任什麼時候候,僅僅一個單一的線程可以獲取Python對象或者C API。每100個字節的Python指令解釋器將從新獲取鎖,這(潛在的)阻塞了I/O操做。由於鎖,CPU密集型的代碼使用線程庫時,不會得到性能的提升。ide

  那是否是因爲GIL的存在,多線程庫就是個「雞肋」呢?固然不是。事實上咱們平時會接觸很是多的和網絡通訊或者數據輸入/輸出相關的程序,好比網絡爬蟲、文本處理等等。這時候因爲網絡狀況和I/O的性能的限制,Python解釋器會等待讀寫數據的函數調用返回,這個時候就能夠利用多線程庫提升併發效率了。函數

2.同步機制

  A. Semaphore(信號量)

  在多線程編程中,爲了防止不一樣的線程同時對一個公用的資源(好比所有變量)進行修改,須要進行同時訪問的數量(一般是1)的限制。信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。

import time
from random import random
from threading import Thread,Semaphore,current_thread,enumerate

sema = Semaphore(3)

def foo(tid):
    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * 2

        time.sleep(wt)
    print('{} release sema'.format(tid))

for i in range(5):
    t = Thread(target=foo,args=(i,))
    t.start()

main_thread = current_thread()
for t in enumerate():
    if t is main_thread:
        continue
    t.join()

####輸出結果#####
0 acquire sema
1 acquire sema
2 acquire sema
0 release sema
3 acquire sema
1 release sema
4 acquire sema
2 release sema
3 release sema
4 release sema
View Code

  B. Lock(互斥鎖)

  Lock也能夠叫作互斥鎖,其實至關於信號量爲1。咱們先看一個不加鎖的例子:

import time
import threading

value = 0

def getlock():
    global value
    new = value + 1
    time.sleep(0.001)  # 讓線程有機會切換
    value = new

for i in range(100):
    t = threading.Thread(target=getlock)
    t.start()

main_thread = threading.current_thread()

for t in threading.enumerate():
    if t == main_thread:
        continue
    t.join()

print(value)

####輸出結果#####
不肯定(刷新值會發生改變)
View Code

  如今,咱們來看看加鎖以後的狀況:

import time
import threading

value = 0
lock = threading.Lock()

def getlock():
    global value
    with lock:
        new = value + 1
        time.sleep(0.001)  # 讓線程有機會切換
        value = new

for i in range(100):
    t = threading.Thread(target=getlock)
    t.start()

main_thread = threading.current_thread()

for t in threading.enumerate():
    if t == main_thread:
        continue
    t.join()

print(value)

####輸出結果爲#############
100
View Code

  咱們對value的自增長了鎖,就能夠保證告終果了。

3. RLock(遞歸鎖)

  先來講說死鎖,所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。

import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()
        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()
產生死鎖

  解決方案:

import threading
import time

mutex = threading.RLock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        mutex.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutex.release()
        mutex.release()


    def fun2(self):
        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutex.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutex.release()

        mutex.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()
View Code

  遞歸鎖內部維護了一個計數器,當有線程拿到了Lock之後,這個計數器會自動加1,只要這計數器的值大於0,那麼其餘線程就不能搶到改鎖,這就保證了,在同一時刻,僅有一個線程使用該鎖,從而避免了死鎖的方法。關於遞歸鎖內部實現,有興趣的能夠看看源碼。

4. Condition(條件)

  一個線程等待特定條件,而另外一個線程發出特定條件知足的信號。最好說明的例子就是「生產者/消費者」模型:

import time
import threading

def consumer(cond):
    t = threading.current_thread()
    with cond:
        cond.wait()  # 建立了一個鎖,等待producer解鎖
        print('{}: Resource is available to consumer'.format(t.name))

def producer(cond):
    t = threading.current_thread()
    with cond:
        print('{}:Making resource available'.format(t.name))
        cond.notifyAll()  # 釋放鎖,喚醒消費者

condition = threading.Condition()

c1 = threading.Thread(name='c1',target=consumer,args=(condition,))
p = threading.Thread(name='p',target=producer,args=(condition,))
c2 = threading.Thread(name='c2',target=consumer,args=(condition,))


c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()

5. Event

  一個線程發送/傳遞事件,另外的線程等待事件的觸發。咱們一樣的用「生產者/消費者」模型的例子:

import time
import threading
from random import randint

TIMEOUT = 2

def consumer(event, l):
    t = threading.currentThread()
    while 1:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                print('{} popped from list by {}'.format(integer,t.name))
                event.clear()  # 重置狀態
            except IndexError:
                pass

def producer(event, l):
    t = threading.currentThread()
    while 1:
        integer = randint(10,100)
        l.append(integer)
        print('{} append to list by {}'.format(integer, t.name))
        event.set()
        time.sleep(1)

event = threading.Event()

l = []
threads = []

p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)

for name in ('consumer1','consumer2'):
    t = threading.Thread(target=consumer, name=name, args=(event, l))
    t.start()
    threads.append(t)


for t in threads:
    t.join()
print('ending')

  能夠看到事件被2個消費者比較平均的接收並處理了。若是使用了wait方法,線程就會等待咱們設置事件,這也有助於保證任務的完成。

6. Queue

  隊列在併發開發中最經常使用的。咱們藉助「生產者/消費者」模式來理解:生產者把生產的「消息」放入隊列,消費者從這個隊列中對去對應的消息執行。

你們主要關心以下4個方法就行了:

  1. put: 向隊列中添加一個消息。

  2. get: 從隊列中刪除並返回一個消息。

  3. task_done: 當某一項任務完成時調用。

  4. join: 阻塞直到全部的項目都被處理完。

import time
import threading
import random
import queue

q = queue.Queue()

def double(n):
    return n*2

def producer():
    while 1:
        wt = random.randint(1,10)
        time.sleep(random.random())
        q.put((double, wt))

def consumer():
    while 1:
        task, arg = q.get()
        print(arg, task(arg))

        q.task_done()

for target in (producer, consumer):
    t = threading.Thread(target=target)
    t.start()

  Queue模塊還自帶了PriorityQueue(帶有優先級)和LifoQueue(先進先出)2種特殊隊列。咱們這裏展現下線程安全的優先級隊列的用法,
  PriorityQueue要求咱們put的數據的格式是(priority_number, data),咱們看看下面的例子:

 

import time
import threading
from random import randint
import queue

q = queue.PriorityQueue()
def double(n):
    return n * 2

def producer():
    count = 0
    while 1:
        if count > 5:
            break
        prit = randint(0,100)
        print("put :{}".format(prit))
        q.put((prit, double, prit))  # (優先級,函數,參數)
        count += 1

def consumer():
    while 1:
        if q.empty():
            break
        pri,task,arg = q.get()
        print('[PRI:{}] {} * 2 = {}'.format(pri,arg,task(arg)))
        q.task_done()
        time.sleep(0.1)

t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

7.線程池

  面向對象開發中,你們知道建立和銷燬對象是很費時間的,由於建立一個對象要獲取內存資源或者其它更多資源。無節制的建立和銷燬線程是一種極大的浪費。那咱們可不能夠把執行完任務的線程不銷燬而重複利用呢?彷彿就是把這些線程放進一個池子,一方面咱們能夠控制同時工做的線程數量,一方面也避免了建立和銷燬產生的開銷。

import time
import threading
from random import random
import queue


def double(n):
    return n * 2


class Worker(threading.Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self._q = queue
        self.daemon = True
        self.start()

    def run(self):
        while 1:
            f, args, kwargs = self._q.get()
            try:
                print('USE:{}'.format(self.name))
                print(f(*args, **kwargs))
            except Exception as e:
                print(e)
            self._q.task_done()


class ThreadPool(object):
    def __init__(self, max_num=5):
        self._q = queue.Queue(max_num)
        for _ in range(max_num):
            Worker(self._q)  # create worker thread

    def add_task(self, f, *args, **kwargs):
        self._q.put((f, args, kwargs))

    def wait_compelete(self):
        self._q.join()

pool = ThreadPool()
for _ in range(8):
    wt = random()
    pool.add_task(double, wt)
    time.sleep(wt)

pool.wait_compelete()
相關文章
相關標籤/搜索