線程、進程、協程

並行:並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )

併發:併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。

同步:所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。

異步:所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。

阻塞非阻塞:阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的。

python的GIL:不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行。這兩個

Python根據處理事情的類型,是IO密集型仍是計算密集型,選擇不一樣的方式,進程,線程、協程,相互配合來用。可是對於計算密集型,。。。

 

一.   線程:

    線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。python

    線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發 性能。線程沒有本身的系統資源。安全

              <python的線程與threading模塊>                

直接調用:服務器

import threading import time def sayhi(num): #定義每一個線程要運行的函數
 
    print("running on number:%s" %num) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例
 t1.start() #啓動線程
    t2.start() #啓動另外一個線程
 
    print(t1.getName()) #獲取線程名
    print(t2.getName())

繼承式調用:網絡

import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定義每一個線程要運行的函數

        print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start() print("ending......")

join():在子線程完成運行以前,這個子線程的父線程將一直被阻塞。多線程

setDaemon(True):併發

         將線程聲明爲守護線程,必須在start() 方法調用以前設置, 若是不設置爲守護線程程序會被無限掛起。這個方法基本和join是相反的。當咱們 在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成 想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是 只要主線程完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法啦app

# run(): 線程被cpu調度後自動執行線程對象的run方法 # start():啓動線程活動。 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。
 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

關於鎖:dom

  由於線程能直接操做到進程裏的全部變量,若是存在多個線程同時操做同一個變量,存在競爭,最後變量是什麼樣的都不知道,因此在線程須要操做變量前,進行取鎖競爭,異步

  拿到鎖的線程才能對變量操做,操做完後釋放鎖。意味着同時時刻只有一個線程運行那部分代碼。ide

1. threading.Lock()  同步鎖

import time import threading R=threading.Lock() def addNum(): global num #在每一個線程中都獲取這個全局變量
    #num-=1
 R.acquire() temp=num #print('--get num:',num )
    time.sleep(0.01) num =temp-1 #對此公共變量進行-1操做
 R.release() num = 100  #設定一個共享變量
thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部線程執行完畢
 t.join() print('final num:', num )

 

  爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。

2. threading.RLock()  遞歸鎖

import threading import time class MyThread(threading.Thread): def actionA(self): r_lcok.acquire() #count=1
        print(self.name,"gotA",time.ctime()) time.sleep(2) r_lcok.acquire() #count=2

        print(self.name, "gotB", time.ctime()) time.sleep(1) r_lcok.release() #count=1
        r_lcok.release() #count=0


    def actionB(self): r_lcok.acquire() print(self.name, "gotB", time.ctime()) time.sleep(2) r_lcok.acquire() print(self.name, "gotA", time.ctime()) time.sleep(1) r_lcok.release() r_lcok.release() def run(self): self.actionA() self.actionB() if __name__ == '__main__': # A=threading.Lock()
    # B=threading.Lock()
 r_lcok=threading.RLock() L=[] for i in range(5): t=MyThread() t.start() L.append(t) for i in L: i.join() print("ending....")

3. 關於 event事件

Python線程的 event 事件 能夠看作是 事件驅動模型。當一個線程經過事件驅動模型發送了一個信號,另外一個線程經過事件驅動模型獲取了該信號,從而作出反應。

只是這個模型比較簡單,發送的信號默認是 false ,要麼是 true。

 

event = threading.Event() event.is_set() event.isSet() # 獲取標誌位 是 true 仍是 false
event.set()   # 設置標誌位 爲 true
event.clear()   # 初始化標誌位 爲 false
event.wait()    # 阻塞等待標誌位 爲 true 纔會繼續運行

栗子:

from threading import Thread,Event import time event=Event() def light(): print('紅燈正亮着') time.sleep(3) event.set() #綠燈亮

def car(name): print('車%s正在等綠燈' %name) event.wait() #等燈綠 此時event爲False,直到event.set()將其值設置爲True,纔會繼續運行.
    print('車%s通行' %name) if __name__ == '__main__': # 紅綠燈
    t1=Thread(target=light) t1.start() #
    for i in range(10): t=Thread(target=car,args=(i,)) t.start()

 

4. 信號量 —— multiprocess.Semaphore

  信號量是比同步鎖還能多設置的方式。同步鎖只有一把,而信號量能夠設置多把鎖。信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。

  信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

  進程信號量

from multiprocessing import Process,Semaphore import time,random def go_ktv(sem,user): sem.acquire() print('%s 佔到一間ktv小屋' %user) time.sleep(random.randint(0,3)) #模擬每一個人在ktv中待的時間不一樣
 sem.release() if __name__ == '__main__': sem=Semaphore(4) p_l=[] for i in range(13): p=Process(target=go_ktv,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() print('============》')

 

# coding: utf-8
import threading
import time
import random

semaphore = threading.Semaphore(0)

def consumer():
    print("Consumer is waiting.")
    semaphore.acquire()
    print("Consumer notify: Consumed item number %s" %item)

def producer():
    global item
    time.sleep(10)
    item = random.randint(0, 100)
    print("Producer notify: Produced item number %s" %item)
    semaphore.release()

if __name__ == "__main__":
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("Program terminated")

 

5. 隊列  queue-----線程利器, 利用隊列,組合 生產者消費模型

建立一個「隊列」對象 import queue # python 3 是 queue Python2 是 Queue q = queue.Queue(maxsize = 10) Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 將一個值放入隊列中 q.put(10) 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 將一個值從隊列中取出 q.get() 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True, get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 Python Queue模塊有三種隊列及構造函數: 一、Python Queue模塊的FIFO隊列先進先出。   class queue.Queue(maxsize) 二、LIFO相似於堆,即先進後出。               class queue.LifoQueue(maxsize) 三、還有一種是優先級隊列級別越低越先出來。        class queue.PriorityQueue(maxsize) 此包中的經常使用方法(q = Queue.Queue()): q.qsize() 返回隊列的大小 q.empty() 若是隊列爲空,返回True,反之False q.full() 若是隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 至關q.get(False) 非阻塞 q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 至關q.put(item, False) q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列爲空,再執行別的操做
import threading import time import queue class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self):  # 定義每一個線程要運行的函數
 num = qq.get() # 隊列裏沒有就會阻塞等待
        numb = num -1 time.sleep(0.1) num = numb print("running on number:%s" % self.name,num) qq.put(num) if __name__ == '__main__': qq= queue.Queue() qq.put(100) tl = [] for i in range(100): th = MyThread(i) tl.append(th) for th in tl: th.start() for th in tl: th.join() print("ending......", qq.get())
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(5) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1
    #q.task_done()
 q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) # if not q.empty():
    # print("waiting.....")
        #q.join()
        data = q.get() print("eating....") time.sleep(4) q.task_done() #print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) # else:
    # print("-----no baozi anymore----")
        count +=1 p1 = threading.Thread(target=Producer, args=('A君',)) c1 = threading.Thread(target=Consumer, args=('B君',)) c2 = threading.Thread(target=Consumer, args=('C君',)) c3 = threading.Thread(target=Consumer, args=('D君',)) p1.start() c1.start() c2.start() c3.start()

 

 

二.   進程

Python的多進程,multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

多進程、多線程裏的 run 方法  和  start 方法不同的。

1.  調用方式

from multiprocessing import Process import time def f(name): time.sleep(1) print('hello', name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin',)) p_list.append(p) p.start() for i in p_list: p.join() print('end')

類的方式

from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name

    def run(self): time.sleep(1) print ('hello', self.name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')

 Process 模塊介紹

Process([group, [target, [name, [args, [kwargs ]]]]) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹: group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name爲子進程的名稱

方法介紹
p.start():啓動進程,並調用該子進程中的p.run() 。
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。
若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖。 p.is_alive():若是p仍然運行,返回True。 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。
            timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
屬性介紹
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,
     而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置。 p.name:進程的名稱。 p.pid:進程的pid 。 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) 。 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。
這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)。

2. 進程間數據傳輸

數據傳輸而不是共享,兩種方式,隊列和管道。

隊列   from multiprocessing import Queue   

from multiprocessing import Queue   

q=Queue([maxsize]) 

建立共享的進程隊列。 參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現. Queue的實例q具備如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。 q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。 q.empty() 若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。 q.full() 若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。 q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。 q.cancel_join_thread() 不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。 q.join_thread() 鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。

 

 
 

from multiprocessing import Queue

''' multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,可是隊列接口 '''

from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty
q.put(3) q.put(3) q.put(3) # q.put(3) # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
try: q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
    print('隊列已經滿了') # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try: q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了') print(q.empty()) #空了
 單看隊列用法 import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello'])  #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。

if __name__ == '__main__': q = Queue() #建立一個Queue對象
    p = Process(target=f, args=(q,)) #建立一個進程
 p.start() print(q.get()) p.join() 子進程發送數據給父進程

 

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"yuan"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    print("q_ID2:",id(child_conn))

if __name__ == '__main__':

    parent_conn, child_conn = Pipe()
    print("q_ID1:",id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("兒子你好!")
    p.join()
管道
相關文章
相關標籤/搜索