進程是cpu資源分配的最小單位,進程是正在進行的一個過程或者說一個任務。
線程是cpu調度的最小單位。
協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。(單線程下的併發)
進程線程區別:python
子進程結束後,子進程的狀態信息仍然保存在系統中,直到父進程結束。(全部進程都會經歷殭屍進程)mysql
父進程已經退出,但它的一個或多個子進程還在運行, 這些進程就會成爲孤兒進程,由init進程收養。sql
守護進程會在主進程代碼執行結束後就終止。安全
方式一:多線程
import time from multiprocessing import Process def task(name): # 子進程執行的任務 print('%s is running' % name) time.sleep(3) print('%s is done' % name) if __name__ == '__main__': # 實例化獲得對象 p1 = Process(target=task, args=('子進程1',)) # 必須加逗號 p2 = Process(target=task, kwargs={'name': '子進程2'}) # 調用對象下的方法,開啓進程 p1.start() # 僅是給操做系統發送信號, 由操做系統開啓子進程 p2.start() print('主')
方式二:併發
import time from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): # 其實是start,可是必須叫run print('%s is running' % self.name) time.sleep(3) print('%s is done' % self.name) if __name__ == '__main__': p1 = MyProcess('子進程1') p2 = MyProcess('子進程2') p1.start() # 啓動進程,並調用該子進程中的p.run() p2.start() print('主')
進程名:print(p.name)
進程PID: print(os.getpid)
print(p.pid)
父進程PID:print(os.getppid)
app
等待進程運行結束:p.join()
查看進程是否在運行:print(p.is_alive())
終止進程:p.terminate()
進程結束時間由操做系統決定。
終止進程但不進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程;若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖。
開啓守護進程:p.daemon = True
必須在p.start(
)以前設置,守護進程將被禁止建立子進程。dom
例子:異步
from multiprocessing import Process, Lock import time def task(name, lock): lock.acquire() print('%s 1' % name) time.sleep(1) print('%s 2' % name) time.sleep(1) print('%s 3' % name) lock.release() if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=task, args=('進程%s' % i, lock)) p.start()
例子:socket
from multiprocessing import Queue que = Queue(3) que.put(1) que.put({'a': 'hello'}) que.put([1, 2, 3]) print(que.full()) # 判斷隊列是否已滿 # que.put(4) #再放就阻塞住了 print(que.get()) print(que.get()) print(que.get()) print(que.empty()) # 隊列是否空了 # print(que.get()) # 再取就阻塞住了
例子:
from multiprocessing import Process, JoinableQueue import time import random def producer(q, name, food): for i in range(2): res = '%s%s' % (food, i) time.sleep(random.randint(1, 3)) print('\033[34m%s 生產了 %s\033[0m' % (name, res)) q.put(res) # 入隊 q.join() # 等到消費者把隊列中的全部的數據都取走以後,生產者才結束 def consumer(q, name): while True: res = q.get() # 出隊 time.sleep(random.randint(1, 3)) print('\033[32m%s 吃 %s\033[0m' % (name, res)) q.task_done() # 發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢了 if __name__ == '__main__': que = JoinableQueue() # 3個生產者 p1 = Process(target=producer, args=(que, '生產者1', '包子')) p2 = Process(target=producer, args=(que, '生產者2', '餃子')) p3 = Process(target=producer, args=(que, '生產者3', '月餅')) # 2個消費者 c1 = Process(target=consumer, args=(que, '消費者1')) c2 = Process(target=consumer, args=(que, '消費者2')) c1.daemon = True # 將消費者設爲守護進程 c2.daemon = True # 消費者在生產者結束後, 隨主進程一塊兒結束 l1 = [p1, p2, p3] l2 = [c1, c2] # 開始生產 for p in l1: p.start() # 開始消費 for c in l2: c.start() # 主進程等生產者p一、p二、p3結束 # 而p一、p二、p3在消費者把全部數據都取乾淨以後結束 for p in l1: p.join() print('主')
使用greenlet
庫:
from greenlet import greenlet def eat(name): print('%s eat 1' % name) g2.switch('egon') # 傳入第一次參數, 以後不用再傳 print('%s eat 2' % name) g2.switch() def play(name): print('%s play 1' % name) g1.switch() print('%s play 2' % name) g1 = greenlet(eat) g2 = greenlet(play) g1.switch('egon') # 在第一次switch時傳入參數,之後都不須要
使用gevent
庫 (沒法識別socket
time
等模塊的阻塞,須要使用gevent自帶的阻塞):
import gevent def eat(name): print('%s eat 1' % name) gevent.sleep(2) # gevent識別到阻塞,進行切換 print('%s eat 2' % name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1 = gevent.spawn(eat, 'egon') g2 = gevent.spawn(play, name='egon') g1.join() g2.join() # gevent.joinall([g1,g2]) # 等待列表中全部協程對象結束 print('主')
導入monkey
可識別socket
time
等阻塞:
from gevent import monkey; monkey.patch_all() # patch_all()必須放在導入socket、time等模塊前,不然gevent沒法識別socket、time的阻塞 import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1 = gevent.spawn(eat) g2 = gevent.spawn(play) gevent.joinall([g1, g2]) print('主')
導入模塊:
from concurrent.futures import ProcessPoolExecutor # 進程池 from concurrent.futures import ThreadPoolExecutor # 線程池
建立進程/線程池:
executor = ProcessPoolExecutor(max_worker=3) # 進程池 executor = ThreadPoolExecutor(max_workers=3) #線程池
將進程/線程放入池內:
future = exector.submit(task, parm) """ executor.map(task, range(1,12)) 至關於: for i in range(11): exector.submit(task, i) """
關閉進程/線程池:
exector.shutdown() # 默認wait參數爲True
wait=True
等待池內全部任務執行完畢回收完資源後再執行後續代碼。
wait=False
不join,直接執行後續代碼。
關閉進程/線程池後,不容許再向已關閉的進程/線程池內加入進程/線程。
拿到進程/線程運行結果:
print(future.result())
回調函數:
future.add_done_callback(func)
: future
的task
結束後,會自動把future對象當作參數傳給回調函數func。
例子:
from concurrent.futures import ProcessPoolExecutor import time import random def task(name): print("%s is running" % name) time.sleep(random.randint(3, 5)) return name def func(future): name = future.result() print("%s's callback function" % name) if __name__ == '__main__': executor = ProcessPoolExecutor(5) futures = [] for i in range(3): future = executor.submit(task, "task"+str(i)) future.add_done_callback(func) futures.append(future) executor.shutdown(True) print("主")
阻塞程度:阻塞IO>非阻塞IO>多路轉接IO>信號驅動IO>異步IO,效率是由低到高。
信號量也是一把鎖,但同一時間能夠被指定大小的任務獲取。
模塊導入:
from threading import Semaphore
設置計數器大小:
sm = Semaphore(value=1) # 計數器大小默認爲1
兩個主要的方法:
acquire() # 內置計數器-1, 當計數器爲0時阻塞,等待其餘線程調用release() release() # 內置計數器+1
例子:
from threading import Thread, Semaphore import threading import time def func(): sm.acquire() print('%s get sm' % threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm = Semaphore(5) for _ in range(23): t = Thread(target=func) t.start()
Event對象:
用於線程間通訊,即程序中的其一個線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,就用到了Event對象。
模塊導入:
from threading import Event
建立Event對象:
event = Event()
相關方法:
event.isSet() # 返回event的狀態值 event.wait([maxtime]) # 若是 event.isSet() == False將阻塞線程, [maxtime]-超時時間 event.set() # 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度 event.clear() # 恢復event的狀態值爲False
例子:
from threading import Thread, Event import threading import time import random def conn_mysql(): count = 1 while not event.is_set(): if count > 3: print('\033[31m[%s]鏈接失敗\033[0m' % threading.current_thread().getName()) exit(0) print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count)) event.wait(1) count += 1 print('<%s>鏈接成功' % threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(1, 4)) event.set() if __name__ == '__main__': event = Event() conn1 = Thread(target=conn_mysql) conn2 = Thread(target=conn_mysql) check = Thread(target=check_mysql) conn1.start() conn2.start() check.start()
定時器Timer:指定n秒後執行某項操做(不會像sleep同樣阻塞)
例子:
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) // 一秒後執行hello函數 t.start()
死鎖現象: 指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,這種永遠在互相等待的進程稱爲死鎖進程。
遞歸鎖: RLock
能夠連續acquire屢次。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
例子:
from threading import Thread, RLock import time mutexA = mutexB = RLock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' % self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' % self.name) mutexB.release() print('\033[41m%s 釋放A鎖\033[0m' % self.name) mutexA.release() print('\033[42m%s 釋放B鎖\033[0m' % self.name) def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' % self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' % self.name) mutexA.release() print('\033[43m%s 釋放B鎖\033[0m' % self.name) mutexB.release() print('\033[44m%s 釋放A鎖\033[0m' % self.name) if __name__ == '__main__': for _ in range(4): t = MyThread() t.start()
cpython中引進GIL,保證同一時刻同一進程中只有一個線程被執行,獲取鎖並獲取資源,避免了多線程併發執行,保證了線程的安全,但沒法使用多核優點。
結論:
多線程用於IO密集型
多進程用於計算密集型