線程的Thread模塊 同步控制:鎖,事件,信號量,條件,定時器

Thread 模塊數據庫

import time安全

from threading import Thread服務器

from multiprocessing import Process網絡

#效率差異多線程

def func(a):app

  a += 1dom

if __name__ == "__main__":socket

  li = []async

  start = time.time()tcp

  for i in range(50):

    p = Thread(target=func)

    p.start()

    li.append(p)

  for i in li:

    i.join()

  pirnt(time.time()-start)

  start = time.time()

  li = []

  for i in range(50):

    t = Process(target=func)

    t.start()

    li.append(t)

  for i in li:

    i.join()

  print(time.time()-start)

terminate(強制結束子進程) 在線程中沒有

線程之間的數據共享

from threading import Thread

n = 100

def func():

  global n

  n -= 1

li = []

for i in range(100):

  t = Thread(target=func)

  t.start()

  li.append(t)

for i in li:

  i.join()

print(n)

守護線程

import time

from threading import Thread

def func():

  while 1:

    print(True)

    time.sleep(0.5)

def inner():

  print("in  inner  start")

  time.sleep(3)

  print("in  inner  end")

t1 = Thread(target=func)

t1.setDaemon(True)

t1.start()

t2 = Thread(target=inner)

t2.start()

time.sleep(1)

print("主進程")

主線程若是結束了,那麼整個進程就結束了,守護線程會等待主線程結束以後才結束.

主進程 等待  守護進程  子進程  守護進程只守護主進程的代碼就能夠了

守護線程不行,主線程結束了那麼整個進程就結束了,全部的線程就都結束了

例子 : 

使用多線程實現tcp協議的socket server

客戶端:

import socket

sk = socket.socket()

sk.connect_ex(("127.0.0.1",9090))

while 1:

  sk.send(b"hello")

  ret = sk.recv(1024).decode("utf-8")

  print(ret)

sk.close()

服務器 :

import socket

from threading import Thread

sk = socket.socket()

sk.bind(("127.0.0.1",9090))

sk.listen()

def func(conn):

  while 1:

    ret = conn.recv(1024).decode("utf-8")

    print(ret)

    conn.send(b"world")

if __name__ == "__main__":

  while 1:

    conn,addr = sk.accept()

    Thread(target=func,args=(conn,)).start()

from threading import Thread,get_ident

#開啓線程的第二種方法和查看線程id

class Mythread(Thread):

  def __init__(self,args):

    super().__init__()

    self.args = args

  def run(self):

    print("in my thread:",get_ident(),self.args)

print("main",get_ident())

t = Mythread("nana")

t.start()

線程中的方法

import time

from threading import Thread,get_ident,currentThread,enumerate,activeCount

class Mythread(Thread):

  def __init__(self,args):

    super().__init__()

    self.args = args

  def run(self):

    time.sleep(0.1)

    print(currentThread())  #返回當前的線程變量

    print("in my thread:",get_ident(),self.args)

print("main:",get_ident())

t = Mythread("nana")

print(t.is_alive())  #返回線程是不是活動的

t.start()

t.setname("nana")  #設置線程名稱

print(t.getname())  #獲取線程名稱

print(activeCount())  #正在運行的線程的數量len(enumerate)  #返回一個包含正在運行的線程list

print("t:",t)

Thread類 : 開啓線程 傳參數 join

和進程的差別 : 1,效率  2. 數據共享   3. 守護線程

面向對象的方式開啓線程 :

thread對象的其餘方法 : isAlive,setname,getname

threading模塊的方法 : currentTread,activeCount,encumerate

GIL鎖(全局解釋器鎖) : 鎖線程, 同一個線程在同一時刻只能被一個CPU執行

在多進程/線程同時訪問一個數據的時候就會產生數據不安全的現象.

多進程 訪問文件

多線程 : 同時訪問一個數據

GIL全局解釋鎖 : 在同一個進程裏的每個進程同一時間只能有一個線程訪問CPU

儘可能不要設置全局變量

只要在多線程/進程之間用到全局變量就加上鎖

from threading import Thread,Lock

lock = Lock()  #互斥鎖

noodle = 100

def func(name):

  global noodle

  lock.acquire()

  noodle -= 1

  lock.release()

  print("%s吃到面了"%name)

if __name__ == "__main__":

  li = []

  for i in range(100):

    t = Thread(target=func)

    t.start()

    li.append(t)

  for i in li:

    i.join()

  print(noodle)

科學家吃麪問題(死鎖現象)

import time

from threading import Thread,Lock

noodle = Lock()

fork = Lock()

def eat1(name):

  noodle.acquire()

  print("%s拿到面了"%name)

  fork.acquire()

  print("%s拿到叉子了"%name)

  print("%s在吃麪"%name)

  time.sleep(0.5)

  fork.release()

  noodle.release()

def eat2(name):

  fork.acquire()

  print("%s拿到叉子了"%name)

  noodle.acquire()

  print("%s拿到面了"%name)

  print("%s 在吃麪"%name)

  time.sleep(0.5)

  noodle.release()

  fork.release()

li = ["alex","egon","wusir","taibai"]

for i in li:

  Thread(target=eat1,args=(i,)).start()

  Thread(target=eat2,args=(i,)).start()

遞歸鎖 :

from threading import RLock

lock = RLock()

lock.acquire()

print(55555)

lock.acquire()

print(666666)

lock.acquire()

print(99999)

遞歸鎖解決死鎖問題 :

import time

from threading import Thread,RLock

lock = RLock()

def eat1(name):

  lock.acquire()

  print("%s拿到面了"%name)

  lock.acquire()

  print("%s拿到了叉子了"%name)

  print("%s 在吃飯"%name)

  time.sleep(0.5)

  lock.release()

  lock.release()

def eat2(name):

  lock.acquire()

  print("%s拿到叉子了"%name)

  lock.acquire()

  print("%s拿到面了"%name)

  time.sleep(0.5)
  lock.release()

  lock.release()

li = ["alex","egon","wusir","taibai"]

for i in li:

  Thread(target=eat1,args=(i,)).start()

  Thread(tarage=eat2,args=(i,)).start()

互斥鎖解決死鎖問題 :

import time

from threading import Thred,Lock

lock = Lock()

def eat1(name):

  lock.acquire()
  print("%s拿到面了"%name)

  print("%s 拿到叉子了"%name)

  print("%s在吃麪"%name)

  time.sleep(0.5)

  lock.release()

def eat2(name):

  lock.acquire()

  print("%s拿到叉子了"%name)

  print("%s拿到面了"%name)

  print("%s 在吃麪"%name)

  time.sleep(0.5)

  lock.release()

li = ["alex","egon","wusir","taibai"]

for i in li:

  Thread(target=eat1,args=(i,)).start()

  Thread(target=eat2,args=(i,)).start()

死鎖 :

多把鎖同時應用在多個線程中

互斥鎖和遞歸鎖哪一個好?

遞歸鎖 : 快速回復服務

死鎖問題的出現,是程序的設計或者邏輯的問題

還應該進一步的排除和重構邏輯來保證使用互斥鎖也不會發生死鎖

互斥鎖和遞歸鎖的區別 :

互斥鎖 : 就是在一個線程中不能連續屢次acquire

遞歸鎖 : 能夠在同一個線程中acquire任意次,注意acquire多少次就須要release多少次

信號量 和 進程池

信號量就是 : 鎖 + 計數器

import time

from multiprocessing import Semaphore,Process,Pool

def ktv(sem,i):

  sem.acquire()

  i += 1

  sem.release()

def ktv1(i):

  i += 1

if __name__ == "__main__":

  start = time.time()

  li = []

  sem  =Semaphore(5)

  for i in  range(100)

    p = Process(targe=ktv,args=(sem,i))

    p.start()

    li.append(p)

  for i in li:

    i.join()

  print(time.time()-start)

  start  = time.time()

  p = Pool(5)

  for i in range(100):

    p.apply_async(func=ktv1,args=(i,))

  p.close()

  p.join()

  print(time.time()-start)

進程池和信號量:

進程池 : 進程池裏有幾個進程就起幾個,無論多少任務,池子裏的進程個數是固定的,開啓進程和關閉進程這些事都是須要固定的時間開銷,就不產生額外的時間開銷.且進程池中的進程數控制的好,那麼操做系統的壓力也小.

信號量 :

有多少個任務就起多少個進程/線程

能夠幫助減小操做系統切換負擔

可是並不能減小進/線程開啓和關閉的時間

數據庫鏈接 :

import time,random

from threading import Thread,Event

def check(e):

  print("正在檢測兩臺機器之間的網絡狀況...")

  time.sleep(random.randint(1,3))

  e.set()

def connect_db(e):

  e.wait()

  print("鏈接數據庫..")

  print("鏈接數據庫成功....")

 

e = Event()

Thread(target=check,args=(e,)).start()

Thread(target=connect_db,args=(e,)).start()

import time,random

from threading import Thread,Event

def check(e):

  print("正在檢測兩臺機器之間的網絡狀況...")

  time.sleep(random.randint(0,2))

  e.set()

def connect_db(e):

  n = 0

  while n<3:

    if e.is_set():

      break

    else:

      e.wait(0.5)

      n += 1

  if n == 3:

    raise TimeoutError

  print("鏈接數據庫....")

  print("鏈接數據庫成功...")

e = Event()

Thread(target=check,args=(e,)).start()

Thread(target=connect_db,args=(e,)).start()

條件 :

from threading import Condition

經常使用方法:

acquire  ,  release  , wait  阻塞 ,  notify (讓wait解除阻塞的工具)

wait仍是notify在執行這兩個方法的先後,必須執行acquire和release

from threading import Condition ,Thread

def func(con,i):

  con.acquire()

  con.wait()

  print("thread:",i)

  con.release()

con = Condition()

for i in range(20):

  Thread(target=func,args=(con,i)).start()

con.acquire()

con.notify_all()  #幫助wait的子線程處理某個數據直到知足條件

con.release()

while 1:

  num = int(input("<<<"))

  con.acquire()

  con.notify(num)

  con.release()

定時器 :

from threading import Thread , Timer

def func():

  print("我執行了")

Timer(1,func).start()  #定時器

建立線程的時候,就規定他多久以後去執行

相關文章
相關標籤/搜索