##########################線程#################################
進程是自願分配的最小單位,線程是CPU調度的最小單位,每個進程中至少有一個線程
線程屬於進程,進程負責獲取操做系統分配給個人資源,線程負責執行代碼
進程的弊端:
開啓和關閉 以及 切換 都會帶來很大的時間開銷
過多的進程還會形成操做系統調度的壓力
進程和線程有什麼關係?
1)地址空間和其餘自願(如打開文件):進程間相互獨立,同一進程的各線程間共享.某進程內的線程在其餘進程不可見
2)通訊: 進程間通訊IPC,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊---須要進程同步和互斥手段的輔助
以保證數據的一致性
3)調度和切換:線程上下文切換比進程上下文切換要快得多
4)在多線程操做系統中,進程不是一個可執行的實體
線程的概念
進程是操做系統中最小的資源分配單位
線程是CPU調度的最小單位
線程進程之間的對比
線程不能獨立存在,必須在一個進程裏
線程的開啓 關閉以及切換的開銷要遠遠小於進程
同一個進程之間的多個線程之間數據共享
全局解釋器鎖GIL
使得一個進程中的多個線程不能充分的利用多核
從代碼的角度上來看
多進程
開啓和結束 時間開銷大
切換的效率低
內存隔離
多線程
開啓和結束 時間開銷很是小
切換效率高
內存不隔離
線程的特色:
1)輕型實體 2) 獨立調度和分派的基本單位 3)共享進程資源 4)可併發執行
Cputhon解釋器下的全局解釋器鎖
在同一進程中的多個線程在同一時刻只能有一個線程訪問CPU
多線程沒法造成並行
鎖的線程
Jpython 解釋器就沒有全局解釋器鎖
何時纔會有到CPU
程序計算的時候
IO阻塞
是不會用到CPU的
在多個進程\線程同時訪問一個數據的時候就會產生數據不安全的現象
多進程 訪問文件
多線程
同時去訪問一個數據
GIL 全局解釋器鎖
在同一個進程裏的每個線程同一時間只能有一個線程訪問cpu
儘可能不要設置全局變量
只要在多線程/進程之間用到全局變量 就加上鎖
from threading import Lock,Thread
死鎖
lock = Lock()
lock.acquire()
lock.acquire()
noodle = 100
def func(name,lock):
global noodle
lock.acquire()
noodle -= 1
lock.release()
print('%s吃到面了'%name)
if __name__ == '__main__':
lock = Lock() # 線程鎖 互斥鎖
t_lst = []
for i in range(10):
t = Thread(target=func,args=(i,lock))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(noodle)
科學家吃麪問題
import time
from threading import Thread,Lock
lock = Lock()
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print('%s拿到了面' % name)
fork_lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
fork_lock.release() # 0.01
noodle_lock.release() # 0.01
def eat2(name):
fork_lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
noodle_lock.acquire()
print('%s拿到了面' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
noodle_lock.release()
fork_lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
遞歸鎖
from threading import RLock
rlock = RLock()
rlock.acquire()
print(1)
rlock.acquire()
print(2)
rlock.acquire()
print(3)
遞歸鎖解決死鎖問題
import time
from threading import Thread,RLock
lock = RLock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release() # 0.01
lock.release() # 0.01
def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
lock.acquire()
print('%s拿到了面' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release()
lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
互斥鎖解決死鎖問題
import time
from threading import Thread,Lock
lock = Lock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
print('%s拿到了叉子' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release() # 0.01
def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
print('%s拿到了面' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
死鎖
多把鎖同時應用在多個線程中
互斥鎖和遞歸鎖哪一個好
遞歸鎖 快速恢復服務
死鎖問題的出現 是程序的設計或者邏輯的問題
還應該進一步的排除和重構邏輯來保證使用互斥鎖也不會發生死鎖
互斥鎖和遞歸鎖的區別
互斥鎖 就是在一個線程中不能連續屢次ACQUIRE
遞歸鎖 能夠在同一個線程中acquire任意次,注意acquire多少次就須要release多少次
#####################threading模塊 thread類###############
Thread類的其餘方法
Thread實例對象的方法 # isAlive(): 返回線程是否活動的 # getName(): 返回線程名 #setName() : 設置線程名 threading 模塊提供的一些方法: #threading.currentThread():返回當前的線程變量 #threading.enumerate(): 返回一個包含正在運行的線程的list,正在運行指線程啓動後,結束前,不包括啓動前和終止後的線程. #threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
##信號量###
鎖+計時器
和進程的同樣, Semaphore 管理一個內置的計數器,
每當調用acquire()時 內置計數器 -1;
調用release()時 內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()
池和信號量
import time
from multiprocessing import Semaphore,Process,Pool
def ktv1(sem,i):
sem.acquire()
i +=1
sem.release()
def ktv2(i):
i +=1
if __name__=='__main__':
sem = Semaphore(5)
start = time.time()
p_1 =[]
for i in range(100):
p = Process(target=ktv1,args=(sem,i))
p.start()
p_1.append(p)
for p in p_1:p.join()
print('###信號量',time.time() - start)
start = time.time()
p = Pool(5)
p_1 = []
for i in range(100):
ret = p.apply_async(func = ktv2,args=(sem,i))
p_1.append(ret)
p.close()
p.join()
print('***池',time.time() - start)
池 效率高
池子裏面有幾個一共就起幾個
無論多少任務,池子的個數是固定的
開啓進程和關閉進程這些事都是須要固定的開銷
就不產生額外的時間開銷
且進程池中的進程數控制的好,name操做系統的壓力也小
信號量
有多少個任務就起多少進程/線程
能夠幫助你減小操做系統切換的負擔
可是並不能幫助你減小進/線程開啓和關閉的時間
#####事件
和進程同樣
wait
等 到時間內部的信號變成True就不阻塞了
set
設置信號變成True
clear
設置信號變成False
is_get
查看信號是否爲True
#數據庫鏈接 import time import random from threading import Event,Thread def check(e): """檢測一下數據庫的網絡和個人網絡是否通""" print('正在檢測兩臺機器之間的網絡狀況....') time.sleep(random.randint(1,3)) e.set() def connet_db(e): e.wait() print('鏈接數據庫...') print('鏈接數據庫成功~~~') e = Event() Thread(target=connet_db,args=(e,)).start() Thread(target=check,args=(e,)).start()
######條件
使得線程等待,只有知足某條件時,才釋放n個線程
Python提供的Condition對象提供了對複雜線程同步問題的支持。Condition被稱爲條件變量,
除了提供與Lock相似的acquire和
release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,而後判斷一些條件。
若是條件不知足則wait;若是
條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後
會從新判斷條件。不斷的重
復這一過程,從而解決複雜的同步問題。
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" % n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() print('****')
##定時器
定時器,指定n秒後執行某個操做
from threading import Timer def func(): print('執行我啦') interval 時間間隔 Timer(0.2,func).start() # 定時器 建立線程的時候,就規定它多久以後去執行
####線程隊列python
from multiprocessing import Queue,JoinableQueue #進程IPC隊列
from queue import Queue #線程隊列,先進先出的
from queue import LifoQueue #後進先出的
隊列Queue
先進先出
自帶鎖, 數據安全
棧 LifoQueue
後進先出
自帶鎖,數據安全
lq = LifoQueue() ##裏面若是帶有數字, lq裏面的值的數量不能超過這個數字,不然它會一直等着
lq.put(123)
lq.put(456)
lq.put('abc')
lq.put('cba')
lq.put('ccc')
lq.put('aaa')
print(lq)
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get())
from queue import PriorityQueue # 優先級隊列
pq = PriorityQueue()
pq.put((10,'aaa')) ##裏面是元組(不是必須),判斷優先級,若是是字母根據ACCII碼來判斷大小
pq.put((5,'zzz'))
print(pq.get())
print(pq.get())
# #python標準模塊--concurrent.futures
#1 介紹 concurrent.futures 模塊提供了高度封裝的異步調用接口(高度封裝:省事) #進程池/線程池的同一使用方式 ThreadPoolExecutor: 線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 #2,基本方法 #submit(fn,*args,**kwargs) 異步提交任務 #map(func,*iterables,timeout = None ,chunksize = 1) 取代for 循環 submit的操做 #shutdown(wait = True) 至關於進程池的 pool.close()+pool.join()操做 wait = True , 等待池內全部任務執行完畢回收完資源後才繼續 wait = False, 當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 #result(timeout = None) 取得結果 #add_done_callback(fn) 回調函數
import time from threading import currentThread,get_ident from concurrent.futures import ThreadPoolExecutor #幫助你啓動線程池的類 from concurrent.futures import ProcessPoolExecutor #幫助你啓動線程池的類 def func(i): time.sleep(1) print('in %s %s'%(i,currentThread())) return i**2 t = ThreadPoolExecutor(5) t.map(func,range(20)) for i in range(20): t.submit((func,i))
import time from threading import currentThread,get_ident from concurrent.futures import ThreadPoolExecutor #幫助你啓動線程池的類 from concurrent.futures import ProcessPoolExecutor #幫助你啓動線程池的類 def func(i): time.sleep(1) print('in %s %s'%(i,currentThread())) return i**2 def back(fn): print(fn.result(),currentThread()) 獲取任務結果 t = ThreadPoolExecutor(20) ret_l = [] for i in range(20): ret = t.submit(func,i) ret_l.append(ret) t.shutdown() for ret in ret_l: print(ret.result()) print('main : ',currentThread())
import os import time from concurrent.futures import ProcessPoolExecutor # 幫助你啓動線程池的類 def func(i): time.sleep(1) print('in %s %s'%(i,os.getpid())) return i**2 def back(fn): print(fn.result(),os.getpid()) if __name__ == '__main__': print('main : ',os.getpid()) t = ProcessPoolExecutor(20) for i in range(100): t.submit(func,i).add_done_callback(back)