併發編程(線程、進程、協程)

2、進程和線程

進程(process)

  假若有兩個程序A和B,程序A在執行到一半的過程當中,須要讀取大量的數據輸入(I/O操做),而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。 是否是在程序A讀取數據的過程當中,讓程序B去執行,當程序A讀取完數據以後,讓 程序B暫停,而後讓程序A繼續執行? 固然沒問題,但這裏有一個關鍵詞:切換。html

  既然是切換,那麼這就涉及到了狀態的保存,狀態的恢復,加上程序A與程序B所須要的系統資 源(內存,硬盤,鍵盤等等)是不同的。天然而然的就須要有一個東西去記錄程序A和程序B 分別須要什麼資源,怎樣去識別程序A和程序B等等,因此就有了一個叫進程的抽象 python

進程定義 進程就是一個程序在一個數據集上的一次動態執行過程。 數據庫

進程通常由 程序、數據集、進程控制塊 三部分組成。 api

  1. 程序用來描述進程要完成哪些功能以及如何完成;
  2. 數據集則是程序在執行過程當中所須要使用的資源;
  3. 進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系 統感知進程存在的惟一標誌。

線程(thread)

線程的出現是爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹同樣事的缺陷,使到進程內併發成爲可能。
 

  假設,一個文本程序,須要接受鍵盤輸入,將內容顯示在屏幕上,還須要保存信息到硬盤中。若只有一個進程,勢必形成同一時間只能幹同樣事的尷尬(當保存時,就不能經過鍵盤輸入內容)。
  如有多個進程,每一個進程負責一個任務,進程A負責接收鍵盤輸入的任務,進程B負責將內容顯示在屏幕上的任務,進程C負責保存內容到硬盤中的任務。這裏進程A,B,C間的協做涉及到了進程通訊問題,並且有共同都須要擁有的東西-------文本內容,不停的切換形成性能上的損失。
  如有一種機制,可使任務A,B,C共享資源,這樣上下文切換所須要保存和恢復的內容就少了,同時又能夠減小通訊所帶來的性能損耗,那就行了。是的,這種機制就是線程。緩存

  線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能。線程沒有本身的系統資源。

線程進程的關係區別

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
 
  1. 一個程序至少有一個進程,一個進程至少有一個線程.(進程能夠理解成線程的容器)
  2. 進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率。
  3. 線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
  4. 進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位.
  5. 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.
  6. 線程本身基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)可是它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源.
  7. 一個線程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行.

Python GIL(Global Interpreter Lock)

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)安全

上面的核心意思就是,不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行。數據結構

 

  首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。
  然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。多線程

這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf 

python的線程與threading模塊

1、線程的兩種調用方式

 threading 模塊創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。併發

直接調用:app

import threading
import time
 
def sayhi(num): #定義每一個線程要運行的函數
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = threading.Thread(target=sayhi,args=(1,),name="name_t1") #生成一個線程實例
    t2 = threading.Thread(target=sayhi,args=(2,),name="name_t2") #生成另外一個線程實例
 
    t1.start() #啓動線程
    t2.start() #啓動另外一個線程
 
    print(t1.getName()) #獲取線程名  等同於 print(t1.name)
    print(t2.getName())
    print(threading.current_thread().name)


>>>
running on number:1
running on number:2
name_t1
name_t2
MainThread

 

  因爲任何進程默認就會啓動一個線程,咱們把該線程稱爲主線程,主線程又能夠啓動新的線程,Python的  threading 模塊有個  current_thread()  函數,它永遠返回當前線程的實例。主線程實例的名字叫  MainThread ,子線程的名字在建立時指定,咱們用LoopThread命名子線程。名字僅僅在打印時用來顯示,徹底沒有其餘意義,若是不起名字Python就自動給線程命名爲Thread-1Thread-2……

  args裏面是參數

繼承式調用:

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)  # 等同於 super().__init__()
        self.num = num

    def run(self):#定義每一個線程要運行的函數

        print("running on number:%s" %self.num)

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
    
    print("ending......")

 Join & Daemon

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
 
import time
import threading
 
 
def run(n):
 
    print('[%s]------running----\n' % n)
    time.sleep(2)
    print('--done--')
 
def main():
    for i in range(5):
        t = threading.Thread(target=run,args=[i,])
        t.start()
        t.join(1)
        print('starting thread', t.getName())
 
 
m = threading.Thread(target=main,args=[])
# m.setDaemon(True) #將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啓動的其它子線程會同時退出,無論是否執行完任務
m.start()
m.join(timeout=2)
print("---main thread done----")

>>>  
[0]------running----

starting thread Thread-2
[1]------running----

---main thread done----
--done--
starting thread Thread-3
[2]------running----

--done--
starting thread Thread-4
[3]------running----

--done--
starting thread Thread-5
[4]------running----

--done--
starting thread Thread-6
--done--

***Repl Closed***

# 加了守護程序以後
>>>
[0]------running----

starting thread Thread-2
[1]------running----

---main thread done----
--done--

***Repl Closed***

join():

  在子線程完成運行以前,這個子線程的父線程將一直被阻塞。

setDaemon(True):

  將線程聲明爲守護線程,必須在start() 方法調用以前設置, 若是不設置爲守護線程程序會被無限掛起。這個方法基本和join是相反的。

  當咱們 在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成

  想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是 只要主線程

  完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法啦。

其它方法

# run():  線程被cpu調度後自動執行線程對象的run方法
# start():啓動線程活動。
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。

threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

 三 同步鎖(Lock)

一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?

import time
import threading

def addNum():
    global num #在每一個線程中都獲取這個全局變量
    #num-=1

    temp=num
    #print('--get num:',num )
    time.sleep(0.1)
    num =temp-1 #對此公共變量進行-1操做

num = 100  #設定一個共享變量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部線程執行完畢
    t.join()

print('final num:', num )

觀察:time.sleep(0.1)  /0.001/0.0000001 結果分別是多少?

  假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。

多個線程都在同時操做同一個共享資源,因此形成了資源破壞,怎麼辦呢?(join會形成串行,失去所線程的意義)

 

咱們能夠經過 同步鎖 來解決這種問題。
鎖的概念是這樣的,當一個線程lock.acquire()後,能夠切換到另外一個線程,可是當另外一個線程執行到lock.acquire()後,發現已經鎖住了,因此要等到
另外一個進程解鎖之後,才能進行加鎖。因此lock = threaing.Lock() 是個全局鎖。這樣lock能夠在多個線程中判斷
import time
import threading
 
def addNum():
    global num #在每一個線程中都獲取這個全局變量
    print('--get num:',num )
    time.sleep(1)
    lock.acquire() #修改數據前加鎖
    num  -=1 #對此公共變量進行-1操做
    lock.release() #修改後釋放
 
num = 100  #設定一個共享變量
thread_list = []
lock = threading.Lock() #生成全局鎖
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for t in thread_list: #等待全部線程執行完畢
    t.join()
 
print('final num:', num )
 

GIL VS Lock 

 Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 注意啦,這裏的lock是用戶級的lock,跟那個GIL不要緊 ,具體經過下圖來看一下。

 

  那你又問了, 既然用戶程序已經本身有鎖了,那爲何C python還須要GIL呢?加入GIL主要的緣由是爲了下降程序的開發的複雜度,好比如今的你寫python不須要關心內存回收的問題,由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這能夠說是Python早期版本的遺留問題。

4、線程死鎖和遞歸鎖(RLock)

在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都正在使用,全部這兩個線程在無外力做用下將一直等待下去。下面是一個死鎖的例子:
 
import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()#等待線程結束,後面再講。
 

解決辦法:使用遞歸鎖,將

lockA=threading.Lock()
lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()

  爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。

應用

import time

import threading

class Account:
    def __init__(self, _id, balance):
        self.id = _id
        self.balance = balance
        self.lock = threading.RLock()

    def withdraw(self, amount):

        with self.lock:
            self.balance -= amount

    def deposit(self, amount):
        with self.lock:
            self.balance += amount


    def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的場景

        with self.lock:
            interest=0.05
            count=amount+amount*interest

            self.withdraw(count)


def transfer(_from, to, amount):

    #鎖不能夠加在這裏 由於其餘的其它線程執行的其它方法在不加鎖的狀況下數據一樣是不安全的
     _from.withdraw(amount)

     to.deposit(amount)



alex = Account('alex',1000)
yuan = Account('yuan',1000)

t1=threading.Thread(target = transfer, args = (alex,yuan, 100))
t1.start()

t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
t2.start()

t1.join()
t2.join()

print('>>>',alex.balance)
print('>>>',yuan.balance)

同步條件(Event)

  python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法wait、clear、set.

  事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

  • clear:將「Flag」設置爲False

  • set:將「Flag」設置爲True

  • wait:若是「Flag」爲False,阻塞。True,不阻塞。

用 threading.Event 實現線程間通訊。

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚你們都要加班到22:00。")
        print(event.isSet())
        event.set()
        time.sleep(5)
        print("BOSS:<22:00>能夠下班了。")
        print(event.isSet())
        event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")
if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Condition

  若是一個線程須要不停重複的使用event對象,最好使用condition對象實現一個週期定時器,每當定時器超時的時候,其餘線程均可以檢測到:

   當小夥伴a在往火鍋裏面添加魚丸,這個就是生產者行爲;另一個小夥伴b在吃掉魚丸就是消費者行爲。當火鍋裏面魚丸達到必定數量加滿後b才能吃,這就是一種條件判斷了。

Condition(條件變量)一般與一個鎖關聯。須要在多個Contidion中共享一個鎖時,能夠傳遞一個Lock/RLock實例給構造方法,不然它將本身生成一個RLock實例。

能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。

Condition():

  • acquire(): 線程鎖
  • release(): 釋放鎖
  • wait(timeout): 線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)纔會被喚醒繼續運行。wait()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。
  • notify(n=1): 通知其餘線程,那些掛起的線程接到這個通知以後會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已得到Lock前提下才能調用,不然會觸發RuntimeError。notify()不會主動釋放Lock。
  • notifyAll(): 若是wait狀態線程比較多,notifyAll的做用就是通知全部線程

實現場景:當a同窗王火鍋裏面添加魚丸加滿後(最多5個,加滿後通知b去吃掉),通知b同窗去吃掉魚丸(吃到0的時候通知a同窗繼續添加)

# coding=utf-8
import threading
import time

con = threading.Condition()

num = 0

# 生產者
class Producer(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        # 鎖定線程
        global num
        con.acquire()
        while True:
            print("開始添加!!!")
            num += 1
            print("火鍋裏面魚丸個數:%s" % str(num))
            time.sleep(1)
            if num >= 5:
                print("火鍋裏面裏面魚丸數量已經到達5個,沒法添加了!")
                # 喚醒等待的線程
                con.notify()  # 喚醒小夥伴開吃啦
                # 等待通知
                con.wait()

        #當while退出則開鎖。此程序中運行不到這句話
        con.release()

# 消費者
class Consumers(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        con.acquire()
        global num
        while True:
            print("開始吃啦!!!")
            num -= 1
            print("火鍋裏面剩餘魚丸數量:%s" %str(num))
            time.sleep(2)
            if num <= 0:
                print("鍋底沒貨了,趕忙加魚丸吧!")
                con.notify()  # 喚醒其它線程
                # 等待通知
                con.wait()

        # 當while退出則開鎖。此程序中運行不到這句話
        con.release()
p = Producer()
c = Consumers()
p.start()
c.start()

信號量(Semaphore)

      信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。

      計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)

      BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)      # getname() 線程的名稱
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

Timer

定時器,指定n秒後執行某操做

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

多線程利器---隊列(queue)

列表是不安全的數據結構

import threading,time

li=[1,2,3,4,5]

def pri():
    while li:
        a=li[-1]
        print(a)
        time.sleep(1)
        try:
            li.remove(a)
        except Exception as e:
            print('----',a,e)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()

多線程進行操做時,li[-1]有可能都表明5,而不是遞減的。

思考:如何經過對列來完成上述功能?

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

queue列隊類的方法

建立一個「隊列」對象
import queue
q = queue.Queue(maxsize = 10)
queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,
get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

Python queue模塊有三種隊列及構造函數:
一、Python queue模塊的FIFO隊列先進先出。   class queue.Queue(maxsize)
二、LIFO相似於堆,即先進後出。               class queue.LifoQueue(maxsize)
三、還有一種是優先級隊列級別越低越先出來。        class queue.PriorityQueue(maxsize)

此包中的經常使用方法(q = queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 若是隊列爲空,返回True,反之False
q.full() 若是隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 至關q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 至關q.put(item, False)
q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做

other mode:

import queue

#先進後出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#優先級
# q=queue.PriorityQueue()
# q.put([5,100])
# q.put([7,200])
# q.put([3,"hello"])
# q.put([4,{"name":"alex"}])

while 1:

  data=q.get()
  print(data)

生產者消費者模型:

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
import threading,queue

class Mythread1(threading.Thread):

    def __init__(self):
        super().__init__()

    def run(self):
        n = 100
        while n>0:
            n -= 1
            print("put進n")
            q.put(n)

class Mythread2(threading.Thread):

    def __init__(self):
        super().__init__()

    def run(self):
        n = 100
        while n>0:
            n -= 1
            print("get出n")
            q.get(n)


if __name__ == '__main__':
    q = queue.Queue()
    t1 = Mythread1()
    t2 = Mythread2()
    t1.start()
    t2.start()

>>>代碼會出現以下結果
put進n
put進n
put進n
put進nget出n

get出n
get出nput進n
get出n
put進n
put進n
put進n
put進n
put進n
put進n

get出nput進n
get出n
get出n
get出n

put進nget出n
get出n
get出n

  暫時不知道爲何。(估計是print的時候自動換行出問題)可是有一個解決方法。加鎖。

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "Q1mi"
# Date: 2018/7/12

import threading, queue


class Mythread1(threading.Thread):

    def __init__(self):
        super().__init__()

    def run(self):
        n = 1000
        while n > 0:
            n -= 1
            lock.acquire()
            print("put進n", end="\n")
            lock.release()
            q.put(n)


class Mythread2(threading.Thread):

    def __init__(self):
        super().__init__()

    def run(self):
        n = 1000
        while n > 0:
            n -= 1
            lock.acquire()
            print("get出n", end="\n")
            lock.release()
            q.get(n)


if __name__ == '__main__':
    lock = threading.Lock()
    q = queue.Queue()
    t1 = Mythread1()
    t2 = Mythread2()
    t1.start()
    t2.start()

多進程模塊 multiprocessing

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

因爲GIL的存在,python中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。

multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

 

一 進程的調用

調用方式1

from multiprocessing import Process
import time
def f(name):
    time.sleep(1)
    print('hello', name,time.ctime())

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('alvin',))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')

調用方式2

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name

    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')

 

To show the individual process IDs involved, here is an expanded example:

from multiprocessing import Process
import os
import time
def info(title):
  
    print("title:",title)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main process line')
    time.sleep(1)
    print("------------------")
    p = Process(target=info, args=('yuan',))
    p.start()
    p.join()

二 Process類 

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前尚未實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

 

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():無論任務是否完成,當即中止工做進程

 

屬性

  daemon:和線程的setDeamon功能同樣

  name:進程名字。

  pid:進程號。

import time
from  multiprocessing import Process

def foo(i):
    time.sleep(1)
    print (p.is_alive(),i,p.pid)
    time.sleep(1)

if __name__ == '__main__':
    p_list=[]
    for i in range(10):
        p = Process(target=foo, args=(i,))
        #p.daemon=True
        p_list.append(p)

    for p in p_list:
        p.start()
    # for p in p_list:
    #     p.join()

    print('main process end')

三 進程間通信 

3.1 進程對列Queue

from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

3.2 管道

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

 

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"yuan"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    print("q_ID2:",id(child_conn))

if __name__ == '__main__':

    parent_conn, child_conn = Pipe()
    print("q_ID1:",id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("兒子你好!")
    p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

多管道實例

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing

# 管道消費者.
def consumer(lock,pipe):
    output_p, input_p = pipe
    input_p.close() # 關閉管道輸入口
    while True:
    lock.acquire()            
    item = output_p.recv()
    lock.release()
    if item == None:    
            break
        
        # 處理部分
        lock.acquire()
        print(item)
        lock.release()

# 管道生產者
def producer(sequence, input_p):
    for item in sequence:
        # Put the item on the queue
        input_p.send(item)
        
if __name__ == '__main__':
    
    # 進程數、建立管道,鎖等
    p_num = 2
    process = []    
    (output_p, input_p) = multiprocessing.Pipe()
    lock = multiprocessing.Lock()
    
    # 定義消費進程
    for i in range(p_num):
        t =multiprocessing.Process(target=consumer,args=(lock,(output_p, input_p),))      
        t.daemon=True
        process.append(t)    

    # 啓動消費進程
    for i in range(p_num):
        process[i].start()
        
    # 關閉輸出管道,以往管道填充數據
    output_p.close()
    sequence = range(100) + [None]*p_num   
    producer(sequence, input_p)    
    # 數據填充完畢,打開輸入管道
    input_p.close()
    
    # 等待結束
    for i in range(p_num):
        process[i].join()

3.3 Managers

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example:

from multiprocessing import Process, Manager
 
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)
 
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
 
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)

四 進程同步

Without using the lock output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
 
if __name__ == '__main__':
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

 

五 進程池

1.進程池的概念

python中,進程池內部會維護一個進程序列。當須要時,程序會去進程池中獲取一個進程。若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。

2.進程池的內置方法

  • apply 從進程池裏取一個進程並同步執行
  • apply_async 從進程池裏取出一個進程並異步執行
  • terminate 馬上關閉進程池
  • join 主進程等待全部子進程執行完畢,必須在close或terminete以後
  • close 等待全部進程結束才關閉線程池

 

 

同步是指一個進程在執行某個請求的時候,必需要到收到對方返回的信息才繼續執行下去。

異步是指進程在執行某個請求時,無論其餘的進程的狀態,這個進程就執行後續操做;
當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。

例如:打電話就是同步通訊,發信息就是異步通訊。

 

3.進程池的使用

代碼以下:

from multiprocessing import Pool
import time

def func(args):
    time.sleep(1)   #程序休眠1s
    print("%s------>%s"%(args,time.ctime()))    #打印參數及時間

if __name__=="__main__":
    p1=Pool(2)  #設定開啓2個進程池
    for i in range(10):
        p1.apply_async(func=func,args=(i,)) #設定異步執行任務

    p1.close()  #關閉進程池
    time.sleep(2)   #程序休眠2s
    p1.terminate()  #關閉進程池
    p1.join()   #阻塞進程池
    print("ending")     #打印結束語句

>>>
0------>Thu Jul 20 20:18:43 2017
1------>Thu Jul 20 20:18:43 2017
ending

能夠看到,在程序執行過程當中,關閉進程池,則程序會當即中止,不會再繼續執行後續語句。

4.修改程序,使程序可以執行所有的任務

代碼以下:

from multiprocessing import Pool
import time

def func(args):
    time.sleep(1)   #休眠1s
    print("%s------>%s"%(args,time.ctime()))    #打印傳遞的參數及時間 

if __name__=="__main__":
    p1=Pool(2)  #定義2個進程池
    for i in range(10): #定義循環10次
        p1.apply_async(func=func,args=(i,)) #異步執行任務

    p1.close()      #等待全部的任務都完成才關閉進程池
    p1.join()
    print("ending")

執行結果以下:

0------>Thu Jul 20 20:19:12 2017
1------>Thu Jul 20 20:19:12 2017
2------>Thu Jul 20 20:19:13 2017
3------>Thu Jul 20 20:19:13 2017
4------>Thu Jul 20 20:19:14 2017
5------>Thu Jul 20 20:19:14 2017
6------>Thu Jul 20 20:19:15 2017
7------>Thu Jul 20 20:19:15 2017
8------>Thu Jul 20 20:19:16 2017
9------>Thu Jul 20 20:19:16 2017
ending

參考文檔:

1. https://www.cnblogs.com/huanxiyun/articles/5826902.html

2. http://www.javashuo.com/article/p-xkqrdjgb-c.html

Python中的上下文管理器(contextlib模塊)

 

上下文管理器的任務是:代碼塊執行前準備,代碼塊執行後收拾

 

1 如何使用上下文管理器:

如何打開一個文件,並寫入"hello world"

filename="my.txt"
mode="w"
f=open(filename,mode)
f.write("hello world")
f.close()

當發生異常時(如磁盤寫滿),就沒有機會執行第5行。固然,咱們能夠採用try-finally語句塊進行包裝:

writer=open(filename,mode)
try:
    writer.write("hello world")
finally:
    writer.close()

當咱們進行復雜的操做時,try-finally語句就會變得醜陋,採用with語句重寫:

with open(filename,mode) as writer:
    writer.write("hello world")

as指代了從open()函數返回的內容,並把它賦給了新值。with完成了try-finally的任務。

2 自定義上下文管理器 

with語句的做用相似於try-finally,提供一種上下文機制。要應用with語句的類,其內部必須提供兩個內置函數__enter__和__exit__。前者在主體代碼執行前執行,後者在主體代碼執行後執行。as後面的變量,是在__enter__函數中返回的。

class echo():
    def output(self):
        print "hello world"
    def __enter__(self):
        print "enter"
        return self  #能夠返回任何但願返回的東西
    def __exit__(self,exception_type,value,trackback):
        print "exit"
        if exception_type==ValueError:
            return True
        else:
            return Flase
  
>>>with echo as e:
    e.output()
     
輸出:
enter
hello world
exit

完備的__exit__函數以下:

def __exit__(self,exc_type,exc_value,exc_tb)

其中,exc_type:異常類型;exc_value:異常值;exc_tb:異常追蹤信息

當__exit__返回True時,異常不傳播

3 contextlib模塊

contextlib模塊的做用是提供更易用的上下文管理器,它是經過Generator實現的。contextlib中的contextmanager做爲裝飾器來提供一種針對函數級別的上下文管理機制,經常使用框架以下:

from contextlib import contextmanager
@contextmanager
def make_context():
    print 'enter'
    try:
        yield "ok"
    except RuntimeError,err:
        print 'error',err
    finally:
        print 'exit'
         
>>>with make_context() as value:
    print value
     
輸出爲:
    enter
    ok
    exit

其中,yield寫入try-finally中是爲了保證異常安全(能處理異常)as後的變量的值是由yield返回。yield前面的語句可看做代碼塊執行前操做,yield以後的操做能夠看做在__exit__函數中的操做。

以線程鎖爲例:

@contextlib.contextmanager
def loudLock():
    print 'Locking'
    lock.acquire()
    yield
    print 'Releasing'
    lock.release()
 
with loudLock():
    print 'Lock is locked: %s' % lock.locked()
    print 'Doing something that needs locking'
 
#Output:
#Locking
#Lock is locked: True
#Doing something that needs locking
#Releasing

4 contextlib.nested:減小嵌套

對於:

with open(filename,mode) as reader:
    with open(filename1,mode1) as writer:
        writer.write(reader.read())

能夠經過contextlib.nested進行簡化:

with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer):
    writer.write(reader.read())

在python 2.7及之後,被一種新的語法取代:

with open(filename,mode) as reader,open(filename1,mode1) as writer:
    writer.write(reader.read())

5 contextlib.closing() 

file類直接支持上下文管理器API,但有些表示打開句柄的對象並不支持,如urllib.urlopen()返回的對象。還有些遺留類,使用close()方法而不支持上下文管理器API。爲了確保關閉句柄,須要使用closing()爲它建立一個上下文管理器(調用類的close方法)。

import contextlib
class myclass():
  def __init__(self):
    print '__init__'
  def close(self):
    print 'close()'
    
with contextlib.closing(myclass()):
  print 'ok'

>>>
__init__
ok
close()

協程

協程,又稱微線程,纖程。英文名Coroutine。

優勢1: 協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。

優勢2: 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。

由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。

 

 

yield的簡單實現

import time
import queue

def consumer(name):
    print("--->ready to eat baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        #time.sleep(1)

def producer():

    r = con.__next__()
    r = con2.__next__()
    n = 0
    while 1:
        time.sleep(1)
        print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) )
        con.send(n)
        con2.send(n+1)

        n +=2


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

from greenlet import greenlet
 
 
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
 
 
def test2():
    print(56)
    gr1.switch()
    print(78)
 
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Gevent

import gevent

import requests,time


start=time.time()

def f(url):
    print('GET: %s' % url)
    resp =requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([

        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://www.baidu.com/'),
        gevent.spawn(f, 'https://www.sina.com.cn/'),

])

# f('https://www.python.org/')
#
# f('https://www.yahoo.com/')
#
# f('https://baidu.com/')
#
# f('https://www.sina.com.cn/')

print("cost time:",time.time()-start)

 

更多內容參考:

異步IO\數據庫\隊列\緩存

http://www.cnblogs.com/alex3714/articles/5248247.html

相關文章
相關標籤/搜索