網絡編程和併發之多線程編程

多線程threadinghtml

線程與進程的區別能夠概括爲如下4點:
  1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。
  2)通訊: 進程間通訊 IPC,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊——須要 進程同步和互斥手段的輔助,以保證數據的一致性。
  3)調度和切換:線程上下文切換比進程上下文切換要快得多。
  4)在多線程操做系統中,進程不是一個可執行的實體。

 

1.threading模塊python

multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性mysql

1.1模塊的建立git

 1 from threading import Thread  2 import time  3 def sayhi(name):  4     time.sleep(2)  5     print('%s say hello' %name)  6 
 7 if __name__ == '__main__':  8     t=Thread(target=sayhi,args=('egon',))  9  t.start() 10     print('主線程')
建立線程的方式1
 1 from threading import Thread  2 import time  3 class Sayhi(Thread):  4     def __init__(self,name):  5         super().__init__()  6         self.name=name  7     def run(self):  8         time.sleep(2)  9         print('%s say hello' % self.name) 10 
11 
12 if __name__ == '__main__': 13     t = Sayhi('egon') 14  t.start() 15     print('主線程')
建立線程的方式2

1.2多線程與多進程比較程序員

 1 from threading import Thread  2 from multiprocessing import Process  3 import os  4 
 5 def work():  6     print('hello',os.getpid())  7 
 8 if __name__ == '__main__':  9     #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
10     t1=Thread(target=work) 11     t2=Thread(target=work) 12  t1.start() 13  t2.start() 14     print('主線程/主進程pid',os.getpid()) 15 
16     #part2:開多個進程,每一個進程都有不一樣的pid
17     p1=Process(target=work) 18     p2=Process(target=work) 19  p1.start() 20  p2.start() 21     print('主線程/主進程pid',os.getpid())
pid的比較
 1 from threading import Thread  2 from multiprocessing import Process  3 import os  4 
 5 def work():  6     print('hello')  7 
 8 if __name__ == '__main__':  9     #在主進程下開啓線程
10     t=Thread(target=work) 11  t.start() 12     print('主線程/主進程') 13     '''
14  打印結果: 15  hello 16  主線程/主進程 17     '''
18 
19     #在主進程下開啓子進程
20     t=Process(target=work) 21  t.start() 22     print('主線程/主進程') 23     '''
24  打印結果: 25  主線程/主進程 26  hello 27     '''
開啓效率的較量
 1 from  threading import Thread  2 from multiprocessing import Process  3 import os  4 def work():  5     global n  6     n=0  7 
 8 if __name__ == '__main__':  9     # n=100
10     # p=Process(target=work)
11     # p.start()
12     # p.join()
13     # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100
14 
15 
16     n=1
17     t=Thread(target=work) 18  t.start() 19  t.join() 20     print('',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據
21 同一進程內的線程共享該進程的數據?
內存數據的共享問題

1.3Thread類的其餘方法github

 

Thread實例對象的方法 # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。
 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進程下開啓線程
    t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count()) print('主線程/主進程') ''' 打印結果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主線程/主進程 Thread-1 '''
代碼示例
 1 from threading import Thread  2 import time  3 def sayhi(name):  4     time.sleep(2)  5     print('%s say hello' %name)  6 
 7 if __name__ == '__main__':  8     t=Thread(target=sayhi,args=('egon',))  9  t.start() 10  t.join() 11     print('主線程') 12     print(t.is_alive()) 13     '''
14  egon say hello 15  主線程 16  False 17     '''
join方法

1.4守護線程sql

不管是進程仍是線程,都遵循:守護xx會等待主xx運行完畢後被銷燬。須要強調的是:運行完畢並不是終止運行json

#1.對主進程來講,運行完畢指的是主進程代碼運行完畢 #2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束, #2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
詳細解釋
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置
 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
守護線程例1
 1 from threading import Thread  2 import time  3 def foo():  4     print(123)  5     time.sleep(1)  6     print("end123")  7 
 8 def bar():  9     print(456) 10     time.sleep(3) 11     print("end456") 12 
13 
14 t1=Thread(target=foo) 15 t2=Thread(target=bar) 16 
17 t1.daemon=True 18 t1.start() 19 t2.start() 20 print("main-------")
守護線程例2

 

2.鎖安全

2.1GIL鎖服務器

Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中能夠「運行」多個線程,但在任意時刻只有一個線程在解釋器中運行。
  對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。

  在多線程環境中,Python 虛擬機按如下方式執行:

  a、設置 GIL;

  b、切換到一個線程去運行;

  c、運行指定數量的字節碼指令或者線程主動讓出控制(能夠調用 time.sleep(0));

  d、把線程設置爲睡眠狀態;

  e、解鎖 GIL;

  d、再次重複以上全部步驟。
  在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束爲止(因爲在這期間沒有Python的字節碼被運行,因此不會作線程切換)編寫擴展的程序員能夠主動解鎖GIL。

2.2同步鎖

from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1
if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果可能爲99
多個線程搶佔資源的狀況
import threading R=threading.Lock() R.acquire() ''' 對公共數據的操做 ''' R.release()
 1 from threading import Thread,Lock  2 import os,time  3 def work():  4     global n  5  lock.acquire()  6     temp=n  7     time.sleep(0.1)  8     n=temp-1
 9  lock.release() 10 if __name__ == '__main__': 11     lock=Lock() 12     n=100
13     l=[] 14     for i in range(100): 15         p=Thread(target=work) 16  l.append(p) 17  p.start() 18     for p in l: 19  p.join() 20 
21     print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
同步鎖的引用
#不加鎖:併發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1


if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''


#不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行
    time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行
 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''

#有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊 #沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是 #start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的 #單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1


if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 '''
互斥鎖與join的區別

2.3死鎖

進程也有死鎖與遞歸鎖,在進程那裏忘記說了,放到這裏一切說了額

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
死鎖

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
遞歸鎖RLock

典型問題:科學家吃麪

import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
死鎖問題
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
遞歸鎖解決死鎖問題

 

3.信號量

同進程的同樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ' get semaphore') # time.sleep(2) # sm.release()
def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
實例
池與信號量:與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

 

4.事件

同進程的同樣

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。

 例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做

import threading import time,random from threading import Thread,Event def conn_mysql(): count=1
    while not event.is_set(): if count > 3: raise TimeoutError('連接超時') print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1
    print('<%s>連接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
實例

 

5.線程隊列

queue隊列 :使用import queue,用法與進程Queue同樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

 class queue.Queue(maxsize=0) #先進先出

import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
先進先出

class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
後進先出

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
優先級隊列

 

6.Python標準模塊--concurrent.futures

https://docs.python.org/dev/library/concurrent.futures.html

#1 介紹
concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs)
異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操做 #shutdown(wait=True) 
至關於進程池的pool.close()+pool.join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 #result(timeout=None)
取得結果 #add_done_callback(fn)
回調函數
#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None) An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised. #用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2

if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
ProcessPoolExecutor
#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously. class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='') An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor. New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. #用法
與ProcessPoolExecutor相同
ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2

if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11):
    # future=executor.submit(task,i)
 executor.map(task,range(1,12)) #map取代了for+submit
map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3)
    # for url in urls:
    # p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()
 p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
回調函數
相關文章
相關標籤/搜索