多任務處理方式之二:多線程

線程的理解

  • 一、操做系統可以進行運算調度的最小單位,即程序執行的最小單位java

  • 二、進程負責程序所必須的資源分配(文本區域、數據區域、堆棧區域)一個進程中也常常須要同時作多件事,即要同時運行多個‘子任務’,這些子任務即線程python

    線程是每個進程中的單一順序控制流 ,其包含在進程中,是進程的實際運做單位(進程是線程的容器)linux

    ⼀個程序中⾄少要有⼀個進程,⼀個進程中⾄少要有⼀個線程編程

    線程不可以獨⽴執⾏,必須依存在進程中windows

  • 三、線程基本不佔用系統資源,其只擁有在運行過程當中必不可少的資源(如程序計數器、一組寄存器和棧)安全

  • 四、同一個進程中的全部線程都共享此進程所擁有的所有資源多線程

    一個進程中的線程共享相同的內存單元/內存地址空間——>能夠訪問相同的變量和對象,並且它們從同一堆中分配對象——>通訊、數據交換、同步操做併發

  • 五、線程之間的通訊主要經過共享所屬進程的資源app

    線程間的通訊是在同一地址空間上進行的,因此不須要額外的通訊機制,這就使得通訊更簡便並且信息傳遞的速度也更快dom

  • 六、線程的上下文切換很快,資源開銷較少,可是相對於進程而言,不夠安全,在多個線程共同操做進程的某一資源時,可能會丟失數據

  • 七、線程和進程之間的區別


線程的五種狀態

  1. 新狀態:線程對象已經建立,還未調用 start() 方法。
  1. 可運行狀態:當線程有資格運行,但調度程序尚未把它選定爲運行線程時線程所處的狀態。當 start()方法調用時,線程首先進入可運行狀態。在線程運行以後或者從阻塞、等待或睡眠狀態回來後,也返回到可運行狀態。
  1. 運行狀態:線程調度程序從可運行池中選擇一個線程做爲當前線程時線程所處的狀態。這也是線程進入運行狀態的惟一方式。
  1. 等待/阻塞/睡眠狀態:這是線程有資格運行時它所處的狀態。實際上這個三狀態組合爲一種,其共同點是:線程仍舊是活的(可運行的),可是當前沒有條件運行。可是若是某件事件出現,他可能返回到可運行狀態。
  1. 死亡態:當線程的run()方法完成時就認爲它死去。這個線程對象也許是活的,可是,它已經不是一個單獨執行的線程。線程一旦死亡,就不能復生。若是在一個死去的線程上調用 start()方法,會拋出 java.lang.IllegalThreadStateException 異常。

GIL全局解釋器鎖

Python中的多線程能夠併發,但不能並行(同一個進程下的多個線程不能被多個cpu同時執行),原因就是GIL全局解釋器鎖,致使同一時間內只有一個線程在執行

python 文件的執行流程爲:

  • 操做系統先將python解釋器和須要執行的py文件由硬盤加載到內存中,開闢一個進程空間
  • 此進程即便得python解釋器首先將py文件中的代碼指令經過編譯器編譯成字節碼
  • 編譯完成的c的字節碼經過虛擬機轉換爲機器碼由cpu執行

這個執行流程便是py文件執行進程中的主線程

若Python中的多線程並行,則每一個線程都要執行上述過程,從而同一時間須要多個CPU同時執行轉換而來的機器碼,極大限度的提升執行效率。但衆所周知,Python是由荷蘭人吉多·範羅蘇姆 (Guido van Rossum)於1989年聖誕節期間開發的一個新的腳本解釋程序,而雙核cpu是2005年才被廣泛應用的,即在當時的條件下,Cpython執行多線程時應用不了多核。故爲了避免多個線程併發執行而形成數據的不完整以及線程的不安全,龜叔在python的解釋器中加上了互斥鎖——全局解釋器鎖(GIL鎖),即便得Cpython在全部線程進入解釋器以前加了一個全局解釋器鎖,當執行完當前py文件後才釋放該鎖,這便致使了python中同一時間內只有一個線程在執行

注:若想使得多線程並行,能夠用多進程間接實現線程的並行,或者更換解釋器爲Pypy、Ppython


線程建立

使用python中的threading模塊中的Thread類建立線程

from threading import Thread

threading模塊提供的Thread類來建立線程對象
from threading import Thread 
import os


def func(num):
    print('當前線程{},所歸屬的進程id號{}'.format(os.getpid(), num))


for i in range(10):
    # 異步建立10個子線程
    t = Thread(target=func, args=(i,))
    t.start()

# 主線程執行任務
print(os.getpid())
自定義類繼承Thread類,每次實例化這個類的時候,就等同於實例化線程對象

這種方法付只須要重寫 threading.Thread 類的 run 方法,而後調用 start() 開啓線程就能夠了

from threading import Thread
import time 


class MyThread(Thread):
    def __init__(self, name):
        # 手動調用父類的構造方法
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print("當前線程正在執行runing ... ", self.name)


if __name__ == "__main__":
    t = MyThread("機器今天會再次爆炸麼?")
    t.start()
    print("主線程執行結束 ... ")

Thread 類中的基本方法

  • t.is_alive() 檢測線程是否仍然存在

  • t.setName() 設置線程名字

  • t.getName() 獲取線程名字

from threading import Thread
import time


def func():
    time.sleep(1)


if __name__ == "__main__":
    t = Thread(target=func)

    t.start()
    print(t , type(t))
    
    print(t.is_alive())  # False
    
    print(t.getName())
    
    t.setName("xboyww")
    print(t.getName())
  • currentThread().ident 查看線程id號
  • enumerate() 返回目前正在運行的線程列表
  • activeCount() 返回目前正在運行的線程數量
from threading import Thread
import time
from threading import currentThread
from threading import enumerate
from threading import activeCount


# 1.currentThread().ident 查看線程id號

def func():
    print("子線程id", currentThread().ident, os.getpid())


if __name__ == "__main__":
    Thread(target=func).start()
    print("主線程id", currentThread().ident, os.getpid())



# 2.enumerate()        返回目前正在運行的線程列表

def func():
    print("子線程id", currentThread().ident, os.getpid())
    time.sleep(0.5)


if __name__ == "__main__":
    for i in range(10):
        Thread(target=func).start()
    lst = enumerate()
    # 子線程10 + 主線程1個 = 11
    print(lst ,len(lst))


    # 3.activeCount()      返回目前正在運行的線程數量
    print(activeCount())

線程池(ThreadPoolExecutor)

默認若是一個線程短期內能夠完成更多的任務,就不會建立額外的新的線程,以節省資源

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread


def func(i):
    print("thread ... start", cthread().ident, i)
    time.sleep(3)
    print("thread ... end", i)
    return cthread().ident


if __name__ == "__main__":
    lst = []
    setvar = set()
    # (1) 建立線程池對象
    """限制線程池最多建立os.cpu_count() * 5 = 線程數,全部任務全由這幾個線程完成,不會額外建立線程"""
    tp = ThreadPoolExecutor()  # 個人電腦40個線程併發

    # (2) 異步提交任務
    for i in range(100):
        res = tp.submit(func, i)
        lst.append(res)

    # (3) 獲取返回值
    for i in lst:
        setvar.add(i.result())

    # (4) 等待全部子線程執行結束
    tp.shutdown()

    print(len(setvar), setvar)
    print("主線程執行結束 ... ")

守護線程

守護線程 : 等待全部線程所有執行完畢以後,再本身終止,守護的是全部線程

線程對象.setDaemon(True)

from threading import Thread
import time


def func1():
    while True:
        time.sleep(0.5)
        print("我是func1")


def func2():
    print("我是func2 start ... ")
    time.sleep(3)
    print("我是func2 end ... ")


t1 = Thread(target=func1)
t2 = Thread(target=func2)

# 在start調用以前,設置守護線程
t1.setDaemon(True)

t1.start()
t2.start()

print("主線程執行結束 ... ")

同步 & 異步

同步

同步意味着順序、統一的時間軸

  • 場景1:是指完成事務的邏輯,先執行第一個事務,若是阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,協同步調,按預約的前後次序進行運行

  • 場景2:一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列

異步

異步則意味着亂序、效率優先的時間軸

  • 處理調用這個事務以後,不會等待這個事務的處理結果,直接處理第二個事務去了,經過狀態、回調來通知調用者處理結果

  • 對於I/O相關的程序來講,異步編程能夠大幅度的提升系統的吞吐量,由於在某個I/O操做的讀寫過程當中,系統能夠先去處理其它的操做(一般是其它的I/O操做)

  • 不肯定執行順序


阻塞 & 非阻塞

阻塞

程序中有了IO操做,就會發生阻塞,必需要輸入/輸出一個字符串,不然代碼不往下執行

非阻塞

程序中沒有任何耗時操做,無需等待正常往下執行

同步阻塞 :效率低,cpu利用不充分
異步阻塞 :好比socketserver,能夠同時鏈接多個,可是彼此都有recv
同步非阻塞:沒有相似input的代碼,從上到下執行.默認的正常狀況代碼
異步非阻塞:效率是最高的,cpu過分充分,過分發熱, 需液冷


串行 & 並行 & 併發

假設有A、B兩個任務,則串行、並行、併發的區別如圖所示。

串行

A和B兩個任務運行在一個CPU線程上,在A任務執行完以前不能夠執行B。即,在整個程序的運行過程當中,僅存在一個運行上下文,即一個調用棧一個堆。程序會按順序執行每一個指令

並行

並行指兩個或兩個以上任務同一時刻被不一樣的cpu執行。在多道程序環境下,並行性使多個程序同一時刻可在不一樣CPU上同時執行。好比,A和B兩個任務能夠同時運行在不一樣的CPU線程上,效率較高,但受限於CPU線程數,若是任務數量超過了CPU線程數,那麼每一個線程上的任務仍然是順序執行的。

併發

併發指多個線程在宏觀(相對於較長的時間區間而言)上表現爲同時執行,而其實是輪流穿插着執行,併發的實質是一個物理CPU在若干道程序之間多路複用,其目的是提升有限物理資源的運行效率。 併發與並行串行並非互斥的概念,若是是在一個CPU線程上啓用併發,那麼天然就仍是串行的,而若是在多個線程上啓用併發,那麼程序的執行就能夠是既併發

圖示


線程同步

因爲一個進程中的多個線程享進程中的資源,因此可能形成多個線程同時修改一個變量的狀況(即線程⾮安全),可能形成數據混亂,故須要進⾏同步控制,即線程同步

能夠經過延時肯定多線程的執行順序,但不推薦。

import threading
import time


def work1(nums):
    nums.append(44)
    print('-----in work1-----', nums)


def work2(nums):
    time.sleep(1)
    # 延時一會保證另外一線程執行
    print('-----in work2-----', nums)


g_nums = [11, 22, 33]
t1 = threading.Thread(target=work1, args=(g_nums,))
t1.start()
t2 = threading.Thread(target=work2, args=(g_nums,))
t2.start()

互斥鎖(threading模塊中定義的Lock類)

互斥鎖保證了每次只有⼀個線程操做共享數據,從⽽保證了多線程狀況下數據的安全性(原子性),能夠實現線程同步

互斥鎖爲資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態爲「鎖定」狀態,其餘線程不能更改;直到該線程釋放資源,將資源的狀態變成「非鎖定」,其餘的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程操做共享數據,從而保證了多線程狀況下數據的安全性。

儘可能使用一把鎖解決問題,不要互相嵌套,不然容易死鎖

import threading

num = 0


def test1():
    global num
    
    # 調用Lock對象的acquire()方法得到鎖時,這把鎖進入「locked」狀態
    # 若是此時另外一個線程2試圖得到這個鎖,該線程2就會變爲同步阻塞狀態
    if mutex.acquire():
        for i in range(1000):
            num += 1
            
    # 調用Lock對象的release()方法釋放鎖以後,該鎖進入「unlocked」狀態。
    mutex.release()


def test2():
    global num
    
    # 線程調度程序繼續從處於同步阻塞狀態的線程中選擇一個來得到鎖,並使得該線程進入運行(running)狀態
    if mutex.acquire():
        for i in range(1000):
            num += 1
    mutex.release()


mutex = threading.Lock()
p1 = threading.Thread(target=test1)
p1.start()
p2 = threading.Thread(target=test2)
p2.start()
print(num)
死鎖(只上鎖,不解鎖)

在多個線程間共享多個資源的時候, 若是兩個線程分別佔有⼀部分資源而且同時等待對⽅的資源, 就會形成死鎖

在多線程程序中,死鎖問題很大一部分是因爲線程同時獲取多個鎖形成的。如一個線程獲取了第一個鎖,而後在獲取第二個鎖的時候發生阻塞,那麼這個線程就可能阻塞其餘線程的執行,從而致使整個程序假死

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        # 線程1被 A 鎖——>鎖定
        if mutexA.acquire():
            print(self.name + '---do1---up---')
            time.sleep(1)
            if mutexB.acquire():
                print(self.name + '---do1---down---')
                mutexB.release()
                
        # 線程1被 A 鎖釋放的前提是:線程1 搶到 B 鎖
        mutexA.release()


class MyThread2(threading.Thread):
    def run(self):
        time.sleep(1)
        # 線程2被 B 鎖——>鎖定
        if mutexB.acquire():
            print(self.name + '---do2---up---')
            if mutexA.acquire():
                print(self.name + '---do2---down---')
            	mutexA.release()
                
		# 線程2被 B 鎖釋放的前提是:線程2 搶到 A 鎖
        mutexB.release()


if __name__ == '__main__':
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
    
    
# Thread-1---do1---up---
# Thread-2---do2---up---
# 程序卡死

# 線程1不釋放A鎖
# 線程2不釋放B鎖
遞歸鎖(threading模塊中定義的RLock類)

用於快速解決項目因死鎖問題不能正常運行的場景,用來處理異常死鎖的

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        if mutexA.acquire():
            print(self.name + '---do1---up---')
            time.sleep(1)
            if mutexB.acquire():
                print(self.name + '---do1---down---')
                mutexB.release()
        mutexA.release()


class MyThread2(threading.Thread):
    def run(self):
        time.sleep(1)
        if mutexB.acquire():
            print(self.name + '---do2---up---')
            if mutexA.acquire():
                print(self.name + '---do2---down---')
            	mutexA.release()

        mutexB.release()


if __name__ == '__main__':
    mutexA = threading.RLock()
    mutexB = threading.RLock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
    
    
# Thread-1---do1---up---
# Thread-1---do1---down---
# Thread-2---do2---up---
# Thread-2---do2---down---

信號量(threading模塊中定義的Semaphore類)

信號量 semaphore:用於控制同一時間內能夠操做進程資源的線程數量的一把鎖,簡言之信號量是用來控制線程併發數的一把鎖,也能夠實現線程同步

使用場景:在讀寫文件的時候,通常只有一個線程在寫,而讀能夠有多個線程同時進行,若是須要限制同時讀文件的線程個數,這時候就能夠用到信號量了(若是用互斥鎖,就是限制同一時刻只能有一個線程讀取文件)

import time
import threading


def foo(se):
    se.acquire()
    time.sleep(2)
    print("ok")
    se.release()


if __name__ == "__main__":
    # 設置同一時間內能夠有5個線程併發
    se = threading.Semaphore(5)
    
    for i in range(20):
        t1 = threading.Thread(target=foo, args=(se,))
        t1.start()  # 此時能夠控制同時進入的線程數

線程隊列(queue模塊)

經過3種類型的隊列來實現線程同步,都實現了鎖原語(能夠理解爲原⼦操做, 即要麼不作, 要麼就作完) , 可以在多線程中直接使⽤

queue.Queue:FIFO(先⼊先出) 隊列 Queue
# 基本使用
from queue import Queue

# put 存
# get 取
# put_nowait 存,超出了隊列長度,報錯
# get_nowait 取,沒數據取不出來,報錯


# linux windows 線程中put_nowait,get_nowait都支持

"""先進先出,後進後出"""
# maxsize爲一個整數,表示隊列的最大條目數,可用來限制內存的使用。
# 一旦隊列滿,插入將被阻塞直到隊列中存在空閒空間。若是maxsize小於等於0,隊列大小爲無限。maxsize默認爲0

q = Queue(maxsize=0)
q.put(1)
q.put(2)
print(q.get())
print(q.get())
# 取不出來,阻塞
# print(q.get())
print(q.get_nowait())


q2 = Queue(3)
q2.put(11)
q2.put(22)
q2.put(33)
# 放不進去了,阻塞
# q2.put(44)
q2.put_nowait(44)
import threading
import time
from queue import Queue


class Pro(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count = count + 1
                    msg = '生成產品' + str(count)
                    queue.put(msg)  # 隊列中添加新產品
                    print(msg)
            time.sleep(1)


class Con(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + '消費了' + queue.get()
                    print(msg)
            time.sleep(1)


if __name__ == "__main__":
    queue = Queue()
    # 建立一個隊列。線程中能用,進程中不能使用
    for i in range(500):  # 建立500個產品放到隊列裏
        queue.put('初始產品' + str(i))  # 字符串放進隊列
        for i in range(2):  # 建立了兩個線程
            p = Pro()
            p.start()
        for i in range(5):  # 5個線程
            c = Con()
            c.start()
queue.LifoQueue:LIFO(後⼊先出) 棧 LifoQueue
# LifoQueue 先進後出,後進先出(按照棧的特色設計)

from queue import LifoQueue


lq = LifoQueue(3)
lq.put(11)
lq.put(22)
lq.put(33)
# print(lq.put_nowait(444))

print(lq.get())
print(lq.get())
print(lq.get())
queue.PriorityQueue:(優先級隊列) PriorityQueue
# PriorityQueue 按照優先級順序排序 (默認從小到大排序)

from queue import PriorityQueue


# 若是都是數字,默認從小到大排序
pq = PriorityQueue()
pq.put(13)
pq.put(3)
pq.put(20)
print(pq.get())
print(pq.get())
print(pq.get())

# 若是都是字符串
"""若是是字符串,按照ascii編碼排序"""
pq1 = PriorityQueue()
pq1.put("chinese")
pq1.put("america")
pq1.put("latinos")
pq1.put("blackman")

print(pq1.get())
print(pq1.get())
print(pq1.get())
print(pq1.get())

# 要麼全是數字,要麼全是字符串,不能混合 error
"""
pq2 = PriorityQueue()
pq2.put(13)
pq2.put("aaa")
pq2.put("擬稿")
"""

pq3 = PriorityQueue()
# 默認按照元組中的第一個元素排序
pq3.put( (20,"wangwen") )
pq3.put( (18,"wangzhen") )
pq3.put( (30,"weiyilin") )
pq3.put( (40,"xiechen") )

print(pq3.get())
print(pq3.get())
print(pq3.get())
print(pq3.get())

生產消費者模式

  • 進程(線程)之間若是直接通訊,可能會出現兩個問題

    • 耦合性太強
    • 速率有可能不匹配

    解決方式,找一個緩衝區來中轉數據即生產者——消費者模式


線程異步

經過回調函數能夠實現多線程異步執行

回調函數:
把函數當成參數傳遞給另一個函數
在當前函數執行完畢以後,最後調用一下該參數(函數),這個函數就是回調函數

功能:
打印狀態: a屬性
支付狀態: b屬性
退款狀態: c屬性
轉帳的狀態: d屬性
把想要的相關成員或者相關邏輯寫在自定義的函數中
支付寶接口在正常執行以後,會調用自定義的函數,來執行相應的邏輯
那麼這個函數就是回調函數

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread as cthread
import os, time


def func1(i):
    print("Process start ... ", os.getpid())
    time.sleep(0.5)
    print("Process end ... ", i)
    return "*" * i


def func2(i):
    print("thread start ... ", cthread().ident)
    time.sleep(0.5)
    print("thread end ... ", i)
    return "*" * i


def call_back1(obj):
    print("<==回調函數callback進程號:===>", os.getpid())
    print(obj.result())


def call_back2(obj):
    print("<==回調函數callback線程號:===>", cthread().ident)
    print(obj.result())


# (1) 進程池的回調函數: 由主進程執行調用完成

if __name__ == "__main__":
    p = ProcessPoolExecutor(5)
    for i in range(1, 11):
        res = p.submit(func1, i)
        # 進程對象.add_done_callback(回調函數) 
        '''
        add_done_callback 能夠把res本對象和回調函數自動傳遞到函數裏來
        '''
        res.add_done_callback(call_back1)
    p.shutdown()
    print("主進程執行結束 ... ", os.getpid())



# (2) 線程池的回調函數: 由當前子線程執行調用完成
if __name__ == "__main__":
    tp = ThreadPoolExecutor(5)
    for i in range(1, 11):
        res = tp.submit(func2, i)
        # 線程對象.add_done_callback(回調函數) 
        '''
        add_done_callback 能夠把res本對象和回調函數自動傳遞到函數裏來
        '''
        res.add_done_callback(call_back2)
    tp.shutdown()
    print("主線程執行結束 ... ", cthread().ident)
from multiprocessing import Pool
import random
import time


def download(f):
    for i in range(1, 4):
        print(f"{f}下載文件{i}")
        time.sleep(random.randint(1, 3))
    return "下載完成"


def alterUser(msg):
    print(msg)


if __name__ == "__main__":
    p = Pool(3)
    # 當func執行完畢後,return的東西會給到回調函數callback
    p.apply_async(func=download, args=("線程1",), callback=alterUser)
    p.apply_async(func=download, args=("線程2",), callback=alterUser)
    p.apply_async(func=download, args=("線程3",), callback=alterUser)
    p.close()
    p.join()
相關文章
相關標籤/搜索