Python併發式編程

進程

進程就是一個程序在一個數據集上的一次動態執行過程,通常由程序、數據集和進程控制塊三部分組成;多線程

程序用來描述進程要完成哪些功能以及如何完成;併發

數據集則是程序在執行過程當中所須要使用的資源;app

進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系
統感知進程存在的惟一標誌.dom

在一個程序的運行過程當中,由於一個CPU在一個時間點只能運行一個程序,不過爲了保持併發的效果,程序必須不斷地切換,使得每個程序在必定時間內都能被執行一次.這樣進程就出現了.async

線程

一個總體的程序運行可能要被分紅許多不一樣的進程,可是進程之間卻沒法進行通訊,所以爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹一件事的缺陷,線程就誕生了.函數

線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,它沒有本身獨立的內存單位,而是和多個線程共享一個內存單位.其本質是由線程ID、程序計數器、寄存器集合和堆棧共同組成.線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能.線程沒有本身的系統資源.性能

進程是最小的資源單位,線程是最小的執行單位ui

GIL(Global Interpreter Lock)

在Python裏面,無論有多少CPU和過程,Python都只會默認的讓一個CPU按順序地一個一個地執行進程.操作系統

線程的調用方式

直接調用

import threading, time  ##調用threading模塊

def test(num):   ##建立一個函數
    print("It's %s" %num)
    time.sleep(3)
    print("%s end" %num)

if __name__ == "__main__":
    t1 = threading.Thread(target = test, args = (1,))  ##調用Thread方法,並傳入函數和值
    t2 = threading.Thread(target = test, args = (2,))

    t1.start()  ##讓線程開始運做
    t2.start()

    
    print("End")
    
   
It's 1
It's 2
End
1 end
2 end

繼承調用

import threading, time

class MyTread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        print("我打印的數值是%s" %self.num)
        time.sleep(3)
        print("%s打印完了" %self.getName())

if __name__ == "__main__":
    s1 = MyTread(1)
    s2 = MyTread(2)

    s1.start()
    s2.start()
    print("End……")
    
我打印的數值是1
我打印的數值是2
End……
##(等了3秒)
Thread-1打印完了
Thread-2打印完了

join&Daemon方法

在子線程完成運行以前,這個子線程的父進程將一直被阻塞

import threading, time

class ListenMusic(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print("%s is listening music! %s" %(self.name,time.ctime()))
        time.sleep(4)
        print("OK! %s" %time.ctime())

class DoHomework(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print("%s is doing homework! %s" %(self.name, time.ctime()))
        time.sleep(2)
        print("Homework has been done! %s" %time.ctime())

if __name__ == "__main__":
    s1 = ListenMusic("Hermaeus")
    s2 = DoHomework("YuanMing")

    s1.start()
    s2.start()
    # s1.join()
    s2.join()
    print("End…… %s" %time.ctime())
    
Hermaeus is listening music! Wed Aug 29 17:49:54 2018
YuanMing is doing homework! Wed Aug 29 17:49:54 2018
Homework has been done! Wed Aug 29 17:49:56 2018
End…… Wed Aug 29 17:49:56 2018
OK! Wed Aug 29 17:49:58 2018

Daemon(True)

將線程申明爲守護線程,只要主線程執行完退出那麼守護線程也會退出.

import threading, time

class ListenMusic(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print("%s is listening music! %s" %(self.name,time.ctime()))
        time.sleep(4)
        print("OK! %s" %time.ctime())

class DoHomework(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print("%s is doing homework! %s" %(self.name, time.ctime()))
        time.sleep(2)
        print("Homework has been done! %s" %time.ctime())

if __name__ == "__main__":
    s1 = ListenMusic("Hermaeus")
    s2 = DoHomework("YuanMing")

    s1.setDaemon(True)  ##這個必須放在start()方法前面

    s1.start()
    s2.start()

    print("End…… %s" %time.ctime())
    
    
Hermaeus is listening music! Wed Aug 29 18:32:52 2018
YuanMing is doing homework! Wed Aug 29 18:32:52 2018
End…… Wed Aug 29 18:32:52 2018
Homework has been done! Wed Aug 29 18:32:54 2018

同步鎖

執行下面代碼,會每獲得不一樣的值,那是由於在執行IO操做時,有的線程沒有了原來的數據,又有的線程拿到了被執行事後的數據,致使數據之間混亂.

import time
import threading

def addNum():
    global num 
    temp=num
    time.sleep(0.0012)
    num =temp-1

num = 100  

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()

print('final num:', num )

咱們可使用同步鎖來使得數據之間能同步,

import time, threading

def test():

    global num
    lock.acquire() ##用acquire和release鎖住
    temp = num
    time.sleep(0.00001)
    num = temp - 1
    lock.release()

num = 100
lock = threading.Lock()  ##實例化一個同步鎖
l = []

for i in range(100):
    s = threading.Thread(target = test)
    s.start()
    l.append(s)

for ready in l:
    ready.join()  ##必需要join每個線程

print(num)

死鎖

在多個線程裏面,幾個線程佔有必定資源,而且等待對方釋放資源以使用,這樣就會致使死鎖現象

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
        
Thread-1 gotlockA Wed Aug 29 19:16:53 2018
Thread-1 gotlockB Wed Aug 29 19:16:56 2018
Thread-1 gotlockB Wed Aug 29 19:16:56 2018
Thread-2 gotlockA Wed Aug 29 19:16:56 2018
………………………………(會僵死在這裏)

遞歸鎖

爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lock.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lock.acquire()
        print(self.name,"gotlockB",time.ctime())
        lock.release()
        lock.release()

    def doB(self):
        lock.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lock.acquire()
        print(self.name,"gotlockA",time.ctime())
        lock.release()
        lock.release()

    def run(self):
        self.doA()
        self.doB()

if __name__=="__main__":
    lock = threading.RLock()  ##建立一個遞歸鎖的實例
    threads = []
    for i in range(2):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

同步條件

同步條件是一個簡單的同步對象,一個同步條件就至關於一箇中間標誌;線程會等待直到標誌被設置和清除。

當標誌被設定後,wait方法不會形成任何影響,可是當標誌被清除後,wait會阻塞線程

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚你們都要加班到22:00。")
        print(event.isSet())
        event.set()
        time.sleep(3)
        print("BOSS:<22:00>能夠下班了。")
        print(event.isSet())
        event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")
if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
        
BOSS:今晚你們都要加班到22:00。
False
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
BOSS:<22:00>能夠下班了。
False
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!

信號量(Semaphore)

信號量用來控制線程併發數,BoundedSemaphore和Semaphore管理一個內置的計數器,調用一次acquire方法時-1,

調用一次release方法時+1。若是計數器爲0時,將會阻塞,直到其餘線程release釋放。BoundedSemaphore會在每一次調用release方式檢查計數器是否超過了計數器的初始值,若是超過了就會拋出一個異常。

import threading, time

class MyThread(threading.Thread):
    def run(self):
        if semaphore.acquire():  ## 判斷
            print("%s  start   %s" %(self.getName, time.ctime))
            time.sleep(2.5)
            print("%s  end    %s" %(self.getName, time.ctime))
            semaphore.release()


if __name__ == "__main__":
    semaphore = threading.Semaphore(5)  ##實例化一個信號量
    threads = []
    for i in range(20):
        threads.append(MyThread())

    for i in threads:
        i.start()

    for i in threads:
        i.join()

隊列

這是解決多線程的重要利器,可讓每個線程都能拿到隊列裏面的數據。

import time, threading, queue, random

q_list = queue.Queue()  ##實例化一個隊列,裏面自定義了一個maxsize爲無限大小,可修改
##Queue是先進先出,LifoQueue是先進後出,PriorityQueue是優先級別越低越先出來
def add_num():
    for i in range(10):
        q_list.put(i)    ##put是放入數據
        time.sleep(random.randint(1,2)) 
    print("add_num run over!")

def minus_num():
    while True:
        if not q_list.empty():  ##empty是判斷隊列是否爲空
            print("取得數據 %s" %q_list.get())
            time.sleep(random.randint(2,4))
        else:
            print("minus_num run over!")
            break

s1 = threading.Thread(target=add_num)
s2 = threading.Thread(target=minus_num)
s1.start()
s2.start()

q_list.qsize()  ##返回隊列大小
q_list.full()   ##判讀隊列是不是滿的
q_list.put()  ##lock爲可選參數,默認爲1,若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元;若是block爲0,put方法將引起Full異常。
q_list.get([block[,timeout]])  ##獲取隊列。選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用;若是隊列爲空且block爲False,隊列將引起Empty異常。timeout是等待時間。
q_list.get_nowait()    ##至關於q_list(False)
q_list.put_nowait()    ##至關於q_list(item, False)
q_list.task_done()     ##完成一項工做後,向任務已經完成的隊列發送信號
q_list.join()    ##等隊列爲空,才執行別的操做

多進程模塊

該模塊能夠利用multiprocessing.Process對象來建立一個進程,其用法與threading包中的同名類一致,因此multiprocessing的很大一部分與threading使用同一套API,只不過換到了多進程的情景。

#####構造方法
Process([group [, target [, name [, args [, kwargs]]]]])  
#group:線程組,由於沒有實現因此引用時提示必須是None
#target:要執行的函數
#name:進程名
#args/kwargs:要傳入的參數
#####實例的方法
is_alive():#返回進程是否在運行
join([timeout]):#阻塞進程,直達調用此方法的進程終止或到達指定的timeout
start():#調用進程
run():#在繼承式調用裏面,必須覆蓋這個函數
terminate():#無論任務是否完成,當即中止工做
#####屬性
daemon:#和線程的setDeamon功能同樣  #p.daemon=True
name:#進程名字
pid:#進程號
#####實例化
from multiprocessing import Process
import time
def f(name):
    time.sleep(1)
    print("Hello %s %s" %(name, time.ctime()))

if __name__ == "__main__":
    p_list = []
    for i in range(3):
        p = Process(target = f, args = ("Hermaeus",))
        p_list.append(p)
        p.start()

#####繼承式
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name
        
    def run(self):
        time.sleep(1)
        print("Hello %s %s" %(self.name, time.ctime()))

if __name__ == "__main__":
    p_list = []
    for i in range(3):
        p = MyProcess("Hermaeus")
        p.start()
        p_list.append(p)

進程間通信

Queue

from multiprocessing import Process, Queue

def test(q,i):  ##要把隊列名字和值一塊兒傳進去
    q.put(i)

if __name__ == "__main__":
    
    q = Queue()
    for i in range(3):
        p = Process(target = test, args = (q,i,))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

Pipe

from multiprocessing import Process, Pipe

def f(conn):
    conn.send("OK!")
    response = conn.recv()
    print("response: %s" %response)
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target = f, args = (child_conn,))
    p.start()
    print(parent_conn.recv())
    parent_conn.send("From parent_conn!")
    p.join()

Managers

from multiprocessing import Process, Manager

def test(d,l,n):
    d[n] = n
    l.append(n)

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict() 
        l = manager.list()
        p_list = []
        for i in range(10):
            p = Process(target=test, args=(d,l,i,))
            p.start()
            p_list.append(p)
            
        for p in p_list:  ##必需要有join
            p.join()

        print(d)
        print(l)

進程同步

from multiprocessing import Process, Lock
import time

def test(l,i):
    l.acquire()
    time.sleep(3)
    print("Hi, %s" %i)
    l.release()

if __name__ == "__main__":
    lock = Lock()
    for i in range(3):
        p = Process(target=test, args=(lock, i))
        p.start()

進程池

from multiprocessing import Process, Pool
import time, os

def test(i):
    time.sleep(1)
    print("From test %s" %os.getpid())
    return "Hello, %s" %i

def Fd(arg):
    print(arg)

if __name__ == "__main__":
    pool = Pool(2) ##實例化池
    for i in range(10):
        pool.apply(func=test, args=(i,)) ##同步接口
        pool.apply_async(func=test, args=(i,), callback = Fd) ##回調
    pool.close()
    pool.join()
    print("END......")

協程 (微線程)

yield

import time
import queue

def consumer(name):
    print("--->ready to eat baozi...")
    while True:
        new_baozi = yield  ##在此處卡住,知道另外線程發送send
        print("[%s] is eating baozi %s" % (name,new_baozi))

def producer():

    r = con.__next__()  ##運行consumer
    r = con2.__next__()
    n = 0
    while 1:
        time.sleep(1)
        print("producer is making baozi %s and %s" %(n,n+1) )
        con.send(n)  ##send,傳遞資源
        con2.send(n+1)
        n +=2


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

Greenlet

from greenlet import greenlet

def quest_foo():
    print("你好!")
    answ_foo.switch()  ##switch()主管切換功能
    print("再見!")
    answ_foo.switch()

def answ_foo():
    print("你好!")
    quest_foo.switch()
    print("再見!")

if __name__ == "__main__":
    quest_foo = greenlet(quest_foo)
    answ_foo = greenlet(answ_foo)
    answ_foo.switch()
相關文章
相關標籤/搜索