Thread 模塊數據庫
import time安全
from threading import Thread服務器
from multiprocessing import Process網絡
#效率差異多線程
def func(a):app
a += 1dom
if __name__ == "__main__":socket
li = []async
start = time.time()tcp
for i in range(50):
p = Thread(target=func)
p.start()
li.append(p)
for i in li:
i.join()
pirnt(time.time()-start)
start = time.time()
li = []
for i in range(50):
t = Process(target=func)
t.start()
li.append(t)
for i in li:
i.join()
print(time.time()-start)
terminate(強制結束子進程) 在線程中沒有
線程之間的數據共享
from threading import Thread
n = 100
def func():
global n
n -= 1
li = []
for i in range(100):
t = Thread(target=func)
t.start()
li.append(t)
for i in li:
i.join()
print(n)
守護線程
import time
from threading import Thread
def func():
while 1:
print(True)
time.sleep(0.5)
def inner():
print("in inner start")
time.sleep(3)
print("in inner end")
t1 = Thread(target=func)
t1.setDaemon(True)
t1.start()
t2 = Thread(target=inner)
t2.start()
time.sleep(1)
print("主進程")
主線程若是結束了,那麼整個進程就結束了,守護線程會等待主線程結束以後才結束.
主進程 等待 守護進程 子進程 守護進程只守護主進程的代碼就能夠了
守護線程不行,主線程結束了那麼整個進程就結束了,全部的線程就都結束了
例子 :
使用多線程實現tcp協議的socket server
客戶端:
import socket
sk = socket.socket()
sk.connect_ex(("127.0.0.1",9090))
while 1:
sk.send(b"hello")
ret = sk.recv(1024).decode("utf-8")
print(ret)
sk.close()
服務器 :
import socket
from threading import Thread
sk = socket.socket()
sk.bind(("127.0.0.1",9090))
sk.listen()
def func(conn):
while 1:
ret = conn.recv(1024).decode("utf-8")
print(ret)
conn.send(b"world")
if __name__ == "__main__":
while 1:
conn,addr = sk.accept()
Thread(target=func,args=(conn,)).start()
from threading import Thread,get_ident
#開啓線程的第二種方法和查看線程id
class Mythread(Thread):
def __init__(self,args):
super().__init__()
self.args = args
def run(self):
print("in my thread:",get_ident(),self.args)
print("main",get_ident())
t = Mythread("nana")
t.start()
線程中的方法
import time
from threading import Thread,get_ident,currentThread,enumerate,activeCount
class Mythread(Thread):
def __init__(self,args):
super().__init__()
self.args = args
def run(self):
time.sleep(0.1)
print(currentThread()) #返回當前的線程變量
print("in my thread:",get_ident(),self.args)
print("main:",get_ident())
t = Mythread("nana")
print(t.is_alive()) #返回線程是不是活動的
t.start()
t.setname("nana") #設置線程名稱
print(t.getname()) #獲取線程名稱
print(activeCount()) #正在運行的線程的數量len(enumerate) #返回一個包含正在運行的線程list
print("t:",t)
Thread類 : 開啓線程 傳參數 join
和進程的差別 : 1,效率 2. 數據共享 3. 守護線程
面向對象的方式開啓線程 :
thread對象的其餘方法 : isAlive,setname,getname
threading模塊的方法 : currentTread,activeCount,encumerate
GIL鎖(全局解釋器鎖) : 鎖線程, 同一個線程在同一時刻只能被一個CPU執行
在多進程/線程同時訪問一個數據的時候就會產生數據不安全的現象.
多進程 訪問文件
多線程 : 同時訪問一個數據
GIL全局解釋鎖 : 在同一個進程裏的每個進程同一時間只能有一個線程訪問CPU
儘可能不要設置全局變量
只要在多線程/進程之間用到全局變量就加上鎖
from threading import Thread,Lock
lock = Lock() #互斥鎖
noodle = 100
def func(name):
global noodle
lock.acquire()
noodle -= 1
lock.release()
print("%s吃到面了"%name)
if __name__ == "__main__":
li = []
for i in range(100):
t = Thread(target=func)
t.start()
li.append(t)
for i in li:
i.join()
print(noodle)
科學家吃麪問題(死鎖現象)
import time
from threading import Thread,Lock
noodle = Lock()
fork = Lock()
def eat1(name):
noodle.acquire()
print("%s拿到面了"%name)
fork.acquire()
print("%s拿到叉子了"%name)
print("%s在吃麪"%name)
time.sleep(0.5)
fork.release()
noodle.release()
def eat2(name):
fork.acquire()
print("%s拿到叉子了"%name)
noodle.acquire()
print("%s拿到面了"%name)
print("%s 在吃麪"%name)
time.sleep(0.5)
noodle.release()
fork.release()
li = ["alex","egon","wusir","taibai"]
for i in li:
Thread(target=eat1,args=(i,)).start()
Thread(target=eat2,args=(i,)).start()
遞歸鎖 :
from threading import RLock
lock = RLock()
lock.acquire()
print(55555)
lock.acquire()
print(666666)
lock.acquire()
print(99999)
遞歸鎖解決死鎖問題 :
import time
from threading import Thread,RLock
lock = RLock()
def eat1(name):
lock.acquire()
print("%s拿到面了"%name)
lock.acquire()
print("%s拿到了叉子了"%name)
print("%s 在吃飯"%name)
time.sleep(0.5)
lock.release()
lock.release()
def eat2(name):
lock.acquire()
print("%s拿到叉子了"%name)
lock.acquire()
print("%s拿到面了"%name)
time.sleep(0.5)
lock.release()
lock.release()
li = ["alex","egon","wusir","taibai"]
for i in li:
Thread(target=eat1,args=(i,)).start()
Thread(tarage=eat2,args=(i,)).start()
互斥鎖解決死鎖問題 :
import time
from threading import Thred,Lock
lock = Lock()
def eat1(name):
lock.acquire()
print("%s拿到面了"%name)
print("%s 拿到叉子了"%name)
print("%s在吃麪"%name)
time.sleep(0.5)
lock.release()
def eat2(name):
lock.acquire()
print("%s拿到叉子了"%name)
print("%s拿到面了"%name)
print("%s 在吃麪"%name)
time.sleep(0.5)
lock.release()
li = ["alex","egon","wusir","taibai"]
for i in li:
Thread(target=eat1,args=(i,)).start()
Thread(target=eat2,args=(i,)).start()
死鎖 :
多把鎖同時應用在多個線程中
互斥鎖和遞歸鎖哪一個好?
遞歸鎖 : 快速回復服務
死鎖問題的出現,是程序的設計或者邏輯的問題
還應該進一步的排除和重構邏輯來保證使用互斥鎖也不會發生死鎖
互斥鎖和遞歸鎖的區別 :
互斥鎖 : 就是在一個線程中不能連續屢次acquire
遞歸鎖 : 能夠在同一個線程中acquire任意次,注意acquire多少次就須要release多少次
信號量 和 進程池
信號量就是 : 鎖 + 計數器
import time
from multiprocessing import Semaphore,Process,Pool
def ktv(sem,i):
sem.acquire()
i += 1
sem.release()
def ktv1(i):
i += 1
if __name__ == "__main__":
start = time.time()
li = []
sem =Semaphore(5)
for i in range(100)
p = Process(targe=ktv,args=(sem,i))
p.start()
li.append(p)
for i in li:
i.join()
print(time.time()-start)
start = time.time()
p = Pool(5)
for i in range(100):
p.apply_async(func=ktv1,args=(i,))
p.close()
p.join()
print(time.time()-start)
進程池和信號量:
進程池 : 進程池裏有幾個進程就起幾個,無論多少任務,池子裏的進程個數是固定的,開啓進程和關閉進程這些事都是須要固定的時間開銷,就不產生額外的時間開銷.且進程池中的進程數控制的好,那麼操做系統的壓力也小.
信號量 :
有多少個任務就起多少個進程/線程
能夠幫助減小操做系統切換負擔
可是並不能減小進/線程開啓和關閉的時間
數據庫鏈接 :
import time,random
from threading import Thread,Event
def check(e):
print("正在檢測兩臺機器之間的網絡狀況...")
time.sleep(random.randint(1,3))
e.set()
def connect_db(e):
e.wait()
print("鏈接數據庫..")
print("鏈接數據庫成功....")
e = Event()
Thread(target=check,args=(e,)).start()
Thread(target=connect_db,args=(e,)).start()
import time,random
from threading import Thread,Event
def check(e):
print("正在檢測兩臺機器之間的網絡狀況...")
time.sleep(random.randint(0,2))
e.set()
def connect_db(e):
n = 0
while n<3:
if e.is_set():
break
else:
e.wait(0.5)
n += 1
if n == 3:
raise TimeoutError
print("鏈接數據庫....")
print("鏈接數據庫成功...")
e = Event()
Thread(target=check,args=(e,)).start()
Thread(target=connect_db,args=(e,)).start()
條件 :
from threading import Condition
經常使用方法:
acquire , release , wait 阻塞 , notify (讓wait解除阻塞的工具)
wait仍是notify在執行這兩個方法的先後,必須執行acquire和release
from threading import Condition ,Thread
def func(con,i):
con.acquire()
con.wait()
print("thread:",i)
con.release()
con = Condition()
for i in range(20):
Thread(target=func,args=(con,i)).start()
con.acquire()
con.notify_all() #幫助wait的子線程處理某個數據直到知足條件
con.release()
while 1:
num = int(input("<<<"))
con.acquire()
con.notify(num)
con.release()
定時器 :
from threading import Thread , Timer
def func():
print("我執行了")
Timer(1,func).start() #定時器
建立線程的時候,就規定他多久以後去執行