線程

##########################線程#################################
進程是自願分配的最小單位,線程是CPU調度的最小單位,每個進程中至少有一個線程
線程屬於進程,進程負責獲取操做系統分配給個人資源,線程負責執行代碼

進程的弊端:
開啓和關閉 以及 切換 都會帶來很大的時間開銷
過多的進程還會形成操做系統調度的壓力

進程和線程有什麼關係?
1)地址空間和其餘自願(如打開文件):進程間相互獨立,同一進程的各線程間共享.某進程內的線程在其餘進程不可見
2)通訊: 進程間通訊IPC,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊---須要進程同步和互斥手段的輔助
以保證數據的一致性
3)調度和切換:線程上下文切換比進程上下文切換要快得多
4)在多線程操做系統中,進程不是一個可執行的實體

線程的概念
進程是操做系統中最小的資源分配單位
線程是CPU調度的最小單位
線程進程之間的對比
線程不能獨立存在,必須在一個進程裏
線程的開啓 關閉以及切換的開銷要遠遠小於進程
同一個進程之間的多個線程之間數據共享
全局解釋器鎖GIL
使得一個進程中的多個線程不能充分的利用多核


從代碼的角度上來看
多進程
開啓和結束 時間開銷大
切換的效率低
內存隔離
多線程
開啓和結束 時間開銷很是小
切換效率高
內存不隔離

線程的特色:
1)輕型實體 2) 獨立調度和分派的基本單位 3)共享進程資源 4)可併發執行

Cputhon解釋器下的全局解釋器鎖
在同一進程中的多個線程在同一時刻只能有一個線程訪問CPU
多線程沒法造成並行
鎖的線程

Jpython 解釋器就沒有全局解釋器鎖

何時纔會有到CPU
程序計算的時候

IO阻塞
是不會用到CPU的
在多個進程\線程同時訪問一個數據的時候就會產生數據不安全的現象
多進程 訪問文件
多線程
同時去訪問一個數據
GIL 全局解釋器鎖
在同一個進程裏的每個線程同一時間只能有一個線程訪問cpu
儘可能不要設置全局變量
只要在多線程/進程之間用到全局變量 就加上鎖

from threading import Lock,Thread
死鎖
lock = Lock()
lock.acquire()
lock.acquire()
noodle = 100
def func(name,lock):
global noodle
lock.acquire()
noodle -= 1
lock.release()
print('%s吃到面了'%name)

if __name__ == '__main__':
lock = Lock() # 線程鎖 互斥鎖
t_lst = []
for i in range(10):
t = Thread(target=func,args=(i,lock))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(noodle)

科學家吃麪問題
import time
from threading import Thread,Lock
lock = Lock()
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print('%s拿到了面' % name)
fork_lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
fork_lock.release() # 0.01
noodle_lock.release() # 0.01

def eat2(name):
fork_lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
noodle_lock.acquire()
print('%s拿到了面' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
noodle_lock.release()
fork_lock.release()

eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()


遞歸鎖
from threading import RLock
rlock = RLock()
rlock.acquire()
print(1)
rlock.acquire()
print(2)
rlock.acquire()
print(3)

遞歸鎖解決死鎖問題
import time
from threading import Thread,RLock
lock = RLock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release() # 0.01
lock.release() # 0.01

def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
lock.acquire()
print('%s拿到了面' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release()
lock.release()

eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()

互斥鎖解決死鎖問題
import time
from threading import Thread,Lock
lock = Lock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
print('%s拿到了叉子' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release() # 0.01

def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
print('%s拿到了面' % name)
print('%s在吃麪'%name)
time.sleep(0.5)
lock.release()

eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8個子線程 7個線程 3個線程eat1,4個線程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()

死鎖
多把鎖同時應用在多個線程中
互斥鎖和遞歸鎖哪一個好
遞歸鎖 快速恢復服務
死鎖問題的出現 是程序的設計或者邏輯的問題
還應該進一步的排除和重構邏輯來保證使用互斥鎖也不會發生死鎖
互斥鎖和遞歸鎖的區別
互斥鎖 就是在一個線程中不能連續屢次ACQUIRE
遞歸鎖 能夠在同一個線程中acquire任意次,注意acquire多少次就須要release多少次

#####################threading模塊 thread類###############
Thread類的其餘方法
Thread實例對象的方法
    # isAlive(): 返回線程是否活動的
    # getName(): 返回線程名
    #setName() :  設置線程名

threading 模塊提供的一些方法:
    #threading.currentThread():返回當前的線程變量
    #threading.enumerate(): 返回一個包含正在運行的線程的list,正在運行指線程啓動後,結束前,不包括啓動前和終止後的線程.
    #threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果

 




##信號量###
鎖+計時器
和進程的同樣, Semaphore 管理一個內置的計數器,
每當調用acquire()時 內置計數器 -1;
調用release()時 內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()

池和信號量
import time
from multiprocessing import Semaphore,Process,Pool
def ktv1(sem,i):
sem.acquire()
i +=1
sem.release()

def ktv2(i):
i +=1

if __name__=='__main__':
sem = Semaphore(5)
start = time.time()
p_1 =[]
for i in range(100):
p = Process(target=ktv1,args=(sem,i))
p.start()
p_1.append(p)
for p in p_1:p.join()
print('###信號量',time.time() - start)


start = time.time()
p = Pool(5)
p_1 = []
for i in range(100):
ret = p.apply_async(func = ktv2,args=(sem,i))
p_1.append(ret)
p.close()
p.join()
print('***池',time.time() - start)

池 效率高
池子裏面有幾個一共就起幾個
無論多少任務,池子的個數是固定的
開啓進程和關閉進程這些事都是須要固定的開銷
就不產生額外的時間開銷
且進程池中的進程數控制的好,name操做系統的壓力也小

信號量
有多少個任務就起多少進程/線程
能夠幫助你減小操做系統切換的負擔
可是並不能幫助你減小進/線程開啓和關閉的時間


#####事件
和進程同樣
wait
等 到時間內部的信號變成True就不阻塞了
set
設置信號變成True
clear
設置信號變成False
is_get
查看信號是否爲True
#數據庫鏈接
import time
import random
from threading import Event,Thread
def check(e):
    """檢測一下數據庫的網絡和個人網絡是否通"""
    print('正在檢測兩臺機器之間的網絡狀況....')
    time.sleep(random.randint(1,3))
    e.set()

def connet_db(e):
    e.wait()
    print('鏈接數據庫...')
    print('鏈接數據庫成功~~~')

e = Event()
Thread(target=connet_db,args=(e,)).start()
Thread(target=check,args=(e,)).start()
實例(數據庫鏈接)

 

######條件
使得線程等待,只有知足某條件時,才釋放n個線程
Python提供的Condition對象提供了對複雜線程同步問題的支持。Condition被稱爲條件變量,
除了提供與Lock相似的acquire和 release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,而後判斷一些條件。
若是條件不知足則wait;若是 條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後
會從新判斷條件。不斷的重 復這一過程,從而解決複雜的同步問題。
import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" % n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
        print('****')

 

##定時器
定時器,指定n秒後執行某個操做
from threading import Timer
def func():
    print('執行我啦')

interval 時間間隔
Timer(0.2,func).start()  # 定時器
建立線程的時候,就規定它多久以後去執行

 ####線程隊列python

from multiprocessing import Queue,JoinableQueue       #進程IPC隊列
from queue import Queue #線程隊列,先進先出的
from queue import LifoQueue #後進先出的

隊列Queue
先進先出
自帶鎖, 數據安全
棧 LifoQueue
後進先出
自帶鎖,數據安全
lq = LifoQueue() ##裏面若是帶有數字, lq裏面的值的數量不能超過這個數字,不然它會一直等着
lq.put(123)
lq.put(456)
lq.put('abc')
lq.put('cba')
lq.put('ccc')
lq.put('aaa')
print(lq)
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get())


from queue import PriorityQueue # 優先級隊列
pq = PriorityQueue()
pq.put((10,'aaa')) ##裏面是元組(不是必須),判斷優先級,若是是字母根據ACCII碼來判斷大小
pq.put((5,'zzz'))
print(pq.get())
print(pq.get())

 

# #python標準模塊--concurrent.futures
#1 介紹
concurrent.futures  模塊提供了高度封裝的異步調用接口(高度封裝:省事)
        #進程池/線程池的同一使用方式
ThreadPoolExecutor: 線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用


#2,基本方法
#submit(fn,*args,**kwargs)
異步提交任務

#map(func,*iterables,timeout = None ,chunksize = 1)
取代for 循環 submit的操做

#shutdown(wait = True)
至關於進程池的 pool.close()+pool.join()操做
wait = True , 等待池內全部任務執行完畢回收完資源後才繼續
wait = False, 當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前

#result(timeout = None)
取得結果

#add_done_callback(fn)
回調函數

 

import time
from threading import currentThread,get_ident
from concurrent.futures import ThreadPoolExecutor     #幫助你啓動線程池的類
from concurrent.futures import ProcessPoolExecutor    #幫助你啓動線程池的類
def func(i):
    time.sleep(1)
    print('in %s %s'%(i,currentThread()))
    return i**2
t = ThreadPoolExecutor(5)
t.map(func,range(20))
for i in range(20):
        t.submit((func,i))
map啓動多線程任務

 

import time
from threading import currentThread,get_ident
from concurrent.futures import ThreadPoolExecutor     #幫助你啓動線程池的類
from concurrent.futures import ProcessPoolExecutor    #幫助你啓動線程池的類
def func(i):
    time.sleep(1)
    print('in %s %s'%(i,currentThread()))
    return i**2

def back(fn):
    print(fn.result(),currentThread())


獲取任務結果
t = ThreadPoolExecutor(20)
ret_l = []
for i in range(20):
    ret = t.submit(func,i)
    ret_l.append(ret)
t.shutdown()
for ret in ret_l:
    print(ret.result())
print('main : ',currentThread())
submit 異步提交任務

 

import os
import time
from concurrent.futures import ProcessPoolExecutor  # 幫助你啓動線程池的類

def func(i):
    time.sleep(1)
    print('in %s %s'%(i,os.getpid()))
    return i**2

def back(fn):
    print(fn.result(),os.getpid())
if __name__ == '__main__':
    print('main : ',os.getpid())
    t = ProcessPoolExecutor(20)
    for i in range(100):
        t.submit(func,i).add_done_callback(back)
回調函數
相關文章
相關標籤/搜索