Python全棧開發之併發編程

No.1 線程

什麼是多任務

就是操做系統能夠同時運行多個任務,就是能夠一邊用瀏覽器上網,同時又能夠聽歌,還能再撩個×××姐,這就是多任務,操做系統會輪流把系統調度到每一個核心上去執行python

併發和並行

併發是指任務數多餘cpu核數,經過操做系統的各類任務調度算法,實現多個任務算法

並行是指任務數小於cpu核數,即任務同時執行瀏覽器

單線程

import time

def say_hello(i):
    print('hello ', i)

if __name__ == '__main__':
    for i in range(5):
        say_hello(i)

多線程

import threading
import time

def say_hello(i):
    print('hello ', i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=say_hello,args=(i,))
        t.start() # 當調用start方法時,纔會正真的執行線程

主線程會等待子線程

import threading
import time

def say_hello(name):
    for i in range(5):
        print('hello ', i, name)

if __name__ == '__main__':
    say_hello('主線程')
    t1 = threading.Thread(target=say_hello,args=('t1',))
    t2 = threading.Thread(target=say_hello,args=('t2',))
    t1.start()
    t2.start()
    print('end')
# hello  0 主線程
# hello  1 主線程
# hello  2 主線程
# hello  3 主線程
# hello  4 主線程
# hello  0 t1
# hello  1 t1
# hello  2 t1
# hello  3 t1
# hello  4 t1
# hello  0 t2
# hello  1 t2
# hello  2 t2
# hello  3 t2
# hello  4 t2
# end

查看線程數量

import threading
import time

def say_hello(i):
    print('hello ', i)

if __name__ == '__main__':
    say_hello('主線程')
    for i in range(5):
        t = threading.Thread(target=say_hello,args=(i,))
        t.start()
        while True:
            length = len(threading.enumerate())
            print('當前運行的線程數爲:%d' % length)
            if length <= 1:
                break
# hello  主線程
# hello  0
# 當前運行的線程數爲:2
# 當前運行的線程數爲:1
# hello  1
# 當前運行的線程數爲:1
# hello  2
# 當前運行的線程數爲:1
# hello  3
# 當前運行的線程數爲:1
# hello  4
# 當前運行的線程數爲:1

封裝線程

爲了讓每一個線程的封裝性更加完整,咱們一般會建立一個線程類,讓這個線程類繼承自threading.Thread,而後重寫run方法就能夠了緩存

import threading
import time
class MyThread(threading.Thread):

    def run(self):
        for i in range(5):
            time.sleep(1)
            print(self.name + str(i))

if __name__ == '__main__':
    t = MyThread()
    t.start()

Python的threading.Thread類的run方法,用於定義線程的功能函數,能夠在咱們本身的類中覆蓋該方法,當建立本身的線程類對象後,能夠start方法,進行調度安全

線程的執行順序

線程的執行順序是不肯定的,當執行到sleep語句時,線程將會被阻塞,而後線程進入就緒狀態,等待cpu的調度,線程調度自動選擇一個線程執行多線程

No.2 多線程

多線程共享全局變量

import threading
import time

num = 100
def demo1():
    global num
    for i in range(3):
        num -= 1
        print('num = ',num)

def demo2():
    print('num = ', num)

if __name__ == '__main__':
    print('線程建立以前num = ',num)
    t1 = threading.Thread(target=demo1)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=demo2)
    t2.start()
# 線程建立以前num =  100
# num =  99
# num =  98
# num =  97
# num =  97

在一個進程內的全部線程共享全局變量,能很方便的在多個線程之間共享數據,可是這也帶來一個麻煩,就是線程就全局變量的隨機修改可能會致使多線程之間對於全局變量的的混亂,即線程非安全併發

import threading
import time

num = 100
def demo1():
    global num
    for i in range(1000000):
        # lock.acquire()
        num += 1
        # lock.release()

def demo2():
    global num

    for i in range(1000000):
        # lock.acquire()
        num += 1
        # lock.release()

if __name__ == '__main__':
    print('線程建立以前num = ',num)
    # lock = threading.Lock()
    t1 = threading.Thread(target=demo1)
    t1.start()
    t2 = threading.Thread(target=demo2)
    t2.start()
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print('線程執行完畢num = ',num)
# 線程建立以前num =  100
# 線程執行完畢num =  1559954
兩個線程分別對線程自增了10次,結果倒是122,若是多個線程同時對同一個全局變量操做,會出現資源競爭問題,從而數據結果會不正確

同步

對於多線程非安全的問題,能夠採用同步的方式來解決,每一個線程對數據的修改時,都要先上鎖,處理完成後再解鎖,在上鎖的過程當中不容許任何線程打擾,這樣就能保證線程安全性了,數據也不會不正確app

互斥鎖

當多個線程幾乎同時修改某一個共享數據的時候,須要進程同步控制,線程同步可以保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖,互斥鎖爲資源引入一個狀態,鎖定/非鎖定,某個線程要更改共享數據時,此時資源狀態爲鎖定,其餘線程不能更改,當該線程修改完畢,將資源設置爲非鎖定,互斥鎖保證了每次只能由一個線程進入寫入,從而保證了多線程狀況下數據的正確性dom

# 建立鎖
lock = threading.Lock()

# 鎖定
lock.acquire()

# 釋放
lock.release()

使用互斥鎖對兩個線程對同一個全局變量各加1億次async

import threading
import time

num = 100
def demo1():
    global num
    for i in range(100000000):
        lock.acquire()
        num += 1
        lock.release()

def demo2():
    global num

    for i in range(100000000):
        lock.acquire()
        num += 1
        lock.release()

if __name__ == '__main__':
    print('線程建立以前num = ',num)
    lock = threading.Lock()
    t1 = threading.Thread(target=demo1)
    t1.start()
    t2 = threading.Thread(target=demo2)
    t2.start()
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print('線程執行完畢num = ',num)
# 線程建立以前num =  100
# 線程執行完畢num =  200000100

上鎖解鎖過程

當一個線程調用所的acquire方法得到鎖時,鎖就進入locked狀態,每次只能有一個線程的鎖,若是此時有另一個線程試圖得到鎖,那麼此時這個鎖就會進入阻塞狀態,直到擁有鎖的線程調用release解鎖以後,鎖進入unlocked狀態,其餘線程就能夠得到鎖了

鎖的優缺點

確保了某段關鍵代碼只能由一個線程完整執行,確保了數據的完整性,阻止了多線程併發,使得包含的鎖的代碼只能以單線程執行,效率就大大下降了,還可能發生死鎖

死鎖

在線程共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會造成死鎖

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        # lockA上鎖
        lockA.acquire()
        # 延時1秒,等待另外那個線程 把lockB上鎖
        print(self.name+' A start')
        time.sleep(1)
        # 堵塞,由於這個lockB已經被另外的線程搶先上鎖了
        lockB.acquire()
        print(self.name+' B end')
        lockB.release()
        # lockA解鎖
        lockA.release()

class MyThread2(threading.Thread):
    def run(self):
        # lockB上鎖
        lockB.acquire()
        # 延時1秒,等待另外那個線程 把lockA上鎖
        print(self.name+' B start')
        time.sleep(1)
        # 堵塞,lockA已經被另外的線程搶先上鎖了
        lockA.acquire()
        print(self.name+' A end')
        lockA.release()
        # lockB解鎖
        lockB.release()

if __name__ == '__main__':
    lockA = threading.Lock()
    lockB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
# Thread-1 A start
# Thread-2 B start

如何避免死鎖

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        lockA.acquire()
        print(self.name, 'A', 'start')
        time.sleep(1)
        # 若是在規定時間內能夠上鎖就返回True,反之。。。
        result = lockB.acquire(timeout=1)
        if result:
            print(self.name, 'B', 'start')
            lockA.release()
            print(self.name, 'A', 'end')
            lockB.release()
            print(self.name, 'B', 'end')
        else:
            lockA.release()

class MyThread2(threading.Thread):
    def run(self):
        lockB.acquire()
        print(self.name, 'B', 'start')
        time.sleep(1)
        lockA.acquire()
        print(self.name, 'A', 'start')
        lockA.release()
        print(self.name, 'A', 'end')
        lockB.release()
        print(self.name, 'B', 'end')

if __name__ == '__main__':
    lockA = threading.Lock()
    lockB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

GIL

Python語言和GIL沒有半毛錢關係,僅僅是因爲歷史緣由在Cpython虛擬機,難以移除GIL

GIL,全局解釋器鎖,每一個線程在執行的過程都須要先獲取GIL,保證同一時刻只有一個線程能夠執行代碼

線程釋放GIL鎖的狀況: 在IO操做等可能會引發阻塞的system call以前,能夠暫時釋放GIL,但在執行完畢後,必須從新獲取GIL

Python使用多進程是能夠利用多核的CPU資源的

多線程爬取比單線程性能有提高,由於遇到IO阻塞會自動釋放GIL鎖

No.3 進程

什麼是進程?

一個程序在運行期間,代碼和程序運行所需的資源稱爲進程

進程的狀態

工做中,任務數每每大於cpu核心數,因此必定有一些任務在執行,另一部分是處於等待狀態
就緒態,運行的條件已經知足,等待cpu執行
執行態,cpu正在執行該任務
等待態,等待某些條件知足

建立進程

from multiprocessing import Process
import time

def run_proc():
    while True:
        print('子線程')
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=run_proc)
    p.start()
    while True:
        print('主線程')
        time.sleep(1)

pid

from multiprocessing import Process
import time
import os
def run_proc():
    # 獲取當前進程的pid
    print('子進程 pid = ',os.getpid())

if __name__ == '__main__':
    print('父進程 pid = ',os.getpid())
    p = Process(target=run_proc)
    p.start()

Process語法結構

Process([group [, target [, name [, args [, kwargs]]]]])
target 若是傳遞了函數的引用,可讓子進程執行內部代碼
args 給target指定的函數傳遞參數,元組方式
kwargs 給target指定的函數傳遞命名參數
name 給進程設定一個名字
group 指定進程組
經常使用方法
start() 啓動子進程
is_alive() 判斷進程是否還在運行
join([timeout]) 是否等待子進程執行結束
terminate() 無論任務是否執行完畢,直接結束

進程不能共享全局變量

from multiprocessing import Process
import time
NUM_LIST = [11,22,33]

def demo1(num_list):
    num_list.append(44)
    print(num_list)

def demo2(num_list):
    num_list.append(55)
    print(num_list)

if __name__ == '__main__':
    p1 = Process(target=demo1,args=(NUM_LIST,))
    p2 = Process(target=demo2,args=(NUM_LIST,))
    p1.start()
    p2.start()
    print(NUM_LIST)
# [11, 22, 33]
# [11, 22, 33, 44]
# [11, 22, 33, 55]

進程、線程對比

定義

進程,可以完成多任務,例如,在一臺電腦上登陸多個QQ客戶端

線程,可以完成多任務,例如,在一臺電腦上和多個妹子聊天

區別

一個程序至少有一個進程,一個進程至少有一個線程,線程的劃分尺度小於進程,是的多線程的併發高於多進程,進程在執行過程當中擁有獨立的內存單元,而線程倒是共享的,線程的運行開銷小,可是不安全,進程和它相反,因此咱們要根據不一樣的場景選擇適合的

進程池

當須要建立的線程數量很少時,咱們能夠直接利用Process動態生成進程,可是當進程數量上百甚至上千時,咱們再採用Process建立進程就能夠猝死了,此時可使用pool,初始化pool時,能夠指定一個最大進程數,當有新的請求提交到pool時,若是線程池沒有滿,那麼會建立一個新的線程來執行該請求,不然,等待其餘進程結束

from multiprocessing import Pool
import time

def func(arg):
  print(arg)
  time.sleep(1)

if __name__ == '__main__':
  pool = Pool(5)
  for i in range(30):
      pool.apply_async(func=func,args=(i,))
  pool.close() # 全部任務執行完畢
  pool.join() # 等待pool中全部子進程執行完成,必須放在close語句以後

pool函數解析

apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args爲傳遞給func的參數列表,kwds爲傳遞給func的關鍵字參數列表
close():關閉Pool,使其再也不接受新的任務;
terminate():無論任務是否完成,當即終止;
join():主進程阻塞,等待子進程的退出, 必須在close或terminate以後使用;

進程池中的queue

from multiprocessing import Pool,Manager
import time
import random

def write(q):
    for i in [11,22,33]:
        if not q.full():
            q.put(i)
            print('put %s to queue' %i)
            time.sleep(random.random())

def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print('get %s to queue' %value)
            time.sleep(random.random())
        else:
            break

if __name__ == '__main__':
    q = Manager().Queue()
    pool = Pool()
    pool.apply_async(write,args=(q,))
    # time.sleep(1)
    pool.apply_async(read,args=(q,))
    pool.close()
    pool.join()

進程間通訊

進程間有時也須要通訊,可使用multiprocessing模塊的Queue實現

初始化Queue()對象時,若括號中沒有指定最大可接收的消息數量,或數量爲負值,那麼就表明可接受的消息數量沒有上限
Queue.qsize() 返回當前隊列包含的消息數量
Queue.empty() 若是隊列爲空,返回True,反之False
Queue.full() 若是隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列中的一條消息,而後將其從列隊中移除,block默認值爲True,若是block使用默認值,且沒有設置timeout,消息列隊若是爲空,此時程序將被阻塞,直到從消息列隊讀到消息爲止,若是設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則出"Queue.Empty"異常,若是block值爲False,消息列隊若是爲空,則會馬上拋出"Queue.Empty"異常
Queue.get_nowait() 至關Queue.get(False)
Queue.put(item,[block[, timeout]] 將item消息寫入隊列,block默認值爲True,若是block使用默認值,且沒有設置timeout,消息列隊若是已經沒有空間可寫入,此時程序將被阻塞,直到從消息列隊騰出空間爲止,若是設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常,若是block值爲False,消息列隊若是沒有空間可寫入,則會馬上拋出"Queue.Full"異常
Queue.put_nowait(item) 至關Queue.put(item, False)

栗子

from multiprocessing import Process,Queue
import time
import random

def write(q):
    for i in [11,22,33]:
        if not q.full():
            q.put(i)
            print('put %s to queue' %i)
            time.sleep(random.random())

def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print('get %s to queue' %value)
            time.sleep(random.random())
        else:
            break

if __name__ == '__main__':
    q = Queue()
    t1 = Process(target=write,args=(q,))
    t2 = Process(target=read,args=(q,))
    t1.start()
    t1.join()
    t2.start()
    t2.join()

No.4 迭代器

迭代是遍歷序列的一種方式,它能夠記住序列的遍歷位置,迭代器從第一個元素開始訪問,只能向前,不能後退

可迭代對象

能夠經過for...in...這類語句迭代的對象稱爲可迭代對象

如何判斷一個對象是否能夠迭代

可使用inistance(obj,Iterable)判斷一個對象是否是iterable對象

可迭代對象的本質

可迭代對象進行迭代的時候,咱們發現沒迭代一次,都會返回對象的中的下一條數據,一直日後讀取數據直到數據所有迭代完成,那麼,這個負責記錄數據迭代到的索引的機制叫作迭代器,可迭代對象經過iter方法向咱們提供一個迭代器,咱們在迭代對象的時候,實際上就是調用該方法獲取了一個迭代器,而後根據迭代器來獲取數據的,也就說,具備iter方法 的對象稱爲可迭代對象

from collections import Iterable

class MyInt(int):
    def __iter__(self):
        pass

if __name__ == '__main__':
    myint = MyInt()
    print(isinstance(myint, Iterable)) # True

iter函數與next函數

可迭代對象經過iter函數獲取迭代器,咱們能夠對獲取到的迭代器不停的使用next函數來獲取下一條數據,當咱們對迭代器使用iter函數就是調用了可迭代對象的iter函數,注意,當咱們迭代玩最後一個數據時,再次調用next函數會拋出StopIterable異常

迭代器iterable

當咱們對迭代器使用next方法的時候,實際上時調用的next函數(Python2是next函數),因此,想構建一個迭代器,就要實現它的nextiter函數,實現了這兩個函數,就是迭代器

class MyIterator(object):
    """自定義的供上面可迭代對象使用的一個迭代器"""
    def __init__(self):
        self.items = []
        self.current = 0 # current用來記錄當前訪問到的位置
    def add(self,value):
        self.items.append(value)

    def __next__(self):
        if self.current < len(self.items):
            item = self.items[self.current]
            self.current += 1
            return item
        else:
            raise StopIteration

    def __iter__(self):
        return self

if __name__ == '__main__':
    mi = MyIterator()
    mi.add(1)
    mi.add(2)
    mi.add(3)
    mi.add(4)
    mi.add(5)
    for num in mi:
        print(num)

for...in...本質

本質就是先經過iter獲取迭代器,在經過迭代器不斷的調用next方法,當遇到異常退出

迭代器應用場景

每次返回的數據不是在一個已有的數據集合中讀取,而是程序經過必定的規律計算生成的,也就是說不用將全部要迭代的數據一次存儲下來提供後續讀取,這樣能夠節省很大空間

class FibIterator(object):
    def __init__(self, n):
        self.n = n
        self.current = 0
        self.num1 = 0
        self.num2 = 1

    def __next__(self):
        if self.current < self.n:
            num = self.num1
            self.num1, self.num2 = self.num2, self.num1+self.num2
            self.current += 1
            return num
        else:
            raise StopIteration

    def __iter__(self):
        return self

if __name__ == '__main__':
    fib = FibIterator(10)
    for num in fib:
        print(num, end=" ")

No.5 生成器

生成器

一邊循環一邊計算的機制,稱爲生成器,生成器是一種特殊的迭代器

建立生成器

將列表生成式的定界符改爲()

G = ( x*2 for x in range(5))
G
<generator object <genexpr> at 0x000001E86BC993B8>

建立生成式和生成器的區別僅在於定界符,L是一個列表,G是一個生成器,咱們能夠直接打印出列表的每一個元素,而對於生成其,咱們能夠按照迭代器的使用方法來使用

使用yield

def fib(n):
    current = 0
    num1, num2 = 0, 1
    while current < n:
        num = num1
        num1, num2 = num2, num1+num2
        current += 1
        yield num

if __name__ == '__main__':
    f = fib(10)
    for i in f:
        print(i)

使用了yield關鍵字的函數就是生成器,yield的做用有兩點,一是保存當前運行狀態,暫停執行,掛起生成器,二是將yield後面的表達式的值做爲返回值返回,可使用next函數讓生成器從斷點處繼續執行,喚醒生成器

使用send喚醒

咱們除了可使用next函數來喚醒生成器,還可使用send函數,使用send還能夠在喚醒的同時從間斷點傳入一個附加數據

def fib(n):
    current = 0
    num1, num2 = 0, 1
    while current < n:
        num = num1
        num1, num2 = num2, num1+num2
        current += 1
        yield num

if __name__ == '__main__':
    f = fib(10)
    print(next(f))
    print(f.send('...'))

No.7 協程

協程概念

協程是什麼

協程是Python中另一種實現多任務的方式,只不是比比線程更小的執行單元,協程自帶CPU上下文,只要在合適的時機,咱們能夠把一個協程切換到另外一個協程,這要在這個過程當中保存或恢復CPU上下文那麼程序仍是能夠運行的,說到這,小夥伴們是否是想到了上文介紹的yield

協程和線程差別

在實現多任務時, 線程切換從系統層面遠不止保存和恢復 CPU上下文這麼簡單。 操做系統爲了程序運行的高效性每一個線程都有本身緩存Cache等等數據,操做系統還會幫你作這些數據的恢復操做。 因此線程的切換很是耗性能。可是協程的切換隻是單純的操做CPU的上下文,因此一秒鐘切換個上百萬次系統都抗的住

簡單實現協程

import time

def work1():
    while True:
        print("----work1---")
        yield
        time.sleep(0.5)

def work2():
    while True:
        print("----work2---")
        yield
        time.sleep(0.5)

def main():
    w1 = work1()
    w2 = work2()
    while True:
        next(w1)
        next(w2)

if __name__ == "__main__":
    main()

greenlet

安裝greenlet pip3 install greenlet

from greenlet import greenlet
import time

def demo1():
    while True:
        print('Demo1 is running')
        gl2.switch()
        time.sleep(1)

def demo2():
    while True:
        print('Demo2 is running')
        gl1.switch()
        time.sleep(1)

if __name__ == '__main__':
    gl1 = greenlet(demo1)
    gl2 = greenlet(demo2)
    gl1.switch() # 切換到gl1執行

gevent

交替運行

import gevent

def demo():
    for i in range(10):
        print(gevent.getcurrent(),i)

if __name__ == '__main__':
    g1 = gevent.spawn(demo,)
    g2 = gevent.spawn(demo,)
    g3 = gevent.spawn(demo,)
    g4 = gevent.spawn(demo,)
    g1.join()
    g2.join()
    g3.join()
    g4.join()

自動切換

import gevent

def demo():
    for i in range(10):
        print(gevent.getcurrent(),i)
        gevent.sleep(1)

if __name__ == '__main__':
    g1 = gevent.spawn(demo,)
    g2 = gevent.spawn(demo,)
    g3 = gevent.spawn(demo,)
    g4 = gevent.spawn(demo,)
    g1.join()
    g2.join()
    g3.join()
    g4.join()

No.7 線程、進程、協程區別

進程是資源分配的單位

線程是操做系統調度的單位

進程切換須要的資源很最大,效率很低

線程切換須要的資源通常,效率通常(固然了在不考慮GIL的狀況下)

協程切換任務資源很小,效率高

多進程、多線程根據cpu核數不同多是並行的,可是協程是在一個線程中,因此是併發

相關文章
相關標籤/搜索