併發編程之多線程

1、什麼是多線程?
進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位。
多線程:指在一個進程中存在多個線程,多個線程共享該進程的地址空間。html

2、線程和進程的區別
1,線程共享建立它的進程的地址空間;進程有本身的地址空間。
2,線程能夠直接訪問其進程的數據段;進程有本身的父進程數據段的副本。
3,線程能夠直接與進程的其餘線程通訊;進程必須使用進程間通訊來與同胞進程通訊。
4,新線程很容易建立;新進程須要父進程的重複。
5,線程能夠對相同進程的線程進行至關大的控制;進程只能對子進程進行控制。
6,對主線程的更改(取消、優先級更改等)可能會影響進程的其餘線程的行爲;對父進程的更改不會影響子進程。python

所以咱們在特定的場景下須要使用多線程:
1.同一個進程內的多個線程共享該進程內的地址資源
2.建立線程的開銷要遠小於建立進程的開銷(建立一個進程,就是建立一個車間,涉及到申請空間,並且在該空間內建至少一條流水線,但建立線程,就只是在一個車間內造一條流水線,無需申請空間,因此建立開銷小)mysql

3、多線程應用舉例
開啓一個字處理軟件進程,該進程確定須要辦不止一件事情,好比監聽鍵盤輸入,處理文字,定時自動將文字保存到硬盤,這三個任務操做的都是同一塊數據,於是不能用多進程。只能在一個進程裏併發地開啓三個線程,若是是單線程,那就只能是,鍵盤輸入時,不能處理文字和自動保存,自動保存時又不能輸入和處理文字。
git

4、threading模塊介紹
multiprocess模塊模仿了threading模塊的接口;使用層面上很是類似github

5、開啓線程的兩種方式
方式一:
import time
import random
from threading import Threadweb

def study(name):
print("%s is learning" % name)
time.sleep(random.randint(1, 3))
print("%s is done" % name)sql

if name == 'main':
t = Thread(target=study,args=('james',))
t.start()
print("主線程開始運行....")json

方式二:
import time
import random
from threading import Thread安全

class MyThread(Thread):
def init(self, name):
super().__init__()
self.name = name多線程

def run(self):
    print("%s is learning" % self.name)
    time.sleep(random.randint(1, 3))
    print("%s is done" % self.name)

if name == 'main':
t = MyThread('james')
t.start()
print("主線程開始運行....")

6、基於多線程實現併發的套接字通訊
客戶端

_*_ coding: utf-8 _*_
from socket import *

ip_port = ('127.0.0.1',9999)
client = socket(AF_INET,SOCK_STREAM)
client.connect(ip_port)

while True:
cmd = input(">>>").strip()
if not cmd:
continue
client.send(cmd.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()

服務端
import multiprocessing
import threading
import socket

ip_port = ('127.0.0.1',9999)
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(ip_port)
s.listen(5)

def action(conn):
while True:
data = conn.recv(1024)
print(data)
conn.send(data.upper())
if name == 'main':
while True:
conn,addr = s.accept()

p = threading.Thread(target=action,args=(conn,))
    p.start()

7、多線程與多進程的區別
進程要申請內存空間,開進程的開銷遠遠大於開線程

在主進程下開啓線程
import time
import random
from multiprocessing import Process
from threading import Thread

def study(name):
print("%s is learning"%name)
time.sleep(random.randint(1,3))
print("%s is playing" % name)

if name == 'main':
t = Process(target=study,args=('james',))
t.start()
print("主進程程開始運行....")

執行結果:在t.start()的同時將線程開啓了

在主進程下開啓子進程
import time
import random
from multiprocessing import Process
from threading import Thread

def study(name):
print("%s is learning"%name)
time.sleep(random.randint(1,3))
print("%s is playing" % name)

if name == 'main':

t = Thread(target=study,args=('james',))
t.start()
print("主線程開始運行....")

執行結果:p.start()將開啓進程的信號發給操做系統後,操做系統要申請內存空間,讓拷貝好的父進程地址空間到子進程

查看PID
在主線程下開啓多個線程,每一個線程都跟主線程的pid同樣(線程共享主進程的pid)
from threading import Thread
import os

def work():
print('hello',os.getpid())

if name == 'main':
t1=Thread(target=work)
t2=Thread(target=work)
t1.start()
t2.start()
print('主線程/主進程pid',os.getpid())

開多個進程,每一個進程都有不一樣的pid
from multiprocessing import Process
import os

def work():
print('hello',os.getpid())

if name == 'main':
p1=Process(target=work)
p2=Process(target=work)
p1.start()
p2.start()
print('主線程/主進程',os.getpid())

同一進程內的線程共享該進程的數據
進程之間地址空間是隔離的
from multiprocessing import Process
import os

def work():
global n
n=0

if name == 'main':
n=100
p=Process(target=work)
p.start()
p.join()
print('主',n)

執行結果:子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100

from threading import Thread
import os

def work():
global n
n=0

if name == 'main':
n=100
t=Thread(target=work)
t.start()
t.join()
print('主',n)

執行結果:查看結果爲0,由於同一進程內的線程之間共享進程內的數據

8、Thread對象的其餘屬性或方法
介紹
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。

threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、
結束前,不包括啓動前和終止後的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

from threading import Thread
from threading import current_thread
import time

def task():
print("%s is running"%current_thread().getName())
time.sleep(1)
print("%s is done" % current_thread().getName())

if name =='main':
#沒有子線程這個概念,只是爲了理解方便
t = Thread(target=task,name='子線程1')
t.start()
t.setName('兒子線程1')
print("主線程 %s" % current_thread().getName())
#主線程 MainThread
#子線程1 is running
#兒子線程1 is done

主線程等待子線程結束
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)

if name == 'main':
t=Thread(target=sayhi,args=('james',))
t.start()
t.join()
print('主線程')
print(t.is_alive())
'''
james say hello
主線程
False
'''

9、守護線程
1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束。
2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。

from threading import Thread
import time

def task(name):
time.sleep(1)
print("%s is working"%name)

if name == 'main':
t = Thread(target=task,args=('james',))
#t.setDaemon(True)#必須在t,start()以前設置,和t.daemon是同樣的做用
t.daemon =True
t.start()
print("主線程")
print(t.is_alive())

互斥鎖:與進程的互斥鎖相似

mutex

from threading import Thread,Lock
import time

n=100

def task():
global n
mutex.acquire()
temp=n
time.sleep(0.1)
n=temp-1
mutex.release()

if name == 'main':
mutex=Lock()
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start()

for t in t_l:
    t.join()

print('主',n)

10、GIL全局解釋器鎖
GIL本質就是一把互斥鎖,既然是互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。
每次執行python程序,都會產生一個獨立的進程。

GIL與Lock的區別
鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據
保護不一樣的數據就應該加不一樣的鎖
GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的,後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock。

from threading import Thread,Lock
import os,time
def work():
global n
lock.acquire()
temp=n
time.sleep(0.1)
n=temp-1
lock.release()
if name == 'main':
lock=Lock()
n=100
l=[]
for i in range(100):
p=Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()

print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全,不加鎖則結果可能爲99

GIL與多線程
有了GIL的存在,同一時刻同一進程中只有一個線程被執行
一、對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用(CPU密集型代碼)
二、固然對運行一個程序來講,隨着cpu的增多執行效率確定會有所提升(IO密集型代碼)

在python多線程下,每一個線程的執行方式:
1.獲取GIL
2.執行代碼直到sleep或者是python虛擬機將其掛起
3.釋放GIL

每次釋放GIL鎖,線程進行鎖競爭,切換線程,會消耗資源。而且因爲GIL鎖存在,python裏一個進程永遠只能同時執行一個線程(拿到GIL的線程才能執行),這就是爲何在多核CPU上,python的多線程效率並不高的緣由。

多線程性能測試
若是併發的多個任務是計算密集型:多進程效率高
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i

if name == 'main':
l=[]
print(os.cpu_count()) #本機爲4核
start=time.time()
for i in range(4):
p=Process(target=work) #耗時5s多
p=Thread(target=work) #耗時18s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))

若是併發的多個任務是I/O密集型:多線程效率高
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
time.sleep(2)
print('===>')

if name == 'main':
l=[]
print(os.cpu_count()) #本機爲4核
start=time.time()
for i in range(400):
# p=Process(target=work) #耗時12s多,大部分時間耗費在建立進程上
p=Thread(target=work) #耗時2s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))

應用:多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

11、死鎖現象與遞歸鎖
死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('\033[41m%s 拿到A鎖\033[0m' %self.name)

mutexB.acquire()
    print('\033[42m%s 拿到B鎖\033[0m' %self.name)
    mutexB.release()

    mutexA.release()

def func2(self):
    mutexB.acquire()
    print('\033[43m%s 拿到B鎖\033[0m' %self.name)
    time.sleep(2)

    mutexA.acquire()
    print('\033[44m%s 拿到A鎖\033[0m' %self.name)
    mutexA.release()

    mutexB.release()

if name == 'main':
for i in range(10):
t=MyThread()
t.start()

遞歸鎖
遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖,兩者的區別是:遞歸鎖能夠連續acquire屢次,而互斥鎖只能acquire一次

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('\033[41m%s 拿到A鎖\033[0m' %self.name)

mutexB.acquire()
    print('\033[42m%s 拿到B鎖\033[0m' %self.name)
    mutexB.release()

    mutexA.release()

def func2(self):
    mutexB.acquire()
    print('\033[43m%s 拿到B鎖\033[0m' %self.name)
    time.sleep(2)

    mutexA.acquire()
    print('\033[44m%s 拿到A鎖\033[0m' %self.name)
    mutexA.release()

    mutexB.release()

if name == 'main':
for i in range(10):
t=MyThread()
t.start()

12、信號量,Event,定時器
信號量
信號量也是一把鎖,能夠指定信號量爲5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間能夠有5個任務拿到鎖去執行,若是說互斥鎖是合租房屋的人去搶一個廁所,那麼信號量就至關於一羣路人爭搶公共廁所,公共廁全部多個坑位,這意味着同一時間能夠有多我的上公共廁所,但公共廁所容納的人數是必定的,這即是信號量的大小

from threading import Thread,Semaphore
import threading
import time

def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(3)
sm.release()

if name == 'main':
sm=Semaphore(5)
for i in range(23):
t=Thread(target=func)
t.start()

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

Event
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

from threading import Event
event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
count=1
while not event.is_set():
if count > 3:
raise TimeoutError('連接超時')
print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count))
event.wait(0.5)
count+=1
print('<%s>連接成功' %threading.current_thread().getName())

def check_mysql():
print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
time.sleep(random.randint(2,4))
event.set()
if name == 'main':
event=Event()
conn1=Thread(target=conn_mysql)
conn2=Thread(target=conn_mysql)
check=Thread(target=check_mysql)

conn1.start()
conn2.start()
check.start()

定時器
定時器,指定n秒後執行某操做
from threading import Timer

def hello():
print("hello, world")

t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed

十3、線程queue
線程queue有三種不一樣的用法
1,class queue.Queue(maxsize=0) #隊列:先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
2,class queue.LifoQueue(maxsize=0) #堆棧:last in fisrt out
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

3,class queue.PriorityQueue(maxsize=0) #優先級隊列:存儲數據時可設置優先級的隊列
import queue

q=queue.PriorityQueue()

put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高

q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())

十4、進程池與線程池
官網:https://docs.python.org/dev/library/concurrent.futures.html
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

基本方法
一、submit(fn, *args, **kwargs) 異步提交任務
二、map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做
三、shutdown(wait=True) 至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前
四、result(timeout=None) 取得結果
五、add_done_callback(fn) 回調函數

進程池
用法:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2

if name == 'main':

executor=ProcessPoolExecutor(max_workers=3)

futures=[]
for i in range(11):
    future=executor.submit(task,i)
    futures.append(future)
executor.shutdown(True)
print('+++>')
for future in futures:
    print(future.result())

線程池
用法
把ProcessPoolExecutor換成ThreadPoolExecutor,其他用法所有相同

map方法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2

if name == 'main':

executor=ThreadPoolExecutor(max_workers=3)

# for i in range(11):
#     future=executor.submit(task,i)

executor.map(task,range(1,12)) #map取代了for+submit

回調函數
能夠爲進程池或線程池內的每一個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢後自動觸發,並接收任務的返回值看成參數,該函數稱爲回調函數

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
print('<進程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}

def parse_page(res):
res=res.result()
print('<進程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)

if name == 'main':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]

p=ProcessPoolExecutor(3)
for url in urls:
    p.submit(get_page,url).add_done_callback(parse_page)

#parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果

相關文章
相關標籤/搜索