爲保證併發能力,將一個單獨的CPU變成多個虛擬的CPU(多道技術:時間多路複用和空間多路複用+硬件上支持隔離)。【學習轉載至博客園老哥的文章】html
多道技術: (1)空間的複用:如內存中同時有多道程序 (2)時間上的複用:複用一個CPU的時間片,強調:遇到io切,佔用CPU時間過長也切,核心在於切以前將進程的狀態保存下來,這樣才能保證下次切換回來時,能基於上次切走的位置繼續運行python
進程:正在進行的一個過程或一個任務,負責任務的是CPU;同一程序執行兩次,那也就是兩個進程,好比打開兩個chrome瀏覽器,一個看學習視頻,一個寫學習筆記。mysql
併發:是僞並行,看起來是同時運行,單個CPU+多道技術就能夠實現併發。git
並行:同時運行,具備多個CPU才能實現並行。github
1.multiprocessing模塊
multiprocess模塊用來開啓子進程,並在子進程中執行咱們定製的任務(如函數),支持子進程、通訊和共享數據,執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock 等組件;與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於改進程內。
2.Process類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號。
參數介紹:
group參數未使用,值始終爲None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=(1,2,'egon',)
kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
name爲子進程的名稱
方法介紹:
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
屬性介紹:
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
複製代碼
【GitHub示例】算法
from multiprocessing import Process
import time
import random
import os
def task(name):
print("%s is running..." % name)
time.sleep(random.randrange(1, 5))
print("%s is done..." % name)
# 第二種建立子進程的方法
class MyProcess(Process):
def __init__(self, name):
super().__init__() # 將父類的功能進行重用
self.name = name
# start將調用這個run方法
def run(self) -> None:
print("%s %s is running..., parent id is %s" % (self.name, os.getpid(), os.getppid()))
time.sleep(3)
print("%s %s is done-----, parent id is %s" % (self.name, os.getpid(), os.getppid()))
if __name__ == "__main__":
# 第一種方式
p1 = Process(target=task, args=("A", ))
p2 = Process(target=task, args=("B", ))
p3 = Process(target=task, args=("C", ))
p4 = Process(target=task, args=("D", ))
# 只是給操做系統發送了一個信號,由操做系統將父進程地址空間中的數據拷貝給子進程,做爲子進程運行的初始狀態,開啓後再運行task
p1.start()
p2.start()
p3.start()
p4.start()
print('主1......', os.getpid(), os.getppid())
# 執行結果:執行結果由於執行進程會有一段時間,因此先作打印操做
# --------------------
# 主
# 任務名稱 is running...
# 任務名稱 is done...
p = MyProcess('子進程2')
p.start()
# os.getpid() --- 查看子進程的id
# os.getppid() --- 查看父進程的id
print('主2......', os.getpid(), os.getppid())
複製代碼
在主進程運行過程當中若是想要併發的執行其餘的任務,咱們能夠開啓一個子進程。sql
主進程和子進程之間的任務執行分兩種狀況:chrome
(1)主進程和子進程之間的執行彼此獨立,主進程的任務執行完畢後須要等待子進程執行完畢,而後統一回收資源;
(2)主進程執行到某一階段時,須要子進程執行完畢後再繼續執行主進程的任務,這時候就須要檢測(**join方法的做用就在這**)子進程何時執行完畢再執行主進程的任務,不然進程任務就會阻塞。
複製代碼
join方法實現子進程的檢測、實現子進程的併發和子進程的串行任務,咱們能夠經過is_alive()來判斷子進程的存活狀態,Process(target=join_task('task進程'), name="自定義的子進程的名稱")定義進程的名稱,默認爲Process-1,使用p_task.name獲取進程的名稱。同時進程之間的空間是相互隔離的,即數據互不影響。【GitHub示例】編程
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 71_Process_join.py @Time : 2019/10/10 23:38 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from multiprocessing import Process
import time
import os
def join_task(name):
print("%s id: %s is running..., parent id is <%s>" % (name, os.getpid(), os.getppid()))
time.sleep(3)
print("%s id: %s has done===, parent id is <%s>" % (name, os.getpid(), os.getppid()))
# join併發
def join_concurrent(name, n):
print("---%s id: %s is running..., parent id is <%s>---" % (name, os.getpid(), os.getppid()))
time.sleep(n)
# print("---%s id: %s has done===, parent id is <%s>---" % (name, os.getpid(), os.getppid()))
# join串行
def join_serial(name, n):
print("$$$%s id: %s is running..., parent id is <%s>$$$" % (name, os.getpid(), os.getppid()))
time.sleep(n)
print("$$$%s id: %s has done===, parent id is <%s>$$$" % (name, os.getpid(), os.getppid()))
n = 100
# 進程之間的空間是隔離的
def spatial_isolation():
global n
n = 0
print("子進程內的n: ", n)
if __name__ == "__main__":
# join檢測子進程
p_task = Process(target=join_task('task進程'), name="自定義的進程的名稱")
p_task.start()
# 加入join方法後必定會等到子進程結束之後纔會執行主進程
print("子進程存活狀態:", p_task.is_alive())
p_task.join()
print("子進程存活狀態:", p_task.is_alive())
print("主進程的id:%s, 及父進程的id:%s" % (os.getpid(), os.getppid()))
print("驗證了存在殭屍進程:", p_task.pid)
print("子進程的名稱:", p_task.name)
# join併發
start_time_concurrent = time.time()
p_concurrent_1 = Process(target=join_concurrent, args=('join併發1', 3))
p_concurrent_2 = Process(target=join_concurrent, args=('join併發2', 2))
p_concurrent_3 = Process(target=join_concurrent, args=('join併發3', 3))
p_concurrent_1.start()
p_concurrent_2.start()
p_concurrent_3.start()
# 三個是以併發進行的,只是等待最長的程序執行完畢纔算結束
p_concurrent_1.join()
p_concurrent_2.join()
p_concurrent_3.join()
print("join併發主進程總耗時:", (time.time() - start_time_concurrent))
# join串行
start_time_serial = time.time()
p_serial_1 = Process(target=join_concurrent, args=('join串行1', 3))
p_serial_2 = Process(target=join_concurrent, args=('join串行2', 2))
p_serial_3 = Process(target=join_concurrent, args=('join串行3', 1))
# 每一個都是執行完之後再進行下一步
p_serial_1.start()
p_serial_1.join()
p_serial_2.start()
p_serial_2.join()
p_serial_3.start()
p_serial_3.join()
print("join串行主進程總耗時:", (time.time() - start_time_serial))
# 進程之間的空間是隔離的
p_spatial_isolation = Process(target=spatial_isolation)
p_spatial_isolation.start()
print('主進程內n: ', n)
複製代碼
若是咱們有兩個任務須要併發執行,那麼開一個主進程和一個子進程分別去執行就ok了,若是子進程的任務在主進程任務結束後就沒有存在的必要了,那麼該子進程應該在開啓前就被設置成守護進程。主進程代碼運行結束,守護進程隨即終止;未加守護進程的子進程會繼續執行。守護進程(這樣看來守護進程更像是陪葬進程)的特色:json
守護進程會在主進程代碼結束後就結束了;
守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children。
複製代碼
【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 71_Process_protect.py @Time : 2019/10/11 12:54 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from multiprocessing import Process
import time
import os
def task1(name, n):
print("%s id: %s is running..., parent id is <%s>" % (name, os.getpid(), os.getppid()))
time.sleep(n)
print("%s id: %s has done===, parent id is <%s>" % (name, os.getpid(), os.getppid()))
# 字進程不容許建立子進程
# p = Process(target=time.sleep, args=(2, 3))
# p.start()
def task2(name, n):
print("%s id: %s is running..., parent id is <%s>" % (name, os.getpid(), os.getppid()))
time.sleep(n)
print("%s id: %s has done===, parent id is <%s>" % (name, os.getpid(), os.getppid()))
if __name__ == "__main__":
p_task1 = Process(target=task1, args=("task1", 3))
p_task2 = Process(target=task2, args=("task2", 3))
#必定要在p.start()前設置, 設置p爲守護進程, 禁止p建立子進程, 而且父進程代碼執行結束, p即終止運行
p_task1.daemon = True
# 未加守護進程的任務還會繼續運行
p_task1.start()
p_task2.start()
print("主程序%s......")
複製代碼
保證數據輸出不錯亂。利用互斥鎖,能夠經過添加互斥鎖的位置,實現部分程序執行達到串行的效果,其餘程序仍然能夠並行執行,而添加了join只能執行完了以後才能執行下一個,若做爲每項都添加一個join,則都要串行執行。從而大大下降了效率。【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 71_Process_mutextlock.py @Time : 2019/10/11 16:40 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from multiprocessing import Process, Lock
import time
import json
# 未加互斥鎖
def unlock(name):
print("%s 1" % name)
time.sleep(1)
print("%s 2" % name)
time.sleep(1)
print("%s 3" % name)
# ------------------------------------------------------------
# 加互斥鎖
def lock(name, mutex):
mutex.acquire()
print('%s 1' % name)
time.sleep(1)
print('%s 2' % name)
time.sleep(1)
print('%s 3' % name)
mutex.release()
# ------------------------------------------------------------
# 模擬搶票過程
def search_ticket(name):
time.sleep(1)
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
print("<%s> 查看到剩餘票數爲【%s】" % (name, dic['count']))
def buy_ticket(name):
time.sleep(1)
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(3)
json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
print('<%s> 購票成功' % name)
else:
print('<%s> 暫無餘票' % name)
def ticket_task(name, mutex):
# 查票每一個人均可以執行,是併發執行操做
search_ticket(name)
mutex.acquire() # 添加互斥鎖,達到串行執行,使得只有一我的購票成功
buy_ticket(name)
mutex.release()
if __name__ == "__main__":
# for i in range(3):
# p_unlock = Process(target=unlock, args=("未進程%s" % i,))
# p_unlock.start()
# 執行結果:
# ------------------------
# 進程0 1
# 進程1 1
# 進程2 1
# 進程0 2
# 進程1 2
# 進程2 2
# 進程0 3
# 進程1 3
# 進程2 3
# mutex = Lock()
# for i in range(3):
# p_lock = Process(target=lock, args=("加鎖進程%s" % i, mutex))
# p_lock.start()
# 執行結果:
# ------------------------
# 加鎖進程0 1
# 加鎖進程0 2
# 加鎖進程0 3
# 加鎖進程1 1
# 加鎖進程1 2
# 加鎖進程1 3
# 加鎖進程2 1
# 加鎖進程2 2
# 加鎖進程2 3
# 模擬搶票
mutex = Lock()
for i in range(10):
p_ticket = Process(target=ticket_task, args=("乘客%s" % i, mutex))
p_ticket.start()
# 利用互斥鎖,能夠經過添加互斥鎖的位置,實現部分程序執行達到串行的效果,其餘程序仍然能夠並行執行,而添加了join只能執行完了以後才能執行下一個,若做爲每項都添加一個join,則都要串行執行。從而大大下降了效率。
p_ticket.join()
複製代碼
隊列的意義:解決你們共享硬盤文件,效率低以及使用內存解決加鎖這個繁瑣的步驟。multiprocessing模塊提供了IPC(internet process communicate)進程之間的通訊,隊列以及管道,這兩種方式都是使用消息傳遞的,隊列就是管道加鎖的實現。【GitHub示例】
Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
參數:
maxsize---隊列中最大的項數,能夠放置任意類型的數據,省略則無大小限制。
--- 可是:
隊列內存放的是消息而非大數據
隊列佔用的是內存空間,於是maxsize即使是無大小限制也受限於內存大小
方法:
q.put --- 用以插入數據到隊列中,數據不宜過大
q.get --- 能夠從隊列讀取並刪除一個元素
複製代碼
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 71_Process_queue.py @Time : 2019/10/11 19:50 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from multiprocessing import Process, Queue
q = Queue(3)
q.put(1)
q.put(2)
print("當前隊列是否已滿:", q.full())
q.put(3)
# 判斷隊列是否滿了
print("當前隊列是否已滿:", q.full())
try:
q.put(4, block=True, timeout=3)
except:
print("隊列已滿,當前隊列的深度爲:%s" % q.qsize())
print("--------華麗的分割線--------")
print(q.get())
print(q.get())
print("當前隊列是否已空:%s" % q.empty())
print(q.get())
# 判斷隊列是否爲空
print("當前隊列是否已空:%s" % q.empty())
try:
q.get(block=True, timeout=3)
except:
print("隊列已空,當前隊列的深度爲:%s" % q.qsize())
print("--------華麗的分割線--------")
if __name__ == "__main__":
pass
複製代碼
生產者(生產數據的任務),消費者(處理數據的任務)。生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。
生產者和消費者之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接給阻塞隊列,消費者直接從阻塞隊列裏取。阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力,阻塞隊列就是用來給生產者和消費者解耦的。
【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 71_producer_consumer_model.py @Time : 2019/10/11 20:22 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from multiprocessing import Process, Queue
import time
def producer(q, name):
for i in range(3):
res = "包子%s" % i
time.sleep(0.5)
print("%s 生產了 %s" % (name, res))
q.put(res)
def consumer(q, name):
while True:
res = q.get()
if res is None:
break
time.sleep(1)
print("%s 吃了%s" % (name, res))
if __name__ == "__main__":
# 隊列容器
q = Queue()
# 生產者們
p1 = Process(target=producer, args=(q, '生產者1'))
p2 = Process(target=producer, args=(q, '生產者2'))
p3 = Process(target=producer, args=(q, '生產者3'))
# 消費者們
c1 = Process(target=consumer, args=(q, '消費者1'))
c2 = Process(target=consumer, args=(q, '消費者2'))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
# 生產完了纔開始吃
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
print("主進程")
複製代碼
JoinableQueue對象中隊列容許羨慕的使用者通知生產者項目已經被成功處理,通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue基本使用
參數
maxsize --- 隊列中容許的最大項數,省略則無大小限制;
方法
JoinableQueue的方法共用與Queue的方法
q.task_done() --- 使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
q.join() --- 生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
複製代碼
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 71_producer_consumer_model.py @Time : 2019/10/11 20:22 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from multiprocessing import Process, Queue, JoinableQueue
import time
def producer_j(qj, name):
for i in range(3):
res = "包子%s" % i
time.sleep(0.5)
print("%s 生產了 %s" % (name, res))
qj.put(res)
qj.join()
def consumer_j(qj, name):
while True:
res = qj.get()
if res is None:
break
time.sleep(1)
print("%s 吃了%s" % (name, res))
qj.task_done()
if __name__ == "__main__":
# ------------------------
qj = JoinableQueue()
# 生產者們
pj1 = Process(target=producer_j, args=(qj, "生產者j1"))
pj2 = Process(target=producer_j, args=(qj, "生產者j2"))
pj3 = Process(target=producer_j, args=(qj, "生產者j3"))
# 消費者們
cj1 = Process(target=consumer_j, args=(qj, "消費者j1"))
cj2 = Process(target=consumer_j, args=(qj, "消費者j2"))
cj1.daemon = True # 若不添加守護進程,則會卡在消費者
cj2.daemon = True
pj1.start()
pj2.start()
pj3.start()
cj1.start()
cj2.start()
pj1.join()
pj2.join()
pj3.join()
print('JoinableQueue主進程')
複製代碼
線程:把進程看成車間的話,顧名思義線程就是一條流水線工做的過程,並且這條流水線必須屬於一個車間,一個進程中至少有一個線程,進程負責把線程整合起來。
多進程:在一個進程中存在多個線程,多個線程共享該進程的地址空間,至關於一個車間內有多條流水線,都共用一個車間的資源。
第一種:調用Thread類的方法; 第二種:繼承Thread類 【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 72_Thread.py @Time : 2019/10/12 12:18 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from threading import Thread
import time
import random
def thread_one(name):
print("%s is running." % name)
time.sleep(random.randrange(1, 3))
print("%s run end====" % name)
class ThreadTwo(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print("%s is running..." % self.name)
time.sleep(random.randrange(1, 3))
print("%s run end two====" % self.name)
if __name__ == "__main__":
# 建立線程第一種方式
to = Thread(target=thread_one, args=("第一種建立線程的方式...", ))
to.start()
print("主線程一")
# 建立線程第二種方式
tt = ThreadTwo('第二種建立線程的方式...')
tt.start()
print("主線程二")
複製代碼
【GitHub示例】 區別一:開啓速度:
(1)t1.start() 的同時,線程就開啓了(先打印t1 running 肉眼可觀察的速度)
(2)p1.start() 將開啓進程的信號發給操做系統後,操做系統要申請內存空間,讓好拷貝父進程地址空間到子進程,開銷遠大於線程。
複製代碼
# 1. 開啓速度
# 1.1 在主進程下開啓線程
from threading import Thread
from multiprocessing import Process
import time
def run(name):
print("%s thread is running..." % name)
time.sleep(2)
print("%s thread run done===" % name)
# 1.2 在主進程下開啓子進程
def child_process(name):
print("%s process is running..." % name)
time.sleep(2)
print("%s process run done===" % name)
if __name__ == "__main__":
t1 = Thread(target=run, args=('主進程下的線程', ))
t1.start()
print("\n主進程下有線程")
p1 = Process(target=child_process, args=("主進程下的進程", ))
p1.start()
print("\n主進程下有子進程")
複製代碼
區別二:pid不一樣
(1)同一進程下的線程的pid跟主進程的pid同樣
(2)開多個進程,每一個進程都有不一樣的pid
複製代碼
# 2.pid不一樣
# 2.1 在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣【pid(process id:進程的id號)】
import os
def pid_thread_task(name):
# 一個進程內多個線程是平級別的
print("%s 子進程:%s" % (name, os.getpid()))
# 2.2 開多個進程,每一個進程都有不一樣的pid
def pid_process_task(name):
print("%s 子進程的pid: %s, 父進程的pid: %s" %(name, os.getpid(), os.getppid()))
if __name__ == "__main__":
# 2.1
pid_thread_task_1 = Thread(target=pid_thread_task, args=(' 線程1', ))
pid_thread_task_2 = Thread(target=pid_thread_task, args=(' 線程2', ))
pid_thread_task_1.start()
pid_thread_task_2.start()
print("\n 2.1的主進程:", os.getpid())
# 2.2
pid_process_task_1 = Process(target=pid_process_task, args=(' 進程1', ))
pid_process_task_2 = Process(target=pid_process_task, args=(' 進程1', ))
pid_process_task_1.start()
pid_process_task_2.start()
print("\n 2.2的主進程:", os.getpid())
複製代碼
區別三:同一進程的多個線程共享該進程的地址空間;父進程與子進程不共享地址空間,進程之間的地址空間是隔離的
# 3.同一進程內的多個線程共享該進程的地址空間,父進程與子進程不共享地址空間,進程之間的地址空間是隔離的
n = 100
def task3(name, num1):
global n
n = 0
print("=====%s=====" % name)
n = n + num1
print("-----%s-----" % n)
if __name__ == "__main__":
# 3
p3 = Process(target=task3, args=("進程31", 100))
t31 = Thread(target=task3, args=("線程31", 100))
t32 = Thread(target=task3, args=("線程32", 200))
# p3.start()
# p3.join()
t31.start()
t32.start()
print("\n\n【p3主進程】", n)
複製代碼
彙總下二者的區別: (1)啓動線程的速度要比啓動進程的速度快不少,啓動進程進程的開銷更大 (2)在主進程下面開啓的多個線程,每一個線程都和主進程的pid(進程的id)一致;在主進程下開啓多個子進程,每一個進程都有不同的pid (3)統一進程內的多個線程共享該進程的地址空間;父進程與子進程不共享地址空間,代表進程之間的地址空間是隔離的
【GitHub示例】 Thread實例對象的方法:
isAlive():返回線程是否活動的;
getName():返回線程名;
setName():設置線程名
複製代碼
from threading import Thread
import time
def task():
# 獲取當前線程的名字
print("%s is running" % currentThread().getName())
time.sleep(1)
print("%s is done ======" % currentThread().getName())
if __name__ == "__main__":
t = Thread(target=task, name='子線程1')
t.start()
t.setName("更名爲新線程名稱1")
t.join()
print("新的子線程的名稱爲:", t.getName())
print("線程是否存活:", t.isAlive())
複製代碼
threading模塊提供的一些方法:
threading.currentThread():返回當前的線程變量
threading.enumerate():返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount():返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
複製代碼
from threading import Thread, currentThread, active_count, enumerate
import time
def task():
# 獲取當前線程的名字
print("%s is running" % currentThread().getName())
time.sleep(1)
print("%s is done ======" % currentThread().getName())
if __name__ == "__main__":
currentThread().setName("主線程新名字")
print("主線程的名字:", currentThread().getName())
print("線程是否存活:", t.isAlive())
t.join()
print("查看活躍的線程數:", active_count())
print("將當前活躍的線程顯示出來:", enumerate())
複製代碼
不管是進程仍是線程,都遵循:守護XXXX非等待主XXXX運行完畢後被銷燬 對於程序運行完畢的一點補充:
(1)對主進程來講,運行完畢指的是主進程代碼運行完畢;主進程在代碼結束之後就算運行完畢了(守護進程在此時就別回收了),而後主進程會一直等非守護的子進程運行完畢後回收子進程的資源,不然非產生殭屍進程,纔會結束。
(2)對主線程來講,運行完畢子的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢。主線程在其餘非守護線程運行完畢纔算運行完畢(守護線程在此時會被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程保證非守護線程都運行完畢後才結束。
複製代碼
【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 72_Thread_protect.py @Time : 2019/10/14 17:37 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from threading import Thread
import time
def run(name):
time.sleep(2)
print("%s is running..." % name)
def thread1():
print(123)
time.sleep(1)
print("end123")
def thread2():
print(456)
time.sleep(0.5)
print("end456")
if __name__ == "__main__":
#t = Thread(target=run, args=("線程1", ))
# # t.setDaemon(True) 等價於 t.daemon = True
# # t.setDaemon(True)
# t.daemon = True
# t.start()
# print("主進程")
# print(t.is_alive())
# 輸出結果:
# ---------------------由於主進程結束,守護主線程也就跟着結束了,因此不打印守護線程的語句
# 主進程
t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.daemon = True
t1.start()
t2.start()
print("主進程2")
複製代碼
線程中的互斥鎖的目的:一個進程內的多個線程是共享彼此的地址空間,所以彼此之間數據也是共享的,所以帶來的競爭可能將數據改亂。【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 72_Thread_lock.py @Time : 2019/10/14 19:28 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from threading import Thread, Lock
import time
n1 = 100
n2 = 100
def unlock_task():
# 未加鎖以前
global n1
temp = n1
# 未加鎖以前,100個線程都停留在這而且temp都等於100
time.sleep(0.1)
n1 = temp - 1
def lock_task():
global n2
# 開始加鎖
mutex.acquire()
temp = n2
time.sleep(0.1)
n2 = temp - 1
# 解鎖
mutex.release()
if __name__ == "__main__":
# 未加鎖以前
t_1 = []
for i in range(100):
t = Thread(target=unlock_task)
t_1.append(t)
t.start()
for t in t_1:
t.join()
print('未加鎖的主進程', n1)
# ----- 執行結果 -----
# 主進程 100
# 加鎖
mutex = Lock()
t_2 = []
for i2 in range(100):
t2 = Thread(target=lock_task)
t_2.append(t2)
t2.start()
for t2 in t_2:
t2.join()
print("加鎖後的主進程", n2)
複製代碼
【GitHub示例】
信號量:信號量也是一把鎖,對比互斥鎖同一時間只能有一個任務簽到鎖去執行,信號量同一時間能夠有指定多個任務拿鎖去執行。
# 1. 信號量(Semaphore)
from threading import Thread, Semaphore, currentThread
# 信號量大小,也就是同一時間3個任務去拿鎖
sm = Semaphore(3)
def semaphore_task():
sm.acquire()
print("%s in" % currentThread().getName())
sm.release()
if __name__ == "__main__":
for i1 in range(10):
t1 = Thread(target=semaphore_task)
t1.start()
複製代碼
Event:每一個線程都是獨立運行且狀態不可預測的,若是某些程序須要經過某個線程的狀態來肯定襲擊下一步的操做,那就用Event。初始狀況下,Event對象中信號標誌被設置爲假;若是有線程等待一個Event對象,而這個對象的標誌爲假,那麼這個線程將會被一直阻塞到該標誌爲真,一個線程若是將Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,則直接忽略這個時間繼續執行。其實Event對象就像一個紅綠燈同樣來控制先後車輛(線程)的運行。
event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。
複製代碼
# 有多個工做線程嘗試連接MySQL,咱們想要在鏈接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,
# 若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做
from threading import Event, Thread, currentThread
import time
event = Event()
def conn_mysql():
n = 0
while not event.is_set():
# print("event.is_set(): ", event.is_set())
if n == 3:
print("%s try many times" % currentThread().getName())
return
print("%s try %s" % (currentThread().getName(), n))
event.wait(0.5)
n += 1
print("%s is connected" % currentThread().getName())
def check():
print("%s is checking" % currentThread().getName())
time.sleep(1)
event.set()
if __name__ == "__main__":
# 2.Event
for i2 in range(3):
t = Thread(target=conn_mysql)
t.start()
t = Thread(target=check)
t.start()
複製代碼
定時器: 定時器,指定n秒後執行某操做
from threading import Timer
def after_run():
print("定時器指定時長後再執行程序")
if __name__ == "__main__":
# 3. Timer
t3 = Timer(2, after_run)
t3.start()
複製代碼
# 利用定時器完成驗證碼的實現
pass
複製代碼
隊列是特別有用在線程編程時必須在多個線程之間交換安全信息。【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 72_Thread_queue.py @Time : 2019/10/14 21:27 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
import queue
# 1. 先進先出
first_in_first_out = queue.Queue(5)
first_in_first_out.put('first')
first_in_first_out.put(2)
first_in_first_out.put('third')
# block = True時阻塞,timeout=3,等待三秒後若是尚未從裏面取出數據,則阻塞
first_in_first_out.put(4, block=True, timeout=3)
print(first_in_first_out.get())
print(first_in_first_out.get())
print(first_in_first_out.get())
print(first_in_first_out.get(block=False))
print("\n")
# 2. 後進先出
last_in_first_out = queue.LifoQueue(3)
last_in_first_out.put("L1")
last_in_first_out.put("L2")
last_in_first_out.put("L3")
print(last_in_first_out.get())
print(last_in_first_out.get())
print(last_in_first_out.get())
print("\n")
# 3. 優先級隊列(存儲數據時可設置優先級的隊列)
priority = queue.PriorityQueue(3)
priority.put((10, 'one'))
priority.put((40, 'two'))
priority.put((20, 'three'))
print(priority.get())
print(priority.get())
print(priority.get())
複製代碼
目的:解決多線程或多進程實現併發的套接字通訊會使服務的開啓的進程或線程數都會隨着開發的客戶端數增多最後可能因不堪重負而癱瘓,進程池或線程池的用途就是對服務端開啓的進程數或線程數加以控制,讓程序本身能夠在一個可承受的範圍內運行。【GitHub示例】
concurrent.futures 模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
基本方法:
submit(fn, *args, **kwargs) --- 異步提交任務
map(func, *iterables, timeout=None, chunksize=1) --- 取代for循環submit的操做
shutdown(wait=True) --- 至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前
result(timeout=None) --- 取得結果
add_done_callback(fn) --- 回調函數
複製代碼
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 72_ThreadPool_ProcessPool.py @Time : 2019/10/15 10:20 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import os
import random
def task(name):
print("name: %s pid: %s run" % (name, os.getpid()))
time.sleep(random.randrange(1, 3))
if __name__ == "__main__":
# 設置最大同時運行的進程數
# p = ProcessPoolExecutor(4)
p = ThreadPoolExecutor(4)
for i in range(10):
# 異步提交任務,提交後不用管進程是否執行
p.submit(task, '任務 %s' % i)
# 將進程池的入口關閉,等待任務提交結束後才執行後面的任務, 默認wait=True
p.shutdown(wait=True)
print("主進程")
複製代碼
同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果再執行下一行代碼,致使程序使串行執行,同步時提交任務的一種方式,不是阻塞的結果
異步調用:提交完任務後,不等待任務執行完畢。 【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 72_Thread_Asynchronous_call_recall.py @Time : 2019/10/15 11:12 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
from concurrent.futures import ThreadPoolExecutor
import time
import random
def swim(name):
print("%s is swimming" % name)
time.sleep(random.randint(1, 5))
swim_len = random.randint(10, 20)
return {"name": name, 'swim_len': swim_len}
def distance(swim_res):
# swim_res = swim_res.result()
name = swim_res['name']
s_length = swim_res['swim_len']
print("%s 遊了 %s km" % (name, s_length))
if __name__ == '__main__':
# 1. 同步調用
pool = ThreadPoolExecutor(13)
swim_res1 = pool.submit(swim, 'A').result()
distance(swim_res1)
swim_res2 = pool.submit(swim, 'B').result() # result取得結果
distance(swim_res2)
swim_res3 = pool.submit(swim, 'C').result()
distance(swim_res3)
# 2. 異步調用
# pool = ThreadPoolExecutor(13)
# pool.submit(swim, 'AA').add_done_callback(distance)
# pool.submit(swim, 'BB').add_done_callback(distance)
# pool.submit(swim, 'CC').add_done_callback(distance)
print("#############################")
複製代碼
協程:協程,又稱微線程,纖程。英文名Coroutine。協程其實能夠認爲是比線程更小的執行單元,他自帶CPU上下文。這樣只要在合適的時機,咱們能夠把一個協程切換到另外一個協程。只要這個過程當中保存或恢CPU上下文那麼程序仍是能夠運行的。
線程和進程都有搶佔式特色,線程和進程之間的切換咱們是不能參與的。協程是非搶佔式的,協程的切換是由用戶來控制的,協程主要解決的是IO的操做。
協程的優勢:
(1)協程的切換是子程序切換,是由程序自身控制,沒有線程切換的開銷,和多線程比線程的數量越多,協程的性能優點越明顯
(2)不須要多線程鎖的機制,由於只有一個線程,也不存在同時寫變量的衝突。在協程中共享資源不須要加鎖,只須要判斷狀態就行了。因此執行效率比多線程好。
如何利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。
複製代碼
生成器中的yield只有在調用的時候才執行。【GitHub示例】
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
""" @File : 73_Coroutine_yeild.py @Time : 2019/10/15 12:38 @Author : Crisimple @Github : https://crisimple.github.io/ @Contact : Crisimple@foxmail.com @License : (C)Copyright 2017-2019, Micro-Circle @Desc : None """
import time
def task1(name):
for i in range(3):
print("%s is running...%s" % (name, i))
yield
time.sleep(1)
def task2(name):
for i in range(3):
print("%s is running...%s" % (name, i))
yield
time.sleep(1)
def main():
g1 = task1('任務1')
g2 = task2('任務2')
for i in range(3):
next(g1)
next(g2)
print("執行完畢")
if __name__ == "__main__":
main()
複製代碼
Python有兩種垃圾回收機制:
當數據的引用數變成0的時候,python解釋器就認爲這個數據是來及,進行垃圾回收,釋放空間
複製代碼
經過對象存在的時間不一樣,採用不一樣的算法來回收垃圾,形象的比喻, 三個鏈表,零代鏈表上的對象(新建立的對象都加入到零代鏈表),引用數都是一,每增長一個指針,引用加一,隨後python會檢測列表中的互相引用的對象,根據規則減掉其引用計數. GC算法對鏈表一的引用減一,引用爲0的,清除,不爲0的到鏈表二,鏈表二也執行GC算法,鏈表三同樣. 存在時間越長的數據,越是有用的數據.複製代碼