併發編程【五】線程

線程

 互斥鎖
互斥鎖:在同一個進程內,也有鎖的競爭關係
在同一個進程中連續acquire屢次會產生死鎖
複製代碼
from multiprocessing import Lock
"""
互斥鎖:在同一個進程內,也有鎖的競爭關係
在同一個進程中連續acquire屢次會產生死鎖
"""
lock = Lock()
lock.acquire()  # 拿走鑰匙
print(123)
lock.acquire()  # 又想拿鑰匙 卡住
print(456)


lock = Lock()
lock.acquire()  # 拿走鑰匙
print(123)
lock.release()  # 還鑰匙
lock.acquire()  # 第二我的才能拿到鑰匙,繼續操做
print(456)
複製代碼

Queue隊列

如何解決Queue中,往隊列裏放的值超出設定值的阻塞問題,及put_nowait數據丟失問題
複製代碼
import queue
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(1)
q.put(1)
q.put(1)
q.put(1)
print('阻塞前')
try:
    q.put_nowait(1) # 放不進去 了,數據丟失
except queue.Full:  # 想要數據不丟失,本身建立個容器,在except中將數據添加到容器中
    pass
print('阻塞後')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
try:
    q.get_nowait()  # 沒有值的話就不會在等了
except queue.Empty:
    pass
複製代碼

 進程和線程的關係

 線程與進程的區別能夠概括爲如下4點:
  1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。
  2)通訊: 進程間通訊 IPC,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊——須要 進程同步和互斥手段的輔助,以保證數據的一致性。
  3)調度和切換:線程上下文切換比進程上下文切換要快得多。
  4)在多線程操做系統中,進程不是一個可執行的實體。

線程的建立

線程的建立方式和進程的建立方式大體相同html

建立線程用threading模塊python

複製代碼
Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
複製代碼
import os
import time
from threading import Thread

# 建立線程
def func(i):
    time.sleep(0.2)
    print('in func', i ,os.getpid())
print('in main',os.getpid())
for i in range(20):
    Thread(target=func,args=(i,)).start()
"""
開的20個線程是在這個進程內的因此他們的進程id就是本程序的進程id
if __name__ == '__main__'在開啓線程的時候能夠不加
    在線程部分不須要經過import來爲新的線程獲取代碼
    由於新的線程和以前的主線程共享同一段代碼
    不須要import 也就不存在在子線程中又重複了一次建立線程的操做
    因此就沒必要要if __name__ == '__main__'
"""

線程的建立
線程的建立
from threading import Thread
def func():
    print('這裏是線程')
Thread(target=func).start()  # 建立一個線程
建立進程
from threading import Thread
def func():
    print('這裏是線程')
for i in range(100):
    Thread(target=func).start()  # 建立一個線程
建立多線程
from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主線程/主進程pid',os.getpid())

    #part2:開多個進程,每一個進程都有不一樣的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主線程/主進程pid',os.getpid())

多線程與多進程pid比較
多線程與多進程pid比較
開銷 開啓進程\線程與銷燬進程\線程
import time
from multiprocessing import Process
from threading import Thread

def func(a):
    a += 1

if __name__ == '__main__':
    start = time.time()
    t_lis = []
    for i in range(100):
        t = Thread(target=func, args=(i,))
        t.start()
        t_lis.append(t)
    for j in t_lis: j.join()
    print('threading', time.time() - start)  # 計算線程的開啓與銷燬時間threading 0.01236867904663086

    p_lis = []
    for i in range(100):
        p = Process(target=func,args=(i,))
        p.start()
        p_lis.append(p)
    for j in p_lis:j.join()
    print('process:',time.time()-start)  # 計算進程的開啓與銷燬時間process: 1.4895079135894775

進程和線程的開啓效率
進程和線程的開啓效率
多個線程之間的全局變量是共享的
from threading import Thread
pn = 0
def func():
    global pn
    pn+=1
t_l = []
for i in range(100):
    t = Thread(target=func) # 每一個線程都去加1,數據共享
    t.start()
    t_l.append(t)
for j in t_l:j.join()
print(pn)   # 100

進程之間數據隔離
from multiprocessing import Process
pn = 0
def func():
    global pn
    pn+=1
if __name__ == '__main__':
    t_l = []
    for i in range(100):
        t = Process(target=func) # 根本操做不到子進程
        t.start()
        t_l.append(t)
    for j in t_l:j.join()
    print(pn)   # 0

線程與進程內存數據的共享問題
線程與進程內存數據的共享問題
import time
import random
from threading import Thread,currentThread,active_count
def func():
    time.sleep(random.random())
    print(currentThread().name)
for i in range(10):
    Thread(target=func).start()
print(active_count())            # 返回當前有多少個正在工做的線程
print(currentThread().name) # 查看當前線程的變量




for i in range(10):
    t = Thread(target=func)
    t.start()
    print(t)   # <Thread(Thread-1, started 11544)>

print(currentThread())  # <Thread(Thread-1, started 11544)>

他倆是同樣的效果

threading的其餘用法
import time
import random
from threading import Thread,currentThread,active_count
def func():
    time.sleep(random.random())
    print(currentThread().name)
for i in range(10):
    Thread(target=func).start()
print(active_count())            # 返回當前有多少個正在工做的線程
print(currentThread().name) # 查看當前線程的變量




for i in range(10):
    t = Thread(target=func)
    t.start()
    print(t)   # <Thread(Thread-1, started 11544)>

print(currentThread())  # <Thread(Thread-1, started 11544)>

他倆是同樣的效果

threading的其餘用法

按照順序把列表中的每個元素都計算平方,使用多線程方式,而且結果按照順序返回程序員

import time
import random
from threading import Thread,currentThread
dic = {}  # {15020: 64, 11284: 25, 8488: 100, 4464: 9, 11488: 36, 14152: 4, 12068: 16, 8080: 81, 8496: 1, 5788: 49}
def func(a):
    p = a * a
    time.sleep(random.random())
    t = currentThread()
    dic[t.ident]= p    # {15020: 64}
lst = []  # [<Thread(Thread-1, stopped 8496)>, <Thread(Thread-2, stopped 14152)>, <Thread(Thread-3, stopped 4464)>,
for i in range(1,11):
    t = Thread(target=func,args=(i,))
    t.start()
    lst.append(t)  # 將線程對象添加到列表中 t.ident查看id
for j in lst:
    j.join()
    print(dic[j.ident]) # 取值的時候根據你添加時候的線程id取值

from threading import Thread
def func():
    print('這裏是線程')
for i in range(100):
    Thread(target=func).start()  # 建立一個線程

按順序返回結果
按順序返回結果

發佈了任務不表明當即執行,執行任務須要操做系統幫咱們調度,由於時間片輪轉何時操做系統不忙了或者輪到某個線程了纔會執行相應的任務;安全

全局解釋器鎖GIL

Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中能夠「運行」多個線程,但在任意時刻只有一個線程在解釋器中運行。
  對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。多線程

  在多線程環境中,Python 虛擬機按如下方式執行:併發

  a、設置 GIL;app

  b、切換到一個線程去運行;dom

  c、運行指定數量的字節碼指令或者線程主動讓出控制(能夠調用 time.sleep(0));異步

  d、把線程設置爲睡眠狀態;socket

  e、解鎖 GIL;

  d、再次重複以上全部步驟。
  在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束爲止(因爲在這期間沒有Python的字節碼被運行,因此不會作線程切換)編寫擴展的程序員能夠主動解鎖GIL。

 

消除GIL反作用的兩種方案:
1.使用多進程,讓多個核心都工做
2.使用其餘語言編寫的解釋器,此時使用多線程,也能運行在多個核心上.

守護線程

不管是進程仍是線程,都遵循:守護xx會等待主xx運行完畢後被銷燬。須要強調的是:運行完畢並不是終止運行

#1.對主進程來講,運行完畢指的是主進程代碼運行完畢
#2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,
#2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
import time
from threading import Thread
def daemon_func():  # 主進程會等待子線程的結束而結束
    while True:
        time.sleep(0.2)
        print('子線程')
Thread(target=daemon_func).start()

主進程會等待子線程的結束而結束
主進程會等待子線程的結束而結束
import time
from threading import Thread
def daemon_func():  # 守護線程會等主線程結束而結束
    while True:
        time.sleep(1)
        print('守護線程')
t = Thread(target=daemon_func)
t.daemon = True
t.start()
time.sleep(5)
print('主線程結束')

守護線程會等主線程結束而結束
守護線程會等主線程結束而結束
mport time
from threading import Thread
def daemon_func():
    while True:
        time.sleep(1)
        print('守護線程')
def son_func():
    print('start son')
    time.sleep(1)
    print('end son')
t = Thread(target=daemon_func)
t.daemon = True
t.start()
Thread(target=son_func).start()
time.sleep(5)
print('主線程結束')

守護線程會守護主線程和全部的子線程
守護線程會守護主線程和全部子線程

進程會隨着主線程的結束而結束

複製代碼
問題:
     1.主線程需不須要回收子線程的資源
         不須要,線程資源屬於進程,因此進程結束了,線程的資源天然就被回收了
     2.主線程爲何要等待子線程結束以後才結束
         主線程結束意味着進程結束,進程結束,全部的子線程都會結束
         要想讓子線程可以順利執行完,主線程只能等
     3.守護線程究竟是怎麼結束的
         主線程結束了,主進程也結束,守護線程被主進程的結束結束掉了

守護進程 :只會守護到主進程的代碼結束
守護線程 :會守護全部其餘非守護線程的結束
複製代碼

GIL和鎖的關係

複製代碼
# 數據不安全問題
# 在線程中也是會出現數據不安全的
    # 1.對全局變量進行修改
    # 2.對某個值 += -= *= /=  list[0] += 1  dic[key] -= 1
# 經過加鎖來解決

# list pop append extend insert remove   安全
# dict pop update    安全

# list pop/append pop列表爲空的時候會報錯
# queue put/get get隊列爲空的時候會等待

複製代碼
import time
from threading import Thread,Lock
count = 0
def add_func():
    global count
    for i in range(100000):
        count += 1
def sub_func():
    global count
    time.sleep(0.2)
    for i in range(100000):
        count -= 1
lis = []
for i in range(5):
    t = Thread(target=add_func)  # 開5個線程作10萬加一操做
    t.start()
    lis.append(t)
    t1 = Thread(target=sub_func) # 開5個線程作10萬減一操做
    t1.start()
    lis.append(t1)
for j in lis:j.join()
print(count)

數據不安全
數據不安全
import time
from threading import Thread,Lock
count = 0
def add_func(lock):
    global count
    for i in range(100000):
        with lock:
            count += 1
def sub_func(lock):
    global count
    time.sleep(0.1)
    for i in range(100000):
        with lock:
            count -= 1
lis = []
lock = Lock()
for i in range(5):
    t = Thread(target=add_func,args=(lock,))  # 開5個線程作10萬加一操做
    t.start()
    lis.append(t)
    t1 = Thread(target=sub_func,args=(lock,)) # 開5個線程作10萬減一操做
    t1.start()
    lis.append(t1)
for j in lis:j.join()
print(count)

加鎖後計算
加鎖後計算

爲何有了GIL鎖還要給線程加鎖?

由於GIL鎖只能保證原子型操做的數據安全(append),對於那些可拆分(a+=1)的來講修改數據仍是不安全;因此須要咱們本身加鎖來保證數據安全:

遞歸鎖

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

死鎖
死鎖

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

遞歸鎖
遞歸鎖

典型問題:科學家吃麪問題

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

死鎖
死鎖
import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

遞歸鎖解決死鎖問題

遞歸鎖解決死鎖問題
遞歸鎖解決死鎖問題

線程隊列

queue隊列 :使用import queue,用法與進程Queue同樣

class queue.Queue(maxsize=0) #先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

先進先出
先進先出

class queue.LifoQueue(maxsize=0) #last in fisrt ou

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

後進先出
後進先出

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

優先級隊列
優先級隊列

進程/線程池

#1 介紹
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
異步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操做

#shutdown(wait=True) 
至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回調函數

# done()
判斷某一個線程是否完成

# cancle()
取消某個任務

介紹
介紹
rom concurrent.futures import ProcessPoolExecutor

def get_html(i):
    return i**2

def parser_page(n):
    print('進程將值傳給了我%s'%n.result()) # 10個任務異步執行,誰先執行完就將結果傳給我

if __name__ == '__main__':
    p = ProcessPoolExecutor(5)        # 進程池中放幾個進程,獲得進程池對象
    ret = p.map(get_html,range(10))     # 綁定方法,提交任務,返回一個可迭代對象(生成器)
    for i in ret:
        print(i)

    # -------------------------------------------------------
    lis = []
    for i in range(10):
        ret = p.submit(get_html, i)  # 綁定方法,提交任務,接收到任務對象
        lis.append(ret)
    p.shutdown()
    for j in lis:
        print(j.result())  # 爲了防止阻塞咱們將全部的任務對象都添加到列表裏

    # -------------------------------------------------------
    for i in range(10):
        ret = p.submit(get_html,i)    # 綁定方法,提交任務,接收到任務對象
        print(ret.result())           # 獲取任務函數的返回值,(阻塞方法)
        ret.add_done_callback(parser_page)  # 回調函數,每執行完一個任務都執行一次回調函數(ret傳給parser_page)
    p.shutdown()                      # 等待進程池中全部的進程都結束完畢結束阻塞(阻塞方法)

建立一個進程池
建立一個進程池
ret = p.map(get_html,range(10))   # 綁定方法,提交任務,返回一個可迭代對象(生成器)
# 池對象的方法:
        submit()   # 綁定方法,提交任務,接收到任務對象
        shutdown() # 等待進程池中全部的進程都結束完畢結束阻塞(阻塞方法)
# 任務對象的方法:
        result()   # 獲取任務函數的返回值,(阻塞方法)
        add_done_callback() # 回調函數,每執行完一個任務都執行一次回調函數(都將任務結果傳給回調函數)
View Code
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def get_html(name,addr):
    ret = urlopen(addr)
    return {'name':name,'content':ret.read()}

def parser_page(ret_obj):
    dic = ret_obj.result()
    with open(dic['name']+'.html','wb') as f:
        f.write(dic['content'])

url_lst = {
    '協程':'http://www.cnblogs.com/Eva-J/articles/8324673.html',
    '線程':'http://www.cnblogs.com/Eva-J/articles/8306047.html',
    '目錄':'https://www.cnblogs.com/Eva-J/p/7277026.html',
    '百度':'http://www.baidu.com',
    'sogou':'http://www.sogou.com'
}

t = ThreadPoolExecutor(20)
for url in url_lst:
    task = t.submit(get_html,url,url_lst[url])

# 使用多線程去執行get_html獲取網頁對應的內容
# 一旦get_html執行結束以後,當即使用parser_page來分析獲取的頁面結果
    task.add_done_callback(parser_page)

# 進程池 除非高計算型的場景不然幾乎不用  CPU的個數*2
# 線程池 cpu的個數*5
# 4 cpu 4*20 = 80

建立一個線程池
建立一個線程池

線程實現socket server併發

import socket
from threading import Thread
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()

def func(conn):
    while True:
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.send(b'hello')
while True:
    conn,addr = sk.accept()
    t = Thread(target=func,args=(conn,))
    t.start()

server
server
相關文章
相關標籤/搜索