方法形式建立線程python
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')
類形式建立線程(必須內含有 run方法 以及繼承 Thread)mysql
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
join 方法實例sql
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主線程') print(t.is_alive()) ''' egon say hello 主線程 False '''
守護線程實例數據庫
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
用法和場景同 進程 編程
import threading R=threading.Lock() R.acquire() ''' 對公共數據的操做 ''' R.release()
死鎖實例多線程
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
科學家吃麪死鎖問題app
import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
解決方式:重入鎖dom
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
信號量 Semaphore 舉例異步
from threading import Thread,Semaphore,currentThread import time,random sm = Semaphore(5) #運行的時候有5我的 def task(): sm.acquire() print('\033[42m %s上廁所'%currentThread().getName()) time.sleep(random.randint(1,3)) print('\033[31m %s上完廁所走了'%currentThread().getName()) sm.release() if __name__ == '__main__': for i in range(20): #開了10個線程 ,這20人都要上廁所 t = Thread(target=task) t.start()
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。async
若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。
爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。
在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。
一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。
若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
簡單來講:
當存在多個線程之間有這狀態依附時,使用 Event
Event 對象的 初始 狀態值 爲 False 當 isSet 判斷狀態時就一直 wait 阻塞
當達到需求的狀態時,經過 set 將狀態置 True (或者經過 clear 恢復成 False)
爲 True 時才能夠正確下面的代碼執行
from threading import Event Event.isSet() #返回event的狀態值 Event.wait() #若是 event.isSet()==False將阻塞線程; Event.set() #設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; Event.clear() #恢復
# 首先定義兩個函數,一個是鏈接數據庫 # 一個是檢測數據庫 from threading import Thread, Event, currentThread import time e = Event() def conn_mysql(): '''連接數據庫''' count = 1 while not e.is_set(): # 當沒有檢測到時候 is_set 爲 False if count > 3: # 若是嘗試次數大於3,就主動拋異常 raise ConnectionError('嘗試連接的次數過多') print('\033[45m%s 第%s次嘗試' % (currentThread(), count)) e.wait(timeout=1) # 等待檢測(裏面的參數是超時1秒) count += 1 print('\033[44m%s 開始連接...' % (currentThread().getName())) def check_mysql(): '''檢測數據庫''' print('\033[42m%s 檢測mysql...' % (currentThread().getName())) time.sleep(5) e.set() # 檢測成功後設置爲 True if __name__ == '__main__': for i in range(3): # 三個去連接 t = Thread(target=conn_mysql) t.start() t = Thread(target=check_mysql) t.start()
from threading import Thread,Event,currentThread import time e = Event() def traffic_lights(): '''紅綠燈''' time.sleep(5) e.set() def car(): '''車''' print('\033[42m %s 等綠燈\033[0m'%currentThread().getName()) e.wait() print('\033[44m %s 車開始通行' % currentThread().getName()) if __name__ == '__main__': for i in range(10): t = Thread(target=car) #10輛車 t.start() traffic_thread = Thread(target=traffic_lights) #一個紅綠燈 traffic_thread.start()
from threading import Timer def func(n): print('hello,world',n) t = Timer(3,func,args=(123,)) #等待三秒後執行func函數,由於func函數有參數,那就再傳一個參數進去 t.start()
在Cpython解釋器中,由於有GIL鎖的存在同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點。
同一時刻同一進程中只有一個線程被執行
from concurrent.futures import ThreadPoolExecutor from threading import currentThread import os,time,random def task(n): print('%s:%s is running'%(currentThread().getName(),os.getpid())) #看到的pid都是同樣的,由於線程是共享了一個進程 time.sleep(random.randint(1,3)) #I/O密集型的,,通常用線程,用了進程耗時長 return n**2 if __name__ == '__main__': start = time.time() p = ThreadPoolExecutor() #線程池 #若是不給定值,默認cup*5 l = [] for i in range(10): #10個任務 # 線程池效率高了 obj = p.submit(task,i) #至關於apply_async異步方法 l.append(obj) p.shutdown() #默認有個參數wite=True (至關於close和join) print('='*30) print([obj.result() for obj in l]) print(time.time() - start) #3.001171827316284 """ ThreadPoolExecutor-0_0:12816 is running ThreadPoolExecutor-0_1:12816 is running ThreadPoolExecutor-0_2:12816 is running ThreadPoolExecutor-0_3:12816 is running ThreadPoolExecutor-0_4:12816 is running ThreadPoolExecutor-0_5:12816 is running ThreadPoolExecutor-0_6:12816 is running ThreadPoolExecutor-0_7:12816 is running ThreadPoolExecutor-0_8:12816 is running ThreadPoolExecutor-0_9:12816 is running ============================== [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 3.0191218852996826 """
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import requests import time,os def get_page(url): print('<%s> is getting [%s]'%(os.getpid(),url)) response = requests.get(url) if response.status_code==200: return {'url':url,'text':response.text} def parse_page(res): res = res.result() # 須要用 result 拿到對象結果 print('<%s> is getting [%s]'%(os.getpid(),res['url'])) with open('db.txt','a') as f: parse_res = 'url:%s size:%s\n'%(res['url'],len(res['text'])) f.write(parse_res) if __name__ == '__main__': # p = ThreadPoolExecutor() p = ProcessPoolExecutor() l = [ 'http://www.baidu.com', 'http://www.baidu.com', 'http://www.baidu.com', 'http://www.baidu.com', ] for url in l: res = p.submit(get_page,url).add_done_callback(parse_page) #這裏的回調函數拿到的是一個對象。 # 須要先把返回的res獲得一個結果。即在前面加上一個res.result() ,誰好了誰去掉用回調函數 # 回調函數也是一種編程思想。不只開線程池用,開線程池也用 p.shutdown() #至關於進程池裏的close和join print('主',os.getpid())
map 也能夠回調函數,因此功能上能夠替換
# 咱們的那個p.submit(task,i)和map函數的原理相似。咱們就 # 能夠用map函數去代替。更減縮了代碼 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random def task(n): print('[%s] is running'%os.getpid()) time.sleep(random.randint(1,3)) #I/O密集型的,,通常用線程,用了進程耗時長 return n**2 if __name__ == '__main__': p = ProcessPoolExecutor() obj = p.map(task,range(10)) p.shutdown() #至關於close和join方法 print('='*30) print(obj) #返回的是一個迭代器 print(list(obj)) # map函數應用