Python進程、線程、協程詳解

引子javascript

  進程html

  線程(優先閱讀)java

  協程python

進程

概念:就是一個程序在一個數據集上的一次動態執行過程(本質上來說,就是運行中的程序(代指運行過程),程序不運行就不是進程)    抽象概念windows

組成:安全

   一、程序:咱們編寫的程序用來描述進程要完成哪些功能以及如何完成網絡

   二、數據集:數據集則是程序在執行過程當中所須要使用的資源數據結構

   三、進程控制塊:進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。多線程

闡釋:進程與進程之間都佔用的是獨立的內存塊,它們彼此之間的數據也是獨立的併發

優勢:同時利用多個CPU,可以同時進行多個操做

缺點:耗費資源(須要從新開闢內存空間)

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前尚未實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():無論任務是否完成,當即中止工做進程

屬性:

  daemon:和線程的setDeamon功能同樣

  name:進程名字。

  pid:進程號。

建立進程的方式有倆種

一,經過調用模塊的方式來建立線程

複製代碼
# 進程模塊
import multiprocessing
import time

def f1():
    start = time.time()
    sum = 0
    for n in range(100000000):
        sum += n
    print(sum)
    print("data:{}".format(time.time() - start))
if __name__ == '__main__':   # windows在調用進程的時候,必須加這句話,不然會報錯
    li = []
    p1 = multiprocessing.Process(target=f1)
    li.append(p1)
    p2 = multiprocessing.Process(target=f1)
    li.append(p2)
    for p in li:
        p.start()
    for i in li:
        i.join()

    print("ending...")
複製代碼

二,經過繼承類的方式(推薦)

複製代碼
import multiprocessing


class Process(multiprocessing.Process):
    def run(self):
        sum = 0
        for n in range(100000000):
            sum += n
        print(sum)

li = []
for i in range(2):
    p = Process()
    li.append(p)

if __name__ == '__main__':
    for p in li:
        p.start()

    for i in li:
        i.join()

    print("ending")
複製代碼

進程之間的通訊

建立進程模塊的下隊列(Queue)

# 進程之間的通訊   Queue
from multiprocessing import Queue, Process, Pipe
import os,time,random


def write(q):
    print("process to write{}".format(os.getpid()))
    for value in ["A","B","C"]:
        print("Put {} to queue...".format(value))
        q.put(value)
        time.sleep(random.random())


def read(q):
    print("process to read{}".format(os.getpid()))
    while True:
        value = q.get(True)
        print("Get {} from queue".format(value))

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write,args=(q,))  # 這裏傳輸的q是copy的
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()

    pw.join()
    pr.terminate()  # 強行終止進程(由於這個子進程定義了一個死循環)
進程隊列(Queue)

管道(Pipe)

# 進程之間的通訊   Pipe(相似於socket)
from multiprocessing import Queue, Process, Pipe
import os,time,random

# 說明Pipe的send是沒有返回值的
pipe = Pipe()
# print(pipe)

def worker(pipe):
    time.sleep(random.random())
    for i in range(10):
        print("worker send {}".format(pipe.send(i)))


def Boss(pipe):
    while True:
        print("Boss recv {}".format(pipe.recv()))

p1 = Process(target=worker,args=(pipe[0],))
p2 = Process(target=Boss,args=(pipe[1],))
if __name__ == '__main__':

    p1.start()
    p2.start()
管道(Pipe)

上述實現了進程間的數據通訊,那麼進程能夠達到數據共享麼?Sure。

前一節中, Pipe、Queue 都有必定數據共享的功能,可是他們會堵塞進程, 這裏介紹的兩種數據共享方式都不會堵塞進程, 並且都是多進程安全的。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array.

由上述英文咱們瞭解到,經過Manager()能夠實現進程上的數據共享,而且支持的類型也由不少,接下來看代碼

from multiprocessing import Process, Manager


def f(d,l,n):
    d["name"] = "alex"
    d[n] = "1"
    l.append(n)

if __name__ == '__main__':
    with Manager() as manager:  # 相似於文件操做的with open(...)
        d = manager.dict()
        l = manager.list(range(5))
        print(d,l)

        p_list = []
        for n in range(10):
            p = Process(target=f,args=(d, l, n))
            p.start()
            p_list.append(p)

        for p in p_list:   
            p.join()           # 這兒的join必須加

        print(d)
        print(l)

# 關於數據共享的進程等待的問題,鄙人做出一些本身的理解
# 多核CPU的狀況下,進程間是能夠實現並行的,固然每一個核處理的速度又有極其細微的差別性,速度處理稍慢些的進程在還在對數據進行處理的候,同時又想要獲得數據了,天然會出現錯誤,因此要等待進程處理完這份數據的時候再進行操做
進程數據共享(Manager)
from multiprocessing import Process, Manager

def func(n,a):
    n.value = 50
    for i in range(len(a)):
        a[i] += 10


if __name__ == '__main__':
    with Manager() as manager:
        num = manager.Value("d", 0.0)
        ints = manager.Array("i", range(10))
        p = Process(target=func,args=(num,ints))
        p.start()
        p.join()

        print(num)
        print(ints)

輸出
Value('d', 50)
array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])

# 共享內存有兩個結構,一個是 Value, 一個是 Array,這兩個結構內部都實現了鎖機制,所以是多進程安全的。
# Value 和 Array 都須要設置其中存放值的類型,d 是 double 類型,i 是 int 類型,具體的對應關係在Python 標準庫的 sharedctypes 模塊中查看。
# 上面的共享內存支持兩種結構 Value 和 Array, 這些值在主進程中管理,很分散。 Python 中還有一統天下,無所不能的Manager,專門用來作數據共享。 其支持的類型很是多。
View Code

進程同步

Lock

鎖是爲了確保數據一致性,好比讀寫鎖,每一個進程給一個變量增長 1 ,可是若是在一個進程讀取但尚未寫入的時候,另外的進程也同時讀取了,並寫入該值,則最後寫入的值是錯誤的,這時候就須要鎖。

# 爲何引伸進程同步
# 數據的一致性
import time
from multiprocessing import Lock, Process


def run(i, lock):
    with lock:  # 自動得到鎖和釋放鎖
        time.sleep(1)
        print(i)


if __name__ == '__main__':

    lock = Lock()

    for i in range(10):
        p = Process(target=run,args=(i,lock,))
        p.start()
進程同步

Lock 同時也實現了 ContextManager API, 能夠結合 with 語句使用, 關於 ContextManager, 請移步 Python 學習實踐筆記 裝飾器 與 context 查看。

Semaphore

Semaphore 和 Lock 稍有不一樣,Semaphore 至關於 N 把鎖,獲取其中一把就能夠執行了。 信號量的總數 N 在構造時傳入,s = Semaphore(N)。 和 Lock 同樣,若是信號量爲0,則進程堵塞,直到信號大於0。

進程池

若是有50個任務要去執行,CPU只有4核,那建立50個進程完成,其實大可沒必要,徒增管理開銷。若是隻想建立4個進程,讓它們輪流替完成任務,不用本身去管理具體的進程的建立銷燬,那 Pool 是很是有用的。

Pool 是進程池,進程池可以管理必定的進程,當有空閒進程時,則利用空閒進程完成任務,直到全部任務完成爲止

?
1
2
3
4
5
6
7
8
def func(x):
     return x * x
 
if __name__ = = '__main__' :
     p_pool = pool.Pool( 4 )
     result = p_pool. map (func, range ( 8 ))
     print (result)
# Pool 進程池建立4個進程,無論有沒有任務,都一直在進程池中等候,等到有數據的時候就開始執行。

從上面的例子來看貌似也看不出什麼效果,那麼接下來自定義一個進程池

關於進程池的API用法(並非只有倆個哦)

apply  (每一個任務是排隊進行,相似於串行失去意義)

apply_async  (任務都是併發進行,而且能夠設置回調函數) 進程的併發其實能夠稱之爲並行了,能夠利用到多核CPU

import os,time
from multiprocessing import pool,Process


def run(n):
    # print(os.getpid())
    time.sleep(1)
    print(n)
    return n    # 該函數的返回值,是回調函數的所要傳入的值


def bar(args):
    pass
    # print("bar {}".format(args))
    # print(os.getpid())

if __name__ == '__main__':
    p_pool = pool.Pool(5)   # 設置進程池中的最大放置
    for n in range(100):
        # 回調函數,就是某個函數執行成功或結束執行的函數
        p_pool.apply_async(func=run,args=(n,),callback=bar)

    p_pool.close()  # 進程的關閉和等待是有順序的
    p_pool.join()

    print("ending")

# 看看 Pool 的執行流程,有三個階段。第1、一個進程池接收不少任務,而後分開執行任務;第2、再也不接收任務了;第3、等全部任務完成了,回家,不幹了。
# 這就是上面的方法,close 中止接收新的任務,若是還有任務來,就會拋出異常。 join 是等待全部任務完成。 join 必需要在 close 以後調用,不然會拋出異常。terminate 非正常終止,內存不夠用時,垃圾回收器調用的就是這個方法。
low版進程池

 

線程

概念:線程是應用程序中工做的最小單元,或者又稱之爲微進程。

組成:它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。

闡釋:線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。線程能夠共享(調用)進程的數據資源

優勢:共享內存,IO操做時候,創造併發操做

缺點:"......"(中國文化的博大精深的帶引號)

 

關於多線程

多線程相似於同時執行多個不一樣程序,多線程運行有以下優勢:

  • 使用線程能夠把佔據長時間的程序中的任務放到後臺去處理。
  • 用戶界面能夠更加吸引人,這樣好比用戶點擊了一個按鈕去觸發某些事件的處理,能夠彈出一個進度條來顯示處理的進度
  • 程序的運行速度可能加快
  • 在一些等待的任務實現上如用戶輸入、文件讀寫和網絡收發數據等,線程就比較有用了。在這種狀況下咱們能夠釋放一些珍貴的資源如內存佔用等等。

線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。

每一個線程都有他本身的一組CPU寄存器,稱爲線程的上下文,該上下文反映了線程上次運行該線程的CPU寄存器的狀態。

指令指針和堆棧指針寄存器是線程上下文中兩個最重要的寄存器,線程老是在進程獲得上下文中運行的,這些地址都用於標誌擁有線程的進程地址空間中的內存。

  • 線程能夠被搶佔(中斷)。
  • 在其餘線程正在運行時,線程能夠暫時擱置(也稱爲睡眠) -- 這就是線程的退讓。

線程能夠分爲:

  • 內核線程:由操做系統內核建立和撤銷。
  • 用戶線程:不須要內核支持而在用戶程序中實現的線程。

Python3 線程中經常使用的兩個模塊爲:

  • _thread
  • threading(推薦使用)

thread 模塊已被廢棄。用戶可使用 threading 模塊代替。因此,在 Python3 中不能再使用"thread" 模塊。爲了兼容性,Python3 將 thread 重命名爲 "_thread"。

Python中使用線程有兩種方式:函數或者用類來包裝線程對象。

Python3 經過兩個標準庫 _thread 和 threading 提供對線程的支持。

_thread 提供了低級別的、原始的線程以及一個簡單的鎖,它相比於 threading 模塊的功能仍是比較有限的。

threading 模塊除了包含 _thread 模塊中的全部方法外,還提供的其餘方法:

  • threading.currentThread(): 返回當前的線程變量。
  • threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  • threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法:

  • run(): 用以表示線程活動的方法。
  • start():啓動線程活動。 
  • join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
  • setDaemon(True):守護主線程,跟隨主線程退(必需要放在start()上方)
  • isAlive(): 返回線程是否活動的。
  • getName(): 返回線程名。
  • setName(): 設置線程名。

看了那麼多廢話,那麼建立線程的方式有倆種,接下來看代碼

一,經過調用模塊的方式來建立線程(推薦使用)

複製代碼
import threading # 線程模塊
import time
# 建立線程
def onepiece1(n):
    print("路飛正在使用橡膠火箭炮%s,攻擊力%s" %(time.ctime(),n))
    time.sleep(3)
    print("路飛結束該技能%s" %time.ctime())

def onepiece2(n):
    print("艾尼路正在出雷神萬擊%s你,攻擊力%s" %(time.ctime(),n))
    time.sleep(5)
    print("艾尼路結束該技能%s" %time.ctime())

if __name__ == '__main__':

    thread_1 = threading.Thread(target=onepiece1,args=(10,)) # 建立子線程
    thread_2 = threading.Thread(target=onepiece2,args=(9,))

    thread_1.start()
    # pyhton1.join()
    thread_2.start()
    thread_2.join() # 等待線程終止

    print("ending Fighting")
複製代碼

二,建立類經過繼承的方式來建立線程

使用Threading模塊建立線程,直接從threading.Thread繼承,而後重寫__init__方法和run方法:

複製代碼
import threading
import time

class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # 定義每一個線程要運行的函數
        print("running on number:%s" %self.num)
        time.sleep(3)
print("ending......")

if __name__ == '__main__':
    t1 = MyThread(1) # 繼承這個類,把1這個參數,傳給num ,t1就是個線程對象
    t2 = MyThread(2)
    t1.start()
    t2.start()
複製代碼

GIL

在知道線程的建立方式以及一些方法的使用後,引伸一個cpython解釋器的一個歷史遺留問題,全局GIL鎖

由於Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先得到GIL鎖,而後,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把全部線程的執行代碼都給上了鎖,因此,多線程在Python中只能交替執行,即便100個線程跑在100核CPU上,也只能用到1個核。

固然了,也有經過別的途徑提升執行效率,技術的道路上終無止境。

同步鎖

多個線程共同對某個數據修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要對多個線程進行同步。

使用 Thread 對象的 Lock 和 Rlock 能夠實現簡單的線程同步。

這兩個對象都有 acquire 方法和 release 方法。

對於那些須要每次只容許一個線程操做的數據,能夠將其操做放到 acquire 和 release 方法之間。

def sub():
    global num
    thread_lock_A.acquire()  # 得到鎖,用於線程同步
    tmep = num
    time.sleep(0.001)
    num = tmep - 1
    thread_lock_A.release()  # 釋放鎖,開啓下一個線程
                             # 問題,加鎖以後100個線程就變爲了串行執行,鎖內的代碼
li = []
for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    li.append(t)

for t in li:
    t.join()
print("ending")
print(num)
同步鎖  

線程的死鎖和遞歸鎖

在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都

正在使用,全部這兩個線程在無外力做用下將一直等待下去。

解決死鎖就能夠用遞歸鎖

import threading,time

# lock_A = threading.Lock()
# lock_B = threading.Lock()
r_lock = threading.RLock()


class Mythread(threading.Thread):

    def actionA(self):
        r_lock.acquire()
        print(self.name,time.ctime())
        time.sleep(2)
        r_lock.acquire()
        print(self.name,time.ctime())
        time.sleep(1)
        r_lock.release()
        r_lock.release()

    def actionB(self):
        r_lock.acquire()
        print(self.name,time.ctime())
        time.sleep(2)
        r_lock.acquire()
        print(self.name,time.ctime())
        time.sleep(1)
        r_lock.release()
        r_lock.release()

    def run(self):

        self.actionA()
        self.actionB()
li = []
for i in range(5):
    t = Mythread()
    t.start()
    li.append(t)

for t in li:
    t.join()

print("ending")
遞歸鎖

爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。

信號量(Semaphore):從意義上來說,也能夠稱之爲一種鎖

信號量:指同時開幾個線程併發

    信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。

計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)

    BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。

import threading,time

class myThread(threading.Thread):
    def run(self):           #啓動後,執行run方法
        if semaphore.acquire():  #加把鎖,能夠放進去多個(至關於5把鎖,5個鑰匙,同時有5個線程)
            print(self.name)
            time.sleep(5)
            semaphore.release()

if __name__=="__main__":
    semaphore=threading.Semaphore(5)  #同時能有幾個線程進去(設置爲5就是一次5個線程進去),相似於停車廠一次能停幾輛車
    
    thrs=[] #空列表
    for i in range(100): #100個線程
        thrs.append(myThread()) #加線程對象

    for t in thrs:
        t.start()  #分別啓動
信號量例子

同步條件(Event)

簡單瞭解

Event對象實現了簡單的線程通訊機制,它提供了設置信號,清楚信號,等待等用於實現線程間的通訊。

1 設置信號

使用Event的set()方法能夠設置Event對象內部的信號標誌爲真。Event對象提供了isSet()方法來判斷其內部信號標誌的狀態。當使用event對象的set()方法後,isSet()方法返回真

2 清除信號

使用Event對象的clear()方法能夠清除Event對象內部的信號標誌,即將其設爲假,當使用Event的clear方法後,isSet()方法返回假

3 等待

Event對象wait的方法只有在內部信號爲真的時候纔會很快的執行並完成返回。當Event對象的內部信號標誌位假時,則wait方法一直等待到其爲真時才返回。

import threading, time


class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚你們都要加班到22:00。")
        print(event.isSet())
        event.set()
        time.sleep(5)
        print("BOSS:<22:00>能夠下班了。")
        print(event.isSet())
        event.set()


class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")


if __name__ == "__main__":
    event = threading.Event()
    threads = []
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
同步條件Event

Event內部包含了一個標誌位,初始的時候爲false。
可使用使用set()來將其設置爲true;
或者使用clear()將其重新設置爲false;
可使用is_set()來檢查標誌位的狀態;
另外一個最重要的函數就是wait(timeout=None),用來阻塞當前線程,直到event的內部標誌位被設置爲true或者timeout超時。若是內部標誌位爲true則wait()函數理解返回。

多線程利器——隊列(queue)

由於列表是不安全的數據結構,因此引伸了新的模塊——隊列

# 列表是不安全的數據結構     舉個簡單的例子

li = [1, 2, 3, 4, 5]


def remove():
    while True:
        xx = li[-1]
        print(xx)
        time.sleep(1)
        li.remove(xx)


A = threading.Thread(target=remove)
B = threading.Thread(target=remove)

A.start()
B.start()
爲何列表是不安全的數據結構

Python 的 queue 模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列QueueLIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue

這些隊列都實現了鎖原語,可以在多線程中直接使用,可使用隊列來實現線程間的同步。

queue 模塊中的經常使用方法:

  • queue.qsize() 返回隊列的大小
  • queue.empty() 若是隊列爲空,返回True,反之False
  • queue.full() 若是隊列滿了,返回True,反之False
  • queue.full 與 maxsize 大小對應
  • queue.get([block[, timeout]])獲取隊列,timeout等待時間
  • queue.get_nowait() 至關queue.get(False)
  • queue.put(item) 寫入隊列,timeout等待時間
  • queue.put_nowait(item) 至關Queue.put(item, False)
  • queue.task_done() 在完成一項工做以後,queue.task_done()函數向任務已經完成的隊列發送一個信號
  • queue.join() 實際上意味着等到隊列爲空,再執行別的操做
import queue

# 隊列有三種模式
# 先進先出
qu = queue.Queue()

qu.put("alex")
qu.put(123)
qu.put({"age":18})

while True:
    print(qu.get())
    print("————————")
FIFO
# 先進後出
qu = queue.LifoQueue()

qu.put("alex")
qu.put(123)
qu.put({"age":18})

while True:
    print(qu.get())
    print("————————")
LIFO
# 優先級

q = queue.PriorityQueue(3)  # 設定大小

q.put([1, "alex"])
q.put([3, 123])
q.put([2, {"age":18}])
# q.put([4,456])  # 若是裝的大於設定大小,也會阻塞(等待)

# while True:
#     print(q.get()[1])  # get當取不到值以後會等待
#     print("————————")

print(q.qsize())  # 查看當前隊列有多少個
print(q.empty())  # 判斷是否爲空
print(q.full())   # 判斷是否爲滿
優先級
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# 實例
import queue
import threading
import time
 
go = False  # 設定標識位
 
 
class MyThread(threading.Thread):
     def __init__( self , threadID, name, q):
         threading.Thread.__init__( self )
         self .threadID = threadID
         self .name = name
         self .q = q
 
     def run( self ):
         print ( "開啓線程:{}" . format ( self .name))
         process_data( self .name, self .q)
         print ( "退出線程:{}" . format ( self .name))
 
 
def process_data(thread_name,q):
     while not go:
         queue_lock.acquire()        # 得到鎖
         if not work_queue.empty():  # 若是隊列爲空返回True,反之False
             data = q.get()          # 向隊列取值,先進先出
             queue_lock.release()    # 釋放鎖
             print ( "{} processing {}" . format (thread_name,data))
         else :
             queue_lock.release()
         time.sleep( 1 )
 
thread_list = [ "Thread-1" , "Thread-2" , "Thread-3" ]
name_list = [ "one" , "two" , "three" , "four" , "five" ]
queue_lock = threading.Lock()  # 同步鎖
 
work_queue = queue.Queue( 10 )
threads = []
threads_ID = 1
 
# 建立新線程
for t in thread_list:
     thread = MyThread(threads_ID,t,work_queue)  # 建立線程
     thread.start()          # 啓動線程
     threads.append(thread)  # 追加線程對象到列表
     threads_ID + = 1         # ID自加1
 
# 填充隊列
queue_lock.acquire()
for name in name_list:
     work_queue.put(name)  # 向隊列填充
queue_lock.release()
 
# 等待隊列清空.  清空返回True,則此循環會跳過
while not work_queue.empty():
     pass
 
# 改變狀態,通知線程退出
go = True
 
# 等待全部線程完成
for t in threads:
     t.join()
print ( "退出主線程。" )

生產者與消費者模型

在這個現實社會中,生活中到處充滿了生產和消費.

什麼是生產者消費者模型

在 工做中,可能會碰到這樣一種狀況:某個模塊負責產生數據,這些數據由另外一個模塊來負責處理(此處的模塊是廣義的,能夠是類、函數、線程、進程等)。產生數據的模塊,就形象地稱爲生產者;而處理數據的模塊,就稱爲消費者。在生產者與消費者之間在加個緩衝區,形象的稱之爲倉庫,生產者負責往倉庫了進商 品,而消費者負責從倉庫裏拿商品,這就構成了生產者消費者模型。結構圖以下

生產者消費者模型的優勢

一、解耦

假設生產者和消費者分別是兩個類。若是讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。未來若是消費者的代碼發生變化, 可能會影響到生產者。而若是二者都依賴於某個緩衝區,二者之間不直接依賴,耦合也就相應下降了。

舉個例子,咱們去郵局投遞信件,若是不使用郵筒(也就是緩衝區),你必須得把信直接交給郵遞員。有同窗會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產生和你和郵遞員之間的依賴(至關於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要從新認識一下(至關於消費者變化致使修改生產者代碼)。而郵筒相對來講比較固定,你依賴它的成本就比較低(至關於和緩衝區之間的弱耦合)。

二、支持併發

因爲生產者與消費者是兩個獨立的併發體,他們之間是用緩衝區做爲橋樑鏈接,生產者只須要往緩衝區裏丟數據,就能夠繼續生產下一個數據,而消費者只須要從緩衝區了拿數據便可,這樣就不會由於彼此的處理速度而發生阻塞。

接上面的例子,若是咱們不使用郵筒,咱們就得在郵局等郵遞員,直到他回來,咱們把信件交給他,這期間咱們啥事兒都不能幹(也就是生產者阻塞),或者郵遞員得挨家挨戶問,誰要寄信(至關於消費者輪詢)。

三、支持忙閒不均

緩衝區還有另外一個好處。若是製造數據的速度時快時慢,緩衝區的好處就體現出來了。當數據製造快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中。 等生產者的製造速度慢下來,消費者再慢慢處理掉。

爲了充分複用,再拿寄信的例子來講事。假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節(也多是聖誕節)送賀卡,須要寄出去的信超過1000封,這時 候郵筒這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來 時再拿走。

對生產者與消費者模型的闡釋就進行到這裏,用代碼實現生產者與消費者模型

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making.....正在製做包子...")
    time.sleep(5)
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    q.join()
    print("ok......")

def Consumer(name):
  count = 0
  while count <10:
        time.sleep(random.randrange(4))  # 產生一個隨機數(1秒-3秒之間)
        data = q.get()
        print("eating.......")
        time.sleep(4)  # 4秒鐘這後
        q.task_done()  # 給他發一個信號,纔打印ok
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
        count +=1

p1 = threading.Thread(target=Producer, args=('A君',))
c1 = threading.Thread(target=Consumer, args=('B君',))
c2 = threading.Thread(target=Consumer, args=('C君',))
c3 = threading.Thread(target=Consumer, args=('D君',))

p1.start()
c1.start()
c2.start()
c3.start()
包子工廠
複製代碼
import threading, time, queue

q = queue.Queue()


def consumer(q):
    while True:
        msg = q.get()
        if isinstance(msg, str) and msg == "quit":
            break
        else:
            print(msg)
    print("Bye byes")


def producer():
    start_time = time.time()
    while time.time() - start_time < 5:
        q.put('something at %s' % time.time())
        time.sleep(1)
    q.put('quit')

factory =threading.Thread(target=producer)
worker = threading.Thread(target=consumer, args=(q,))

factory.start()  # 開啓生產者線程
worker.start()   # 開啓消費者線程
複製代碼

協程

在學習異步IO模型前,先來了解協程。

一大波闡釋即將到臨,非高能請注意閃躲(仔細閱讀)

概念:協程,又稱微線程,纖程。英文名Coroutine。 是非搶佔式的程序 主要也是解決I/O操做的

協程的概念很早就提出來了,但直到最近幾年纔在某些語言(如Lua)中獲得普遍應用。

子程序,或者稱爲函數,在全部語言中都是層級調用,好比A調用B,B在執行過程當中又調用了C,C執行完畢返回,B執行完畢返回,最後是A執行完畢。

因此子程序調用是經過棧實現的,一個線程就是執行一個子程序。

子程序調用老是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不一樣。

協程看上去也是子程序,但執行過程當中,在子程序內部可中斷,而後轉而執行別的子程序,在適當的時候再返回來接着執行。

優勢:

優勢1: 協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。

優勢2: 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。

由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。

在此引伸了下生成器的內容

# 生成器
def f():

    print("ok")
    s = yield 6
    print(s)
    print("ok2")
    yield

gen=f()
# print(gen)
# next(gen)  # 方法一
# next(gen)

RET=gen.__next__()  # 方法二
print(RET)

gen.send(5)  # 方法三
生成器簡單複習

 

import time
import queue

def consumer(name):
    print("--->ready to eat baozi........")
    while True:
        new_baozi = yield  # yield實現上下文切換,傳包子進來
        print("[%s] is eating baozi %s" % (name,new_baozi))
        #time.sleep(1)

def producer():

    r = con.__next__()
    r = con2.__next__()
    n = 0
    while 1:
        time.sleep(1)
        print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) )
        con.send(n)  # 發送告訴他有包子了
        con2.send(n+1)

        n +=2

if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    producer()
yield簡單實現

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
gr2.switch()
View Code

Gevent

import gevent
import requests,time

start_time = time.time()


def get_url(url):
    print("get: {}".format(url))
    resp = requests.get(url)
    data = resp.text
    print(len(data),url)

# get_url('https://www.python.org/')
# get_url('https://www.yahoo.com/')
# get_url('https://www.baidu.com/')
# get_url('https://www.sina.com.cn/')
# get_url('http://www.xiaohuar.com/')

gevent.joinall(
    [
        gevent.spawn(get_url, 'https://www.python.org/'),
        gevent.spawn(get_url, 'https://www.yahoo.com/'),
        gevent.spawn(get_url, 'https://www.baidu.com/'),
        gevent.spawn(get_url, 'https://www.sina.com.cn/'),
        gevent.spawn(get_url,'http://www.xiaohuar.com/')
    ]
)


print(time.time()-start_time)
View Code

協程的優點

一、沒有切換的消耗

二、沒有鎖的概念

有一個問題:能用多核嗎?

答:能夠採用多進程+協程,是一個很好的解決併發的方案

相關文章
相關標籤/搜索