進程本質上就是一段程序的運行過程,由程序、數據集、進程控制塊組成。每一個進程都有本身的地址空間、數據棧以及其餘用於跟蹤進程執行的輔助數據。操做系統管理全部的進程,併爲他們合理分配資源。
線程是進程中的執行單元,能夠共享進程中的資源。
進程之間是相互獨立的,因此進程是最小的資源單位。html
關於並行和併發python
並行:系統能同時處理多個任務
併發:系統能夠處理多個任務git
import threading
import os
def work(num1, num2, name, **kwargs):
print(num1, num2) # 12 123
print(name) # lczmx
print(kwargs) # {'age': 20}
print("pid:", os.getpid()) # pid: 12932
if __name__ == "__main__":
t1 = threading.Thread(target=work, args=(12, 123),
kwargs={"name": "lczmx", "age": 20})
t2 = threading.Thread(target=work, args=(1, 3),
kwargs={"name": "xxx", "age": 20})
t1.start() # 開始線程活動
t1.join() # 等待,直到線程終結
t2.start()
t2.join()
print("pid:", os.getpid()) # pid: 12932
定義一個類,繼承threading.Thread、重寫run方法也能夠
重寫__init__
方法的話要super.__init__()
github
import threading
class MyThread(threading.Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
pass
def func2(self):
pass
if __name__ == '__main__':
t = MyThread()
t.start()
t.join()
Threading對象的方法web
is_alive(): 返回線程是否活動的。
getName(): 返回線程名,也能夠在建立時經過name參數指定。
setName(): 設置線程名。算法
默認狀況下,主線程會等到全部子線程執行完以後纔會退出,但守護線程並不會。
守護線程就是跟隨主線程一塊兒結束的線程,守護線程經過setDaemon方法實現,其內部時設置daemon屬性,能夠被繼承,因此daemon默認爲False。編程
import threading
import time
def work(sleep_time=0.5):
time.sleep(sleep_time)
print("sleep time: ", sleep_time)
if __name__ == "__main__":
t1 = threading.Thread(target=work, args=(1,))
t1.setDaemon(True) # setDaemon要在start以前
t1.start()
print("exit")
注意如下例子segmentfault
import threading
import time
def work(sleep_time=0.5):
time.sleep(sleep_time)
print("sleep time: ", sleep_time)
if __name__ == "__main__":
t1 = threading.Thread(target=work, args=(1,)) # 1秒
t2 = threading.Thread(target=work, args=(3,)) # 3秒
t1.setDaemon(True)
t2.setDaemon(True)
t1.start()
t2.start()
time.sleep(2) # 2秒
print(t1.is_alive()) # False
print(t2.is_alive()) # True
鎖主要時用來解決在cpu切換時造程序取得的數據不一樣步的問題。
好比這個例子:windows
from threading import Thread
import os
import time
def work():
global n
temp = n
time.sleep(0.01)
n = temp - 1
if __name__ == '__main__':
n = 100
l = []
for i in range(100):
p = Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) # 結果可能爲99或98,但幾乎不爲零
爲了解決這個問題,咱們可使用串行的方式讓全部的代碼按照順序執行,可是這就失去了多線程的意義。那麼只要串行部分代碼就既能享受多線程的優點,又能夠保證數據的安全了。也就是說,鎖作的工做就是使操做數據的那部分代碼串行。安全
使用threading.Lock
獲取一把鎖,它由一個acquire()
和release()
方法控制鎖定和釋放。
from threading import Thread, Lock
import os
import time
lock = Lock()
def work():
global n
lock.acquire()
temp = n
time.sleep(0.01)
n = temp - 1
lock.release()
if __name__ == '__main__':
n = 100
l = []
for i in range(100):
p = Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) # 如今結果爲0
上面說過,鎖就是把部分代碼變爲串行,只有當鎖被釋放後才能執行後面的代碼。死鎖的一個緣由是互斥,還有多是粗枝大葉,忘記release()了。
from threading import Thread, Lock
import time
lockA = Lock()
lockB = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
lockA.acquire()
print("%s得到鎖A" % self.name)
lockB.acquire()
print("%s得到鎖B" % self.name)
lockB.release()
print("%s釋放鎖B" % self.name)
lockA.release()
print("%s釋放鎖A" % self.name)
def func2(self):
lockB.acquire()
print("%s得到鎖B" % self.name)
time.sleep(2)
lockA.acquire()
print("%s得到鎖A" % self.name)
lockA.release()
print("%s釋放鎖A" % self.name)
lockB.release()
print("%s釋放鎖B" % self.name)
if __name__ == '__main__':
for i in range(10):
t = MyThread(name="線程%d" % i)
t.start()
""" 線程0得到鎖A 線程0得到鎖B 線程0釋放鎖B 線程0釋放鎖A 線程0得到鎖B 線程1得到鎖A 卡死了 """
解決死鎖的好方式就是用遞歸鎖,而使用通常的鎖的話能夠用with
關鍵詞,以防忘記釋放鎖了。
遞歸鎖也是鎖,其內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
from threading import Thread, RLock
import time
lockA = lockB = RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
lockA.acquire()
print("%s得到鎖A" % self.name)
lockB.acquire()
print("%s得到鎖B" % self.name)
lockB.release()
print("%s釋放鎖B" % self.name)
lockA.release()
print("%s釋放鎖A" % self.name)
def func2(self):
lockB.acquire()
print("%s得到鎖B" % self.name)
time.sleep(2)
lockA.acquire()
print("%s得到鎖A" % self.name)
lockA.release()
print("%s釋放鎖A" % self.name)
lockB.release()
print("%s釋放鎖B" % self.name)
if __name__ == '__main__':
for i in range(10):
t = MyThread(name="線程%d" % i)
t.start()
信號量就是一把鎖,以前咱們說的threading.Lock
是互斥鎖(Mutual exclusion,縮寫 Mutex)實質上就是信號量爲一的情景。信號量能夠用來限定某些資源能夠同時由幾個線程訪問,訪問時一樣要acquire,出來時一樣也release。
import threading
import time
sm = threading.Semaphore(5)
def foo():
sm.acquire()
# 打印當前線程的名字
print("%s ..." % threading.current_thread().getName())
time.sleep(1)
sm.release()
if __name__ == "__main__":
for i in range(9):
t = threading.Thread(target=foo)
t.start()
關於GIL(global interpreter lock),點擊這裏。
event 同步條件
因爲線程之間是相互獨立的,彼此不能直接確認狀態,爲此python提供了threading.Event
對象,能夠在不一樣線程間傳遞狀態,其由一下方法:
.wait(timeout=None)
,event變爲True,timeout爲None時,爲阻塞;反之則爲等待秒數(非阻塞)
.set()
,設置event的值爲True
.clear()
,恢復event的狀態值爲False。
.is_set
,返回event狀態值
根據上述,能夠把Event的狀況分爲如下幾種:
import threading
import time
e = threading.Event()
def foo():
print("event狀態:", e.is_set())
print("等待。。。。")
if e.wait(): # 默認阻塞
print("event狀態:", e.is_set())
print("收到同步條件,ok")
def bar():
time.sleep(2)
e.set()
if __name__ == '__main__':
f = threading.Thread(target=foo)
b = threading.Thread(target=bar)
f.start()
b.start()
queue 線程隊列
線程隊列特別適用於消息必須安全地在多線程間交換的線程編程,線程隊列有三種類型,在實例化的時候根據需求指定:
queue.Queue(maxsize=0)
queue.LifoQueue(maxsize=0)
queue.PriorityQueue(maxsize=0)
注:maxsize
參數指定隊列的大小,當maxsize
<= 0 時,隊列的元素個數沒有限制。
這三者都返回queue.Queue
對象的方法,由於LifoQueue
和PriorityQueue
都繼承queue.Queue
,Queue對象擁有如下方法:
方法 | 說明 |
---|---|
.put(item, block=True, timeout=None) |
將 item 放入隊列,block默認爲True,表示阻塞。優先級隊列的item要包含優先級如:q.put([2, "abc"]) |
.get(block=True, timeout=None) |
從隊列中移除並返回一個項目。block默認爲True,表示阻塞 |
.qsize() |
返回隊列的大體大小 |
.empty() |
若是隊列爲空,返回 True ,不然返回 False |
.full() |
若是隊列是滿的返回 True ,不然返回 False 。 |
.task_done() |
完成一個任務後,向隊列發信號(join() 用到)。 |
.join() |
阻塞到 隊列中全部的元素 都 被 接 收 和 處 理 完畢(根據收到的task_done信號肯定)。 |
關於task_done與join:
Queue內部有一個unfinished_tasks屬性(默認爲0),put時自增1,task_done調用時自減1
join的邏輯是while self.unfinished_tasks: self.all_tasks_done.wait()
,當unfinished_tasks爲0的時候就跳出循環,中止阻塞狀態。
import threading
import queue
import time
q = queue.Queue(5) # 只存5個元素
def worker():
while True:
print("qsize: ", q.qsize())
item = q.get() # 隊列爲空時會阻塞
print(f'Working on {item}')
time.sleep(0.5) # 模擬處理數據的時間
print(f'Finished {item}')
q.task_done() # 已經執行
# 開啓爲worker線程,處理隊列,設置爲守護線程
threading.Thread(target=worker, daemon=True).start()
print("隊列是否爲空:", q.empty())
# 往隊列中添加元素
for item in range(10):
if q.full():
print("已經滿了,阻塞。。。。")
q.put(item) # 隊列滿的時候會阻塞
print('所有元素已經放入隊列中')
# 會一直阻塞,知道unfinished_tasks爲0
q.join()
print('所有任務已完成')
PriorityQueue有點特殊,單獨舉例:
import queue
q = queue.PriorityQueue()
q.put([3, "c"])
q.put([1, "a"])
q.put([2, "b"])
print(q.get()) # [1, 'a']
print(q.get()) # [2, 'b']
print(q.get()) # [3, 'c']
python中使用multiprocessing
模塊來實現多進程。
import multiprocessing
def work(num, name, age):
print(f"num: {num}, name: {name}, age: {age}")
if __name__ == '__main__': # 不要省略了這個,不然報錯
p = multiprocessing.Process(target=work, args=(
1, ), kwargs={"name": "lczmx", "age": 22})
p.start()
p.join()
方法二
要重寫__init__
方法的話要super.__init__()
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
self.func1()
self.func2()
def func1(self):
print("func1")
def func2(self):
print("func2")
if __name__ == "__main__":
p = MyProcess()
p.start()
一些經常使用方法
multiprocessing.set_start_method('spawn')
設置啓動方法,關於啓動方法類型及介紹見文檔Process對象.terminate()
當即終止進程Process對象.pid
返回進程ID。在生成該進程以前,這將是 None
Process對象.daemon
設置守護進程,和守護線程同樣,能夠在建立進程的時候經過daemon形參來設置。Process對象.name
與threading相似Process對象.is_alive
與threading相似Process對象.join
與threading相似主進程建立守護進程
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
使用多進程時,通常使用消息機制實現進程間通訊,儘量避免使用鎖等同步原語。
task_done
和join
(multiprocessing.JoinableQueue(maxsize=0)
有這兩個方法,但要必需要手動調用task_done,不然用於統計未完成任務的信號量最終會溢出並拋出異常)
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # [42, None, 'hello']
p.join()
conn1, conn2 = multiprocessing.Pipe([duplex])
conn1和conn2是一對 Connection 對象, 分別表示管道的兩端。from multiprocessing import Process, Pipe
def f(conn):
print(conn.recv()) # [1, '12', True]
if __name__ == '__main__':
conn1, conn2 = Pipe() # 默認爲雙向
p = Process(target=f, args=(conn2,))
p.start()
conn1.send([1, "12", True])
p.join()
線程和進程的建立、切換、關閉都須要必定的成本,對於某些重複次數多且聲明週期短的任務可使用線/進程池,線/進程池的數量並非越多越好,太多可能得不償失,甚至致使python解釋器崩潰。
使用線程池要用到concurrent.futures.ThreadPoolExecutor
使用進程池要用到concurrent.futures.ProcessPoolExecutor
線程池和進程池都提供瞭如下經常使用方法:
submit(fn, *args, **kwargs)
:將 fn 函數提交給線/進程池。
*args 表明傳給 fn 函數的參數,*kwargs 表明以關鍵字參數的形式爲 fn 函數傳入參數。
map(func, *iterables, timeout=None, chunksize=1)
:該函數相似於全局函數 map(func, *iterables)
該函數將會啓動多個線程,以異步方式當即對 iterables 執行 map 處理。
shutdown(wait=True, *, cancel_futures=False)
當待執行的 future 對象完成執行後向執行者發送信號,它就會釋放正在使用的任何資源。
python3.9纔開始增長cancel_futures參數
以線程池爲例(與with搭配使用更好)
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(8)
def work(num, name="unknown"):
print(name, num)
for i in range(10):
pool.submit(work, i, name="work-%s" % i)
pool.map(work, [1, 2, 3, 45])
# pool.shutdown(wait=True)
Future對象是submit方法的返回值,其自己也有一些實用的方法:
result(timeout=None)
返回 函數的返回值。若是調用還沒完成那麼這個方法將等待 timeout 秒。超時則觸發`concurrent.futures.TimeoutError
exception(timeout=None)
返回由調用函數引起的異常。若是調用還沒完成那麼這個方法將等待 timeout 秒。超時則觸發concurrent.futures.TimeoutError
add_done_callback(fn)
回調函數,將 fn 附加到future對象。當 future 對象被取消或完成運行時,將會調用 fn,而這個future 對象將做爲它惟一的參數。
import time
from concurrent.futures import ThreadPoolExecutor
def callback(future):
""" 回調函數,future是concurrent.futures._base.Future對象 """
print("result", future.result()) # result 123
print("exception", future.exception()) # exception None
def work(num):
if not isinstance(num, int):
raise TypeError
return num
with ThreadPoolExecutor(8) as executor:
res = executor.submit(work, 123)
res.add_done_callback(callback)
進程池
# ...略
def main():
with ProcessPoolExecutor() as executor:
res = executor.submit(work, 123)
res.add_done_callback(callback)
if __name__ == '__main__': # 不要省略了這個,不然報錯
main()
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題,經過一個容器來解決生產者和消費者的強耦合問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。在通常的高併發程序中一般就會有這樣的場景出現:生產多快,處理不過來;生產太慢,等半天沒得處理。因此要引入生產者消費者模型:
import threading
import queue
import time
import random
class Consumer(threading.Thread):
""" 消費者 """
def __init__(self, q, lock, name):
super().__init__()
self.q = q # 阻塞消息隊列
self.lock = lock # 互斥鎖
self.name = "消費者-" + str(name)
self.daemon = True # 設置爲守護線程
def run(self):
while True:
item = self.q.get() # 有則取,無則阻塞
with self.lock: # 使用上下文管理協議使用鎖
print(f"{self.name}: 處理{item}....")
time.sleep(random.uniform(1, 2)) # 模擬處理時間1~2的浮點數
self.q.task_done() # 調用task_done()
class Producer(threading.Thread):
""" 生產者 """
def __init__(self, q, count, name):
super().__init__()
self.q = q # 阻塞隊列
self.count = count # 生產幾個數據
self.name = "生產者-" + str(name)
def run(self):
for num in range(self.count):
data = "data-%d" % num
print(f"{self.name}: 生成數據 {data}")
time.sleep(random.random()) # 模擬處理時間0~1的浮點數
self.q.put(data) # 添加數據,滿則阻塞
# 由於消費者是守護線程,其是否能夠退出要看生產者
self.q.join() # 等待全部的數據都處理完了,才退出
if __name__ == '__main__':
q = queue.Queue()
lock = threading.Lock()
# 使用map生成,並啓動消費者
list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙", "丙"]))
# 生成者列表
producer_list = map(lambda name: Producer(q, 20, name), ["大廚", "小廚"])
for p in producer_list:
p.start()
for p in producer_list:
p.join()
# 等結束
多進程實現:
import multiprocessing
import time
import random
class Consumer(multiprocessing.Process):
""" 消費者 """
def __init__(self, q, lock, name):
super().__init__()
self.q = q # 阻塞消息隊列
self.lock = lock # 互斥鎖
self.name = "消費者-" + str(name)
self.daemon = True # 設置爲守護進程
def run(self):
while True:
item = self.q.get() # 有則取,無則阻塞
with self.lock: # 使用上下文管理協議使用鎖
print(f"{self.name}: 處理{item}....")
time.sleep(random.uniform(1, 2)) # 模擬處理時間1~2的浮點數
self.q.task_done() # 調用task_done()
class Producer(multiprocessing.Process):
""" 生產者 """
def __init__(self, q, count, name):
super().__init__()
self.q = q # 阻塞隊列
self.count = count # 生產幾個數據
self.name = "生產者-" + str(name)
def run(self):
for num in range(self.count):
data = "data-%d" % num
print(f"{self.name}: 生成數據 {data}")
time.sleep(random.random()) # 模擬處理時間0~1的浮點數
self.q.put(data) # 添加數據,滿則阻塞
# 由於消費者是守護進程,其是否能夠退出要看生產者
self.q.join() # 等待全部的數據都處理完了,才退出
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
lock = multiprocessing.Lock()
# 啓動兩個消費者進程
list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙"]))
# 生成者只開一個進程
p = Producer(q, 20, "大廚")
p.start()
p.join()
# 等結束
aiohttp是一個基於asyncio實現對http協議支持的第三方庫,點擊查看如何使用aiohttp。
進程的執行是要靠操做系統調度的,爲了保證不影響後面程序的運行,因此在執行過程當中遇到阻塞或超過期間輪詢時cpu會切換不一樣的進程執行
當咱們寫的程序須要數據即有IO的時候可使用如下的四種模式來解決問題。
不一樣平臺有不一樣的實現IO多路複用的模塊,windows下支持select
且僅適用於套接字;Linux下至此select
、poll
、epoll
函數的訪問,這些函數在大多數操做系統中是可用的;在 Solaris下爲devpoll
; BSD 上可用kqueue
;在這些操做系統上,適用於套接字和其餘文件類型。
水平觸發
對於讀:只要緩衝內容不爲空返回讀就緒
對於寫:只要緩衝區還不滿返回寫就緒
邊緣觸發
對於讀:緩衝區由空變爲不空 或 數據變多 等時候返回讀就緒
對於寫:緩衝區由滿變爲空 或 數據變少 等時候返回寫就緒
select和poll都是使用的水平觸發方式。epoll既支持水平觸發也支持邊緣觸發,默認是水平觸發。
在python中要實現IO多路複用,可使用select
或selectors
,selectors
是對select
的進一步封裝,使用selectors.DefaultSelector()
能夠自動選擇當前平臺最高效的接口。因此推薦使用selectors
模塊。
使用selectors
模塊主要要用到如下幾個方法:
selectors.DefaultSelector()
以kqueue > epoll > devpoll > poll > select等優先級返回選擇器類選擇器類.register(fileobj, event, data=None)
註冊一個用於選擇的文件對象,在其上監視 I/O 事件。
selectors.EVENT_READ
:可讀, selectors.EVENT_WRITE
:可寫)選擇器類.unregister(fileobj)
選擇器類.select(timeout=None)
namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
,fd是文件描述符,其餘的都是register
函數的參數選擇器類.close()
來自python官方文檔的例子:
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # 等鏈接是讀
print('accepted', conn, 'from', addr)
conn.setblocking(False) # 設置非阻塞
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # 接收消息是讀
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data)
else: # 斷開鏈接
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False) # 設置非阻塞
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events: # 一直阻塞,直到有數據來
print("event 循環")
# mask是位掩碼EVENT_READ或EVENT_WRITE
# 獲得回調函數,這裏是read或accept
callback = key.data
callback(key.fileobj, mask)
運行這段代碼,並用其它終端鏈接:
>>> import socket >>> sock = socket.socket() >>> sock.connect(("localhost", 1234)) >>> sock.send(b"hello world") 11