目錄python
進程就是一個程序在一個數據集上的一次動態執行過程,通常由程序、數據集和進程控制塊三部分組成;多線程
程序用來描述進程要完成哪些功能以及如何完成;併發
數據集則是程序在執行過程當中所須要使用的資源;app
進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系
統感知進程存在的惟一標誌.dom
在一個程序的運行過程當中,由於一個CPU在一個時間點只能運行一個程序,不過爲了保持併發的效果,程序必須不斷地切換,使得每個程序在必定時間內都能被執行一次.這樣進程就出現了.async
一個總體的程序運行可能要被分紅許多不一樣的進程,可是進程之間卻沒法進行通訊,所以爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹一件事的缺陷,線程就誕生了.函數
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,它沒有本身獨立的內存單位,而是和多個線程共享一個內存單位.其本質是由線程ID、程序計數器、寄存器集合和堆棧共同組成.線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能.線程沒有本身的系統資源.性能
進程是最小的資源單位,線程是最小的執行單位ui
在Python裏面,無論有多少CPU和過程,Python都只會默認的讓一個CPU按順序地一個一個地執行進程.操作系統
import threading, time ##調用threading模塊 def test(num): ##建立一個函數 print("It's %s" %num) time.sleep(3) print("%s end" %num) if __name__ == "__main__": t1 = threading.Thread(target = test, args = (1,)) ##調用Thread方法,並傳入函數和值 t2 = threading.Thread(target = test, args = (2,)) t1.start() ##讓線程開始運做 t2.start() print("End") It's 1 It's 2 End 1 end 2 end
import threading, time class MyTread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): print("我打印的數值是%s" %self.num) time.sleep(3) print("%s打印完了" %self.getName()) if __name__ == "__main__": s1 = MyTread(1) s2 = MyTread(2) s1.start() s2.start() print("End……") 我打印的數值是1 我打印的數值是2 End…… ##(等了3秒) Thread-1打印完了 Thread-2打印完了
在子線程完成運行以前,這個子線程的父進程將一直被阻塞
import threading, time class ListenMusic(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): print("%s is listening music! %s" %(self.name,time.ctime())) time.sleep(4) print("OK! %s" %time.ctime()) class DoHomework(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): print("%s is doing homework! %s" %(self.name, time.ctime())) time.sleep(2) print("Homework has been done! %s" %time.ctime()) if __name__ == "__main__": s1 = ListenMusic("Hermaeus") s2 = DoHomework("YuanMing") s1.start() s2.start() # s1.join() s2.join() print("End…… %s" %time.ctime()) Hermaeus is listening music! Wed Aug 29 17:49:54 2018 YuanMing is doing homework! Wed Aug 29 17:49:54 2018 Homework has been done! Wed Aug 29 17:49:56 2018 End…… Wed Aug 29 17:49:56 2018 OK! Wed Aug 29 17:49:58 2018
將線程申明爲守護線程,只要主線程執行完退出那麼守護線程也會退出.
import threading, time class ListenMusic(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): print("%s is listening music! %s" %(self.name,time.ctime())) time.sleep(4) print("OK! %s" %time.ctime()) class DoHomework(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.name = name def run(self): print("%s is doing homework! %s" %(self.name, time.ctime())) time.sleep(2) print("Homework has been done! %s" %time.ctime()) if __name__ == "__main__": s1 = ListenMusic("Hermaeus") s2 = DoHomework("YuanMing") s1.setDaemon(True) ##這個必須放在start()方法前面 s1.start() s2.start() print("End…… %s" %time.ctime()) Hermaeus is listening music! Wed Aug 29 18:32:52 2018 YuanMing is doing homework! Wed Aug 29 18:32:52 2018 End…… Wed Aug 29 18:32:52 2018 Homework has been done! Wed Aug 29 18:32:54 2018
執行下面代碼,會每獲得不一樣的值,那是由於在執行IO操做時,有的線程沒有了原來的數據,又有的線程拿到了被執行事後的數據,致使數據之間混亂.
import time import threading def addNum(): global num temp=num time.sleep(0.0012) num =temp-1 num = 100 for i in range(100): t = threading.Thread(target=addNum) t.start() print('final num:', num )
咱們可使用同步鎖來使得數據之間能同步,
import time, threading def test(): global num lock.acquire() ##用acquire和release鎖住 temp = num time.sleep(0.00001) num = temp - 1 lock.release() num = 100 lock = threading.Lock() ##實例化一個同步鎖 l = [] for i in range(100): s = threading.Thread(target = test) s.start() l.append(s) for ready in l: ready.join() ##必需要join每個線程 print(num)
在多個線程裏面,幾個線程佔有必定資源,而且等待對方釋放資源以使用,這樣就會致使死鎖現象
import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join() Thread-1 gotlockA Wed Aug 29 19:16:53 2018 Thread-1 gotlockB Wed Aug 29 19:16:56 2018 Thread-1 gotlockB Wed Aug 29 19:16:56 2018 Thread-2 gotlockA Wed Aug 29 19:16:56 2018 ………………………………(會僵死在這裏)
爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
import threading,time class myThread(threading.Thread): def doA(self): lock.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lock.acquire() print(self.name,"gotlockB",time.ctime()) lock.release() lock.release() def doB(self): lock.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lock.acquire() print(self.name,"gotlockA",time.ctime()) lock.release() lock.release() def run(self): self.doA() self.doB() if __name__=="__main__": lock = threading.RLock() ##建立一個遞歸鎖的實例 threads = [] for i in range(2): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()
同步條件是一個簡單的同步對象,一個同步條件就至關於一箇中間標誌;線程會等待直到標誌被設置和清除。
當標誌被設定後,wait方法不會形成任何影響,可是當標誌被清除後,wait會阻塞線程
import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚你們都要加班到22:00。") print(event.isSet()) event.set() time.sleep(3) print("BOSS:<22:00>能夠下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() BOSS:今晚你們都要加班到22:00。 False Worker:哎……命苦啊! Worker:哎……命苦啊! Worker:哎……命苦啊! Worker:哎……命苦啊! Worker:哎……命苦啊! BOSS:<22:00>能夠下班了。 False Worker:OhYeah! Worker:OhYeah! Worker:OhYeah! Worker:OhYeah! Worker:OhYeah!
信號量用來控制線程併發數,BoundedSemaphore和Semaphore管理一個內置的計數器,調用一次acquire方法時-1,
調用一次release方法時+1。若是計數器爲0時,將會阻塞,直到其餘線程release釋放。BoundedSemaphore會在每一次調用release方式檢查計數器是否超過了計數器的初始值,若是超過了就會拋出一個異常。
import threading, time class MyThread(threading.Thread): def run(self): if semaphore.acquire(): ## 判斷 print("%s start %s" %(self.getName, time.ctime)) time.sleep(2.5) print("%s end %s" %(self.getName, time.ctime)) semaphore.release() if __name__ == "__main__": semaphore = threading.Semaphore(5) ##實例化一個信號量 threads = [] for i in range(20): threads.append(MyThread()) for i in threads: i.start() for i in threads: i.join()
這是解決多線程的重要利器,可讓每個線程都能拿到隊列裏面的數據。
import time, threading, queue, random q_list = queue.Queue() ##實例化一個隊列,裏面自定義了一個maxsize爲無限大小,可修改 ##Queue是先進先出,LifoQueue是先進後出,PriorityQueue是優先級別越低越先出來 def add_num(): for i in range(10): q_list.put(i) ##put是放入數據 time.sleep(random.randint(1,2)) print("add_num run over!") def minus_num(): while True: if not q_list.empty(): ##empty是判斷隊列是否爲空 print("取得數據 %s" %q_list.get()) time.sleep(random.randint(2,4)) else: print("minus_num run over!") break s1 = threading.Thread(target=add_num) s2 = threading.Thread(target=minus_num) s1.start() s2.start() q_list.qsize() ##返回隊列大小 q_list.full() ##判讀隊列是不是滿的 q_list.put() ##lock爲可選參數,默認爲1,若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元;若是block爲0,put方法將引起Full異常。 q_list.get([block[,timeout]]) ##獲取隊列。選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用;若是隊列爲空且block爲False,隊列將引起Empty異常。timeout是等待時間。 q_list.get_nowait() ##至關於q_list(False) q_list.put_nowait() ##至關於q_list(item, False) q_list.task_done() ##完成一項工做後,向任務已經完成的隊列發送信號 q_list.join() ##等隊列爲空,才執行別的操做
該模塊能夠利用multiprocessing.Process對象來建立一個進程,其用法與threading包中的同名類一致,因此multiprocessing的很大一部分與threading使用同一套API,只不過換到了多進程的情景。
#####構造方法 Process([group [, target [, name [, args [, kwargs]]]]]) #group:線程組,由於沒有實現因此引用時提示必須是None #target:要執行的函數 #name:進程名 #args/kwargs:要傳入的參數 #####實例的方法 is_alive():#返回進程是否在運行 join([timeout]):#阻塞進程,直達調用此方法的進程終止或到達指定的timeout start():#調用進程 run():#在繼承式調用裏面,必須覆蓋這個函數 terminate():#無論任務是否完成,當即中止工做 #####屬性 daemon:#和線程的setDeamon功能同樣 #p.daemon=True name:#進程名字 pid:#進程號
#####實例化 from multiprocessing import Process import time def f(name): time.sleep(1) print("Hello %s %s" %(name, time.ctime())) if __name__ == "__main__": p_list = [] for i in range(3): p = Process(target = f, args = ("Hermaeus",)) p_list.append(p) p.start() #####繼承式 from multiprocessing import Process import time class MyProcess(Process): def __init__(self, name): super(MyProcess, self).__init__() self.name = name def run(self): time.sleep(1) print("Hello %s %s" %(self.name, time.ctime())) if __name__ == "__main__": p_list = [] for i in range(3): p = MyProcess("Hermaeus") p.start() p_list.append(p)
from multiprocessing import Process, Queue def test(q,i): ##要把隊列名字和值一塊兒傳進去 q.put(i) if __name__ == "__main__": q = Queue() for i in range(3): p = Process(target = test, args = (q,i,)) p.start() print(q.get()) print(q.get()) print(q.get())
from multiprocessing import Process, Pipe def f(conn): conn.send("OK!") response = conn.recv() print("response: %s" %response) conn.close() if __name__ == "__main__": parent_conn, child_conn = Pipe() p = Process(target = f, args = (child_conn,)) p.start() print(parent_conn.recv()) parent_conn.send("From parent_conn!") p.join()
from multiprocessing import Process, Manager def test(d,l,n): d[n] = n l.append(n) if __name__ == "__main__": with Manager() as manager: d = manager.dict() l = manager.list() p_list = [] for i in range(10): p = Process(target=test, args=(d,l,i,)) p.start() p_list.append(p) for p in p_list: ##必需要有join p.join() print(d) print(l)
from multiprocessing import Process, Lock import time def test(l,i): l.acquire() time.sleep(3) print("Hi, %s" %i) l.release() if __name__ == "__main__": lock = Lock() for i in range(3): p = Process(target=test, args=(lock, i)) p.start()
from multiprocessing import Process, Pool import time, os def test(i): time.sleep(1) print("From test %s" %os.getpid()) return "Hello, %s" %i def Fd(arg): print(arg) if __name__ == "__main__": pool = Pool(2) ##實例化池 for i in range(10): pool.apply(func=test, args=(i,)) ##同步接口 pool.apply_async(func=test, args=(i,), callback = Fd) ##回調 pool.close() pool.join() print("END......")
import time import queue def consumer(name): print("--->ready to eat baozi...") while True: new_baozi = yield ##在此處卡住,知道另外線程發送send print("[%s] is eating baozi %s" % (name,new_baozi)) def producer(): r = con.__next__() ##運行consumer r = con2.__next__() n = 0 while 1: time.sleep(1) print("producer is making baozi %s and %s" %(n,n+1) ) con.send(n) ##send,傳遞資源 con2.send(n+1) n +=2 if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
from greenlet import greenlet def quest_foo(): print("你好!") answ_foo.switch() ##switch()主管切換功能 print("再見!") answ_foo.switch() def answ_foo(): print("你好!") quest_foo.switch() print("再見!") if __name__ == "__main__": quest_foo = greenlet(quest_foo) answ_foo = greenlet(answ_foo) answ_foo.switch()