[TOC]python
程序:例如XXXX.py這是程序,處於靜態的。數據庫
進程:一個程序運行起來後,代碼+用到的資源稱之爲進程,它是操做系統分配資源的基本單元。緩存
在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;
在當代面向線程設計的計算機結構中,進程是線程的容器。bash
所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。
所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列。多線程
阻塞和非阻塞跟同步和異步無關,主要與程序等待消息通知時的狀態有關。也就是說阻塞與非阻塞主要是從程序(線程)等待消息通知時的狀態角度來說的。併發
1)並行,parallel 強調同一時刻同時執行
2)併發 concurrency 則指的一個時間段內去一塊兒執行app
就緒態:運行的條件都已經慢去,正在等在cpu執行
執行態:cpu正在執行其功能
等待態:等待某些條件知足,例如一個程序sleep了,此時就處於等待態dom
multiprocessing模塊就是跨平臺版本的多進程模塊,提供了一個Process類來表明一個進程對象,這個對象能夠理解爲是一個獨立的進程,能夠執行另外的事情異步
from multiprocessing import Process import time def run_process(): while True: print("子進程----2----") time.sleep(1) if __name__=='__main__': p = Process(target=run_process) # target指定目標函數 p.start() while True: print("主進程----1----") time.sleep(1)
Process語法:
Process([group [, target [, name [, args [, kwargs]]]]])async
參數--------------------------
target:若是傳遞了函數的引用,能夠任務這個子進程就執行這裏的代碼
args:給target指定的函數傳遞的參數,以元組的方式傳遞
kwargs:給target指定的函數傳遞命名參數
name:給進程設定一個名字,能夠不設定
group:指定進程組,大多數狀況下用不到
Process建立的實例對象的經常使用方法:
方法--------------------------
start():啓動子進程實例(建立子進程)
is_alive():判斷進程子進程是否還在活着
join([timeout]):是否等待子進程執行結束,或等待多少秒
terminate():無論任務是否完成,當即終止子進程
Process建立的實例對象的經常使用屬性:
屬性-------------------------
name:當前進程的別名,默認爲Process-N,N爲從1開始遞增的整數
pid:當前進程的pid(進程號)
from multiprocessing import Process import time import os def run_process(): while True: print("子進程----pid:{}----".format(os.getpid())) print() time.sleep(1) if __name__=='__main__': p = Process(target=run_process) p.start() while True: print("主進程----pid:{}----".format(os.getpid())) time.sleep(1)
from multiprocessing import Process import time import os def run_process(course, teacher, *args, **kwargs): while True: print("子進程----pid:{}----{}上{}課".format(os.getpid(), teacher, course)) print() time.sleep(1) if __name__=='__main__': p = Process(target=run_process, args=('語文',), kwargs={'teacher':'張三'}) p.start() while True: print("主進程----pid:{}----{}上{}課".format(os.getpid(),'李四','數學')) time.sleep(1)
from multiprocessing import Process import time import os num_list = [0 , 1, 3, 4, 5, 6, 7, 8, 9, 10] i = 3 def run_process1(): global i while i: print("子進程----pid:{}----".format(os.getpid())) num_list.pop() print(num_list) i = i - 1 time.sleep(1) def run_process2(): global i while i: print("子進程----pid:{}----".format(os.getpid())) num_list.append(i+1) print(num_list) i = i - 1 time.sleep(1) if __name__=='__main__': p = Process(target=run_process1) p.start() p = Process(target=run_process2) p.start()
輸出
子進程----pid:10187---- [0, 1, 3, 4, 5, 6, 7, 8, 9] 子進程----pid:10188---- [0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4] 子進程----pid:10187---- [0, 1, 3, 4, 5, 6, 7, 8] 子進程----pid:10188---- [0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3] 子進程----pid:10187---- [0, 1, 3, 4, 5, 6, 7] 子進程----pid:10188---- [0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3, 2]
可使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue自己是一個消息列隊程序。
示例
from multiprocessing import Process, Queue import time, random def worker(q): """完成文件""" for i in range(10): file_num = random.randint(1, 100) print('已完成工做{}...'.format(file_num)) q.put(file_num) time.sleep(1) def boss(q): """查看文件""" while True: if not q.empty(): file_num = q.get(True) print('已查看工做{}...'.format(file_num)) time.sleep(1) else: break if __name__=='__main__': # 建立Queue,並傳給各個子進程: q = Queue(5) pw = Process(target=worker, args=(q,)) pb = Process(target=boss, args=(q,)) pw.start() # pw.join() pb.start() # pb.join()
當須要建立的子進程數量很少時,能夠直接利用multiprocessing中的Process動態成生多個進程,但若是是上百甚至上千個目標,手動的去建立進程的工做量巨大,此時就能夠用multiprocessing模塊提供的Pool方法。
初始化Pool時,能夠指定一個最大進程數,當有新的請求提交到Pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,纔會用以前的進程來執行新的任務。
示例
from multiprocessing import Pool import os, time, random def worker(msg): t_start = time.time() print("start pro {},pid爲{}".format(msg,os.getpid())) # random.random()隨機生成0~1之間的浮點數 time.sleep(random.random()*2) t_stop = time.time() print(msg,"執行完畢,耗時{:.2f}" .format(t_stop-t_start)) # 定義一個進程池,最大進程數3 po = Pool(5) for i in range(0,10): # Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,)) # 每次循環將會用空閒出來的子進程去調用目標 po.apply_async(worker,(i,)) print("----start----") time.sleep(10) po.close() # 關閉進程池,關閉後po再也不接收新的請求 po.join() # 等待po中全部子進程執行完成,必須放在close語句以後 print("-----end-----")
使用Pool建立進程,就須要使用multiprocessing.Manager()中的Queue()
示例
from multiprocessing import Pool, Manager import time, random def worker(q): """完成文件""" for i in range(10): file_num = random.randint(1, 100) print('已完成工做{}...'.format(file_num)) q.put(file_num) time.sleep(1) def boss(q): """查看文件""" while True: if not q.empty(): file_num = q.get(True) print('已查看工做{}...'.format(file_num)) time.sleep(1) else: break if __name__=='__main__': # 建立Queue,並傳給各個子進程: q = Manager().Queue() po = Pool() po.apply_async(worker, (q,)) time.sleep(1) # 先讓上面的任務向Queue存入數據,而後再讓下面的任務開始從中取數據 po.apply_async(boss, (q,)) po.close() po.join()
如今操做系統提出進程的概念,每個進程都認爲本身獨佔全部的計算機硬件資源。
進程就是獨立的王國,進程間不能夠隨便的共享數據。
線程就是省份,同一個進程內的線程能夠共享進程的資源,每個線程都擁有本身獨立的堆棧。
線程一樣有着相似進程的狀態
1)運行態:該時刻,該線程正在佔用CPU
2)就緒態:可隨時轉換爲運行態,由於其餘線程正在運行而暫停,該進程不佔用CPU
3)阻塞態:除非某些外部事件發生,不然線程不能運行
Python線程的操做可使用threading模塊,threading模塊是對更底層thread作了一些包裝的,能夠更加方便的被使用。
Thread類:
def __init__(self, group=None, target=Nonoe, name=None, args=(), kwargs=None, daemon=None)
target 線程調用的對象,就是目標函數
name 爲線程起個名字(能夠重名,由於線程區分靠ID,不靠名字)
args,爲目標函數傳遞實參,元組
kwargs, 爲目標函數關鍵字傳參,字典
threading的屬性和方法
current_thread() 返回當前線程對象
main_thread() 返回主線程對象
active_count() 當前處於alive狀態的線程個數
enumerate() 返回全部活着的線程的列表,不包括已經終止的和未開始的線程
get_ident() 返回當前線程的ID,非0整數
Thread實例的屬性和方法
name 只是一個名字
ident 線程ID
is_alive() 返回線程是否活着
start() 啓動線程,每個線程必須且只能執行該方法一次
run() 運行線程函數
import threading import time def worker(): for _ in range(10): time.sleep(0.5) print('start') print(threading.get_ident()) # 返回當前線程對象線程id print('Thread over') t = threading.Thread(target=worker) t.start()
import threading import time def finish_working(): for i in range(5): print("線程:{} --完成工做加{}".format(threading.currentThread(), i)) print(threading.current_thread()) time.sleep(1) if __name__ == "__main__": for i in range(5): t = threading.Thread(target=finish_working, name=str(i)) t.start() #啓動線程,即讓線程開始執行
經過使用threading模塊能完成多任務的程序開發,爲了讓每一個線程的封裝性更完美,因此使用threading模塊時,每每會定義一個新的子類class,只要繼承threading.Thread就能夠了,而後重寫run方法。
import threading import time class MyThread(threading.Thread): def run(self): print('run') super().run() def start(self): print('start') super().start() def worker1(): for _ in range(5): time.sleep(0.5) print('線程:{}-woring'.format(threading.currentThread())) print('Thread over') t = MyThread(target=worker1,name='w') t.start()
import time count = 100 def work1(): global count for i in range(3): count += 1 print("----in work1, g_num is {}---".format(count)) def work2(): global count print("----in work2, g_num is {}---".format(count)) print("---線程建立以前g_num is {}---".format(count)) t1 = Thread(target=work1) t1.start() #延時一會,保證t1線程中的事情作完 time.sleep(1) t2 = Thread(target=work2) t2.start()
輸出
---線程建立以前g_num is 100--- ----in work1, g_num is 103--- ----in work2, g_num is 103---
線程同步,線程間協同,經過某種技術,讓一個線程訪問某些數據時,其餘線程不能訪問這些數據,直到該線程完成對數據的操做。
Event事件,是線程間通訊機制中最簡單的實現,使用一個內部的標記flag,經過flag的True或False的變化來進行操做。
名稱 | 含義 |
---|---|
set() | 標記設置爲True |
clear() | 標記設置爲False |
is_set() | 標記是否設置爲True |
wait(timeout=None) | 設置等待標記爲True的時長,None爲無限等待。獲得返回True, 未等到超時了返回False。 |
from threading import Event, Thread import time def boss(event:Event): """ 等待員工全部任務完成,點評 """ print("I'm boss, waiting for u.") event.wait() print('good job') def worker(event:Event, count=10): print("I am working for u") cups = [] while True: print('make 1') time.sleep(0.5) cups.append(1) if len(cups) >= count: event.set() break print('I finished my job. cups={}'.format(cups)) event = Event() w = Thread(target=worker, args=(event, )) b = Thread(target=boss, args=(event, )) w.start() time.sleep(1) b.start(
總結:
使用同一個event對象的標記flag
誰wait就是等到flag變爲True,或者等到超時返回False,不限制等待的個數。
鎖,凡是存在共享資源爭搶的地方均可以使用鎖,從而保證只有一個使用者均可以徹底使用這個資源。
示例 不加鎖:
import threading cups = [] def worker(task=100): while True: count = len(cups) print(count) if count >= task: break cups.append(1) print('{}'.format(threading.current_thread())) print('I finished {} cups'.format(count)) for x in range(10): threading.Thread(target=worker, args=(100, )).start()
以上任務完成的數量會大於100,使用鎖能夠解決
示例
import logging import threading logging.basicConfig(level=logging.INFO) cups = [] # 實例一把鎖 lock = threading.Lock() def worker(lock:threading.Lock,task=100): while True: lock.acquire() # 加鎖 count = len(cups) logging.info(count) if count >= task: lock.release() #記得退出循環時釋放鎖 break cups.append(1) lock.release() # 釋放鎖 logging.info('{}'.format(threading.current_thread())) logging.info('I finished {} cups'.format(count)) for x in range(10): threading.Thread(target=worker, args=(lock, 100)).start()
通常來講加鎖後還有一些代碼實現,在釋放鎖以前還有可能拋異常,一旦出現異常,鎖是沒法釋放,可是當前線程可能由於這個異常被終止了,這就產生了死鎖。
加鎖、解鎖經常使用語句:
1)使用try...finally語句保證鎖的釋放
2)with上下文管理,鎖對象支持上下文管理
示例:
from threading import Thread, Lock import time, logging logging.basicConfig(level=logging.INFO) class Counter: def __init__(self): self.c = 0 self.lock = Lock() def inc(self): try: self.lock.acquire() self.c += 1 logging.info('add {}'.format(self.c)) finally: self.lock.release() def dec(self): try: self.lock.acquire() self.c -= 1 logging.info('sub {}'.format(self.c)) finally: self.lock.release() @property def value(self): with self.lock: return self.c def do(c:Counter, count=100): for _ in range(count): for i in range(-50, 50): if i < 0: c.dec() else: c.inc() c = Counter() c1 = 10 c2 = 10 for i in range(c1): Thread(target=do, args=(c,c2)).start() time.sleep(5) logging.info(c.value)
Condition 用於生產者、消費模型,爲了解決生產者消費速度匹配問題。
構造方法Condition(lock=None), 能夠傳入一個lock或者RLock對象,默認是RLock。
名稱 | 含義 |
---|---|
acquire(*args) | 獲取鎖 |
wait(self, timoout=None) | 等待或超時間 |
notify(n=1) | 喚醒至多指定個數的等待的線程,沒有等待的線程沒有操做 |
notiy_all() | 喚醒全部等待的線程 |
示例1 不使用Condition
import threading import logging import random logging.basicConfig(level=logging.INFO) class Dispatcher: def __init__(self, data=0): self.data = data self.event = threading.Event() def produce(self): for i in range(100): data = random.randint(1,100) self.data = data logging.info("produce--{}".format(self.data)) self.event.wait(1) def custom(self): while True: logging.info("curstom---{}".format(self.data)) self.event.wait(1) d = Dispatcher() p = threading.Thread(target=d.produce) c = threading.Thread(target=d.custom) c.start() p.start()
示例2 使用Condition
import threading import logging import random logging.basicConfig(level=logging.INFO) class Dispatcher: def __init__(self, data=0): self.data = data self.event = threading.Event() self.cond = threading.Condition() def produce(self): for i in range(100): data = random.randint(1,100) with self.cond: self.data = data self.cond.notify_all() logging.info('produce {}'.format(self.data)) self.event.wait(1) def custom(self): while True: with self.cond: self.cond.wait() logging.info('custom {}'.format(self.data)) self.event.wait(0.5) d = Dispatcher() p = threading.Thread(target=d.produce) c = threading.Thread(target=d.custom) c.start() p.start()
示例3 多個消費者
import threading import logging import random logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s') class Dispatcher: def __init__(self, data=0): self.data = data self.event = threading.Event() self.cond = threading.Condition() def produce(self): for i in range(100): data = random.randint(1,100) with self.cond: self.data = data self.cond.notify(1) logging.info('pru {}'.format(self.data)) self.event.wait(1) def custom(self): while True: with self.cond: self.cond.wait() logging.info("線程{}--消費{}".format(threading.get_ident(), self.data)) self.event.wait(0.5) d = Dispatcher() p = threading.Thread(target=d.produce) c = threading.Thread(target=d.custom) c1 = threading.Thread(target=d.custom) c.start() c1.start() p.start()
總結:
Condition是用於生產者消費者模型中,解決生產者消費者速度匹配的問題。
採用通知機制,很是有效率。
使用方式:
使用Condition必須先acquire,用玩release,由於內部使用鎖,默認使用RLock,最好的方式是使用with上下文。
消費者wait,等待通知。
生產者生產好消息,對消費者發通知,可使用notify或者notify_all方法。
名稱 | 含義 |
---|---|
Barrier(parties, action=None, timeout=None) | 構建Barrier對象,指定參與方數目。timeout是wait方法未指定超時的默認值。 |
n_waiting | 當前在屏障中等待的線程數 |
parties | 各方數,就是須要多少個等待 |
wait(timeout=None) | 等待經過屏障,返回0到[線程數-1]的整數,每一個線程返回不一樣。若是wait方法設置了超時,並超時發送,屏障將處於broken狀態。 |
方法:
名稱 | 含義 |
---|---|
broken | 若是屏障處於打破狀態,返回True |
abort() | 將屏障至於broken狀態,等待中的線程或者調用等待方法的線程都會拋出BrokenBarrierError異常, 直到reset方法來恢復屏障 |
reset() | 恢復屏障,從新開始攔截 |
示例
import threading import logging logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s') def worker(barrier:threading.Barrier): logging.info('n_waiting={}'.format(barrier.n_waiting)) try: bid = barrier.wait() logging.info("after barrier {}".format(bid)) except threading.BrokenBarrierError: logging.info('Broken Barrier in {}'.format(threading.current_thread().name)) barrier = threading.Barrier(3) for i in range(5): #調整數字看結果 threading.Thread(target=worker, args=(barrier, )).start()
全部線程衝到了Barrier前等待,直到到達parties的數目,屏障打開,全部線程中止等待,繼續執行。
再有線程wait,屏障就緒等到到達參數方數目。
Barrier應用場景:併發初始化全部線程都必須初始化完成後,才能繼續工做,例如運行前加載數據、檢查,若是這些工做沒完成,就開始運行,將不能正常工做。10個線程作10種準備工做,每一個線程負責一種工做,只有這10個線程都完成後,才能繼續工做,先完成的要等待後完成的線程。例如,啓動一個程序,須要先加載磁盤文件、緩存預熱、初始化鏈接池等工做。這些工做能夠齊頭並進,不過只有都知足了,程序才能繼續向後執行。假設數據庫鏈接失敗,則初始化工做失敗,就要about,屏障broken,全部線程收到異常退出。