線程鎖&&信號量&&GIL&&線程定時器&&進程池與線程池&&協程

1、線程鎖(同步鎖)(線程的互斥鎖)

1.1線程鎖

from threading import Thread,Lock
x = 0
mutex = Lock()
def task():
    global x
    for i in range(200000):
        x = x+1
        """
        因爲沒有進程鎖,程序切換比較快,例如:
            t1 的x 剛拿到0   x = 0+1  還沒進行運算,保存了狀態,就被cpu切換
            t2 的 x 拿到0   進行了運算  x = 1
            又切換到t1,x = 0+1    x = 1
            產生數據安全的問題,因此結果就會隨機
        """
if __name__ == '__main__':
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t3 = Thread(target=task)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(x)
#################
337566            #比較隨機,正確的應該是600000
from threading import Thread, Lock

x = 0
mutex = Lock()


def task():
    mutex.acquire()  # 線程鎖
    global x
    for i in range(200000):
        x = x + 1
    mutex.release()


if __name__ == '__main__':
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t3 = Thread(target=task)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(x)
##########################
600000

1.2死鎖

from threading import Thread,Lock
import time
mutex1 = Lock()
mutex2 = Lock()
class MyTreada(Thread):
     def run(self):
         self.task1()
         self.task2()
     def task1(self):
         mutex1.acquire()
         print(f'{self.name}搶到了鎖1')
         mutex2.acquire()
         print(f'{self.name}搶到了鎖2')
         mutex2.release()
         print(f'{self.name}釋放了鎖1')
         mutex1.release()
         print(f'{self.name}釋放了鎖2')
     def task2(self):
         mutex2.acquire()
         print(f'{self.name}搶到了鎖2')
         time.sleep(1)
         mutex1.acquire()
         print(f'{self.name}搶到了鎖2')
         mutex2.release()
         print(f'{self.name}釋放了鎖2')
         mutex1.release()
         print(f'{self.name}釋放了鎖1')
for i in range(3):
    t = MyTreada()
    t.start()
##################
Thread-1搶到了鎖1
Thread-1搶到了鎖2
Thread-1釋放了鎖1
Thread-1釋放了鎖2
Thread-1搶到了鎖2
Thread-2搶到了鎖1##########下面的線程被阻塞
#############################
"""
倆個線程
線程1拿到了鎖頭2,要想往下執行須要鎖頭1
線程2拿到了鎖頭1,想要往下執行須要鎖頭2
互相拿到了彼此往下執行的必要條件(鎖頭),互相不放棄手裏的鎖頭,出現阻塞
"""

1.3解決死鎖方法(遞歸鎖)

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。安全

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
from threading import Thread,RLock
"""
遞歸鎖:在同一個線程內能夠被屢次acquire
釋放:內部至關於維護了一個計數器,也就是說同一個線程acquire了幾回就要release()幾回

"""
import time

mutex1 = RLock()
mutex2 = mutex1

class MyThreada(Thread):
    def run(self):
        self.task1()
        self.task2()
    def task1(self):
        mutex1.acquire()
        print(f'{self.name}搶到了鎖1')
        mutex2.acquire()
        print(f'{self.name}搶到了鎖2')
        mutex2.release()
        print(f'{self.name}釋放了鎖2')
        mutex1.release()
        print(f'{self.name}釋放了鎖1')
    def task2(self):
        mutex2.acquire()
        print(f'{self.name}搶到了鎖2')
        time.sleep(1)
        mutex1.acquire()
        print(f'{self.name}搶到了鎖1')
        mutex1.release()
        print(f'{self.name}釋放了鎖1')
        mutex2.release()
        print(f'{self.name}釋放了鎖2')
for i in range(3):
    t = MyThreada()
    t.start()
#############################
Thread-1搶到了鎖1
Thread-1搶到了鎖2
Thread-1釋放了鎖2
Thread-1釋放了鎖1
Thread-1搶到了鎖2
Thread-1搶到了鎖1
Thread-1釋放了鎖1
Thread-1釋放了鎖2
Thread-2搶到了鎖1
Thread-2搶到了鎖2
Thread-2釋放了鎖2
Thread-2釋放了鎖1
Thread-2搶到了鎖2
Thread-2搶到了鎖1
Thread-2釋放了鎖1
Thread-2釋放了鎖2
Thread-3搶到了鎖1
Thread-3搶到了鎖2
Thread-3釋放了鎖2
Thread-3釋放了鎖1
Thread-3搶到了鎖2
Thread-3搶到了鎖1
Thread-3釋放了鎖1
Thread-3釋放了鎖2

2、信號量Semaphore

同進程的同樣服務器

實現:定製了鎖的個數,也就意味着最多有幾個線程能夠搶到鎖頭網絡

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器+1;
調用release() 時內置計數器-1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。多線程

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):併發

from threading import Thread,currentThread,Semaphore
import time
def task():
    sm.acquire()
    print(f'{currentThread().name}正在執行')
    time.sleep(3)
    sm.release()
sm = Semaphore(5)
for i in range(15):
    t = Thread(target=task)
    t.start()
####################
Thread-1正在執行
Thread-2正在執行
Thread-3正在執行
Thread-4正在執行
Thread-5正在執行


Thread-7正在執行
Thread-6正在執行
Thread-8正在執行
Thread-9正在執行
Thread-10正在執行


Thread-11正在執行
Thread-13正在執行
Thread-12正在執行
Thread-14正在執行
Thread-15正在執行

與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程app

3、GIL

3.1 什麼是GIL

  1. 在cpython解釋器中有一把GIL鎖(全局解釋鎖),GIL鎖本質是一把互斥鎖。
  2. 致使了同一個進程下,同一時間只能運行一個線程,沒法利用多核優點
  3. 同一個進程下的多個線程只能實現併發不能實現並行。

3.2爲何要有GIL

由於cpython自帶的垃圾回收機制不是線程安全的,因此要有GIL鎖dom

致使了同一個進程下,同一時間只能由一個線程,沒法利用多核優點異步

分析:socket

  1. 咱們四個任務要處理,處理方式併發
  2. 解決方案:
    1. 開啓四個進程
    2. 一個進程下開啓四個線程

多進程vs多線程

計算密集型(推薦使用多進程)

推薦使用多進程,利用cpu的多核優點,並行的計算(cpu主要負責計算)

下面例子,分析過程:

  1. 每一個都要計算10s多線程
  2. 在同一時刻只有一個線程會被執行,也就意味着每一個10s都不能省,分開每一個都要計算10s,共40.ns
  3. 多進程
  4. 能夠並行的執行多個線程,10s+開啓進程的時間
from threading import Thread
from multiprocessing import Process
import time
def workl():
    res = 0
    for i in range(10000000):
        res*=i
if __name__ == '__main__':
    t_list = []
    start = time.time()
    for i in range(4):
        p = Process(target=workl)
        t_list.append(p)
        p.start()
    for p in t_list:
        p.join()
    end = time.time()
    print("多進程",end-start)
################
多進程 1.082251787185669
from threading import Thread
from multiprocessing import Process
import time
def workl():
    res = 0
    for i in range(10000000):
        res*=i
if __name__ == '__main__':
    t_list = []
    start = time.time()
    for i in range(4):
        t = Thread(target=workl)
        t_list.append(t)
        t.start()
    for t in t_list:
        t.join()
    end = time.time()
    print("多線程",end-start)
###############
多進程 2.43462872505188

IO密集型(推薦使用多線程)

推薦使用多線程解決,大部分時間都在io,而且開啓一個線程要比開啓進程的速度快的多

大部分的需求都是io密集型,由於大部分的軟件都是基於網絡的

  1. 4個任務每一個任務90%大部分時間都在io.
  2. 每一個任務io10s 0.5s
  3. 多線程
  4. 能夠實現併發,每一個線程io的時間不咋佔用cpu, 10s + 4個任務的計算時間
  5. 多進程
  6. 能夠實現並行,10s+1個任務執行的時間+開進程的時間
from threading import Thread
from multiprocessing import Process
import time
def work():
    x = 1+1
    time.sleep(5)
if __name__ == '__main__':
    t_list = []
    start = time.time()
    for i in range(4):
        t = Thread(target=work)
        t_list.append(t)
        t.start()
    for t in t_list:
        t.join()
    end = time.time()
    print("多線程",end-start)
################
多線程 5.002916097640991
from multiprocessing import Process
import time
def work():
    x = 1+1
    time.sleep(5)
if __name__ == '__main__':
    t_list = []
    start = time.time()
    for i in range(4):
        p = Process(target=work)
        t_list.append(p)
        p.start()
    for p in t_list:
        p.join()
    end = time.time()
    print("多進程",end-start)
#################
多進程 5.1951446533203125

[TOC]

4、線程Queue()隊列

1.1 先進先出

import queue
q = queue.Queue()
q.put('123')
q.put('dd')
print(q.get())
print(q.get())
###############
123
dd

1.2 先進後出(堆棧)

import queue
q = queue.LifoQueue()#堆棧,後進先出
q.put('ocean')
q.put('sku')
q.put('chen')
print(q.get())
print(q.get())
print(q.get())
##############
chen
sku
ocean

1.3優先級取數據

import queue
q = queue.PriorityQueue()
q.put((50,'chen'))#一般元組的第一個值是int類型
q.put((100,'sky'))
q.put((1,'rocky'))
print(q.get())
print(q.get())
print(q.get())
##############
(1, 'rocky')
(50, 'chen')
(100, 'sky')
import queue
q = queue.PriorityQueue()
q.put('chen')
q.put('sky')
q.put('rocky')
print(q.get())
print(q.get())
print(q.get())
##################
chen
rocky
sky
import queue
q = queue.PriorityQueue()
q.put(50)
q.put(100)
q.put(1)
print(q.get())
print(q.get())
print(q.get())
############
1
50
100

5、線程定時器

和time.sleep()不一樣,線程定時器不會阻礙下面代碼的執行,而time.sleep()會阻礙。

from threading import Timer
import time
def task():
    print('線程執行了')
    time.sleep(2)
    print('線程結束了')
t = Timer(4,task)###元組第一個數是多少秒開啓線程
t.start()
print("不會阻礙線程的執行")
########################
不會阻礙線程的執行
線程執行了
線程結束了

6、多線程併發socket服務端

  1. 服務端

    import socket 
    from threading import Thread
    def talk(conn):
        while True:
            try:
                msg = conn.recv(1024)
                if len(msg)==0:break
                conn.send(msg.upper())
            except Exception:
                print('客戶點關閉鏈接')
                break
    def server_demo():
        server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        server.bind(("127.0.0.1",8008))
        server.listen(5)
    
        while True:
            conn,addr = server.accept()
            t = Thread(target=talk,args=(conn,))
            t.start()
    if __name__ == '__main__':
        server_demo()
  2. 客戶端

    import socket
    from threading import Thread,currentThread
    def client_dome():
        client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        client.connect(('127.0.0.1',8008))
        while True:
            msg = f"{currentThread().name}"
            if len(msg)==0:continue
            client.send(msg.encode('utf8'))
            feedback = client.recv(1024)
            print(feedback.decode('utf8'))
        client.close()
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=client_dome)
            t.start()

7、進程池和線程池

4.1進程池和線程池的意義

  1. 進程池和線程池

    1. 進程池和線程池的功能是限制進程數或者線程數。
  2. 何時限制?

    當併發的任務數量遠遠大於計算機所能承受的範圍的時候,即沒法一次性開啓過多的任務數量,就應該考慮去限制進程數或者線程數,從而保證服務器不會崩潰。(蕩機)

4.2 同步異步

4.2.1同步

同步:提交一個任務,必須等待任務執行完畢(或者最後拿到返回值),才能執行下一行代碼

4.2.1.1進程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'{current_process().name}在執行任務{i}')
    time.sleep(1)
    return i**2
if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)#池子裏有四個進程
    fu_list = []
    for i in range(20):
        future = pool.submit(task,i)#task任務要作20次,4個線程負責作這些事情
        print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行
#################
Process-1在執行任務0
0
Process-2在執行任務1
1
Process-3在執行任務2
4
Process-4在執行任務3
9
Process-1在執行任務4
16
Process-2在執行任務5
25
Process-3在執行任務6
36
Process-4在執行任務7
49
Process-1在執行任務8
64
Process-2在執行任務9
81
Process-3在執行任務10
100
Process-4在執行任務11
121
Process-1在執行任務12
144
Process-2在執行任務13
169
Process-3在執行任務14
196
Process-4在執行任務15
225
Process-1在執行任務16
256
Process-2在執行任務17
289
Process-3在執行任務18
324
Process-4在執行任務19
361
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'{current_process().name}在執行任務{i}')
    time.sleep(1)
    return i**2
if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)#池子裏有四個進程
    fu_list = []
    for i in range(20):
        future = pool.submit(task,i)#task任務要作20次,4個線程負責作這些事情
        # print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行
        fu_list.append(future)
    pool.shutdown()#關閉了池的入口,會等待全部的任務執行完,結束阻塞.
    for fu in fu_list:
        print(fu.result())
###########################
Process-1在執行任務0
Process-2在執行任務1
Process-3在執行任務2
Process-4在執行任務3
Process-1在執行任務4
Process-2在執行任務5
Process-3在執行任務6
Process-4在執行任務7
Process-1在執行任務8
Process-2在執行任務9
Process-3在執行任務10
Process-4在執行任務11
Process-1在執行任務12
Process-2在執行任務13
Process-3在執行任務14
Process-4在執行任務15
Process-1在執行任務16
Process-2在執行任務17
Process-3在執行任務18
Process-4在執行任務19
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361

4.2.1.2線程池

4.2.1線程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'{currentThread().name}在執行任務{i}')
    time.sleep(1)
    return i**2
if __name__ == '__main__':
    pool = ThreadPoolExecutor(4)#池子裏有四個線程
    fu_list = []
    for i in range(20):
        future = pool.submit(task,i)#task任務要作20次,4個線程負責作這些事情
        print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行
##########################
ThreadPoolExecutor-0_0在執行任務0
0
ThreadPoolExecutor-0_0在執行任務1
1
ThreadPoolExecutor-0_1在執行任務2
4
ThreadPoolExecutor-0_0在執行任務3
9
ThreadPoolExecutor-0_2在執行任務4
16
ThreadPoolExecutor-0_1在執行任務5
25
ThreadPoolExecutor-0_3在執行任務6
36
ThreadPoolExecutor-0_0在執行任務7
49
ThreadPoolExecutor-0_2在執行任務8
64
ThreadPoolExecutor-0_1在執行任務9
81
ThreadPoolExecutor-0_3在執行任務10
100
ThreadPoolExecutor-0_0在執行任務11
121
ThreadPoolExecutor-0_2在執行任務12
144
ThreadPoolExecutor-0_1在執行任務13
169
ThreadPoolExecutor-0_3在執行任務14
196
ThreadPoolExecutor-0_0在執行任務15
225
ThreadPoolExecutor-0_2在執行任務16
256
ThreadPoolExecutor-0_1在執行任務17
289
ThreadPoolExecutor-0_3在執行任務18
324
ThreadPoolExecutor-0_0在執行任務19
361

4.2.2異步

異步:提交一個任務,不須要等待執行完畢,能夠直接執行下一行代碼。

4.2.2.1異步進程池

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time


def task(i):
    print(f'{current_process().name}在執行任務{i}')
    time.sleep(1)
    return i ** 2


def parse(future):
    print(future.result())


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 池子裏有四個進程
    for i in range(20):
        future = pool.submit(task, i)  # task任務要作20次,4個線程負責作這些事情
        # print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行
        future.add_done_callback(parse)
        # 爲當前任務綁定了一個函數,在當前任務執行結束的時候會觸發這個函數,
        # 會把future對象做爲參數傳給函數
        # 這個稱之爲回調函數,處理完了回來就調用這個函數.
#######################
Process-1在執行任務0
Process-2在執行任務1
Process-3在執行任務2
Process-4在執行任務3
Process-1在執行任務4
0
Process-2在執行任務5
1
Process-3在執行任務6
4
Process-4在執行任務7
9
Process-1在執行任務8
16
Process-2在執行任務9
25
Process-3在執行任務10
36
Process-4在執行任務11
49
Process-1在執行任務12
64
Process-2在執行任務13
81
Process-3在執行任務14
100
Process-4在執行任務15
121
Process-1在執行任務16
144
Process-2在執行任務17
169
Process-3在執行任務18
196
Process-4在執行任務19
225
256
289
324
361

4.2.2.2異步線程池

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time


def task(i):
    print(f'{currentThread().name}在執行任務{i}')
    time.sleep(1)
    return i ** 2


def parse(future):
    print(future.result())


if __name__ == '__main__':
    pool = ThreadPoolExecutor(4)  # 池子裏有四個線程
    for i in range(20):
        future = pool.submit(task, i)  # task任務要作20次,4個線程負責作這些事情
        # print(future.result())#若是沒有結果會一直等下去,致使了全部的任務都在串行
        future.add_done_callback(parse)
        # 爲當前任務綁定了一個函數,在當前任務執行結束的時候會觸發這個函數,
        # 會把future對象做爲參數傳給函數
        # 這個稱之爲回調函數,處理完了回來就調用這個函數.
##############################
ThreadPoolExecutor-0_0在執行任務0
ThreadPoolExecutor-0_1在執行任務1
ThreadPoolExecutor-0_2在執行任務2
ThreadPoolExecutor-0_3在執行任務3
0
ThreadPoolExecutor-0_0在執行任務4
1
ThreadPoolExecutor-0_1在執行任務5
9
ThreadPoolExecutor-0_3在執行任務6
4
ThreadPoolExecutor-0_2在執行任務7
25
ThreadPoolExecutor-0_1在執行任務8
16
ThreadPoolExecutor-0_0在執行任務9
36
ThreadPoolExecutor-0_3在執行任務10
49
ThreadPoolExecutor-0_2在執行任務11
81
ThreadPoolExecutor-0_0在執行任務12
64
ThreadPoolExecutor-0_1在執行任務13
100
ThreadPoolExecutor-0_3在執行任務14
121
ThreadPoolExecutor-0_2在執行任務15
144
ThreadPoolExecutor-0_0在執行任務16
169
ThreadPoolExecutor-0_1在執行任務17
196
ThreadPoolExecutor-0_3在執行任務18
225
ThreadPoolExecutor-0_2在執行任務19
256
289
361
324

8、協程

  1. python的線程用的是操做系統原生的線程

5.1什麼是協程

  1. 協程:單線程下實現併發
  2. 併發:切換+保存狀態
  3. 多線程:操做系統幫助實現,若是遇到io切換,執行時間過程也會切換,實現一個雨露均沾的效果
  • 什麼樣的協程是有有意義的?

    1. 遇到io切換的時候是有意義的
    2. 具體:協程的概念本質是程序猿抽象出來的,操做系統根本不知道協程的存在,也就是說來了一個線程我本身遇到io,我本身線程內部直接切到本身的另外一個線程任務上,操做系統根本不知道這個過程。
  • 優勢:

    1. 本身控制切換要比操做系統控制切換的快的多
  • 缺點:

    1. 對比多線程

      本身要檢測全部的io,只要有一個阻塞總體都會跟着阻塞

    2. 對比多進程

      沒法利用多核優點

  • 爲何要有協程?

    本身控制切換的速度比操做系統控制切換的快,下降了單個線程的io時間。

import gevent
import time


def eat():
    print("eat 1")
    gevent.sleep(2)
    print('eat 2')


def play():
    print("play 1")
    gevent.sleep(3)
    print("play 2")


start = time.time()
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join()
end = time.time()
print(end - start)
###################
eat 1
play 1
eat 2
play 2
3.006275177001953
from gevent import monkey;monkey.patch_all()####爲協程打補丁
# monkey.patch_all()
import gevent,time
def eat():
    print('eat 1')
    time.sleep(2)
    print('eat 2')
def play():
    print('play 1 ')
    time.sleep(3)
    print('play 2')
start = time.time()
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join()
end = time.time()
print(end-start)
#####################
eat 1
play 1 
eat 2
play 2
3.004676580429077
相關文章
相關標籤/搜索