Python進階----異步同步,阻塞非阻塞,線程池(進程池)的異步+回調機制實行併發, 線程隊列(Queue, LifoQueue,PriorityQueue), 事件Event,線程的三個狀態(就

Python進階----異步同步,阻塞非阻塞,線程池(進程池)的異步+回調機制實行併發, 線程隊列(Queue, LifoQueue,PriorityQueue), 事件Event,線程的三個狀態(就緒,掛起,運行) ,***協程概念,yield模擬併發(有缺陷),Greenlet模塊(手動切換),Gevent(協程併發)

一丶同步,異步

同步:

   所謂同步就是一個任務須要依賴另外一個任務時,只有被依賴任務執行完畢以後,依賴的任務纔會完成.這是可靠的任務序列.要麼都成功,要麼失敗,兩個任務的狀態能夠保持一致.html

異步:

   所謂異步不須要等待被依賴的任務完成,只是通知依賴的任務要完成什麼工做.依賴的任務也當即執行,只要本身完成了整個任務就算完成了. 至於被依賴的任務是否完成,依賴它的任務沒法肯定,是不可靠的任務序列python

### 同步和異步
## 好比我去銀行辦理業務,可能會有兩種方式:
# 第一種 :選擇排隊等候;
# 第二種 :選擇取一個小紙條上面有個人號碼,等到排到我這一號時由櫃檯的人通知我輪到我去辦理業務了;

# 第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務狀況;

# 第二種:後者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)每每註冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這裏是櫃檯的人)經過某種機制(在這裏是寫在小紙條上的號碼,喊號)找到等待該事件的人。

二丶阻塞,非阻塞,

   阻塞和非阻塞兩個概念與程序(也就是執行程序的'線程')等待消息通知時的狀態相關git

阻塞:

   在程序中,阻塞表明程序'卡'在某處,必須等待這處執行完畢才能繼續執行.一般的阻塞大多數是IO阻塞github

   好比:銀行排隊取錢是一條流水線,如今負責取錢的服務人員餓了,他必須吃飯(阻塞). 只有吃完飯才能繼續回來服務你.此時你就必須等待他,不然你將沒法取錢.對於程序而言,就卡在了此處.編程

非阻塞:

   非阻塞就是沒有IO阻塞,線程在執行任務時沒有遇到IO阻塞.服務器

   好比:你去銀行取錢,在排隊'等候'時什麼事情都沒有發生. 強調在執行的過程網絡

同步阻塞:

   效率最低.你排着隊取錢,服務人員吃飯去了(阻塞了),此時你只能等待,不然不能取錢.這就是同步+阻塞數據結構

異步阻塞:

   在銀行等待辦理業務的人,採用異步方式. 可是他不能離開銀行併發

   異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。app

同步非阻塞:

   其實是效率低下的。

   想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的。

異步非阻塞:

   效率更高

   由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換

   好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。

   不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來,一樣的,不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞

三丶異步+回調機制

提升效率版:

#######  併發爬取 , 併發處理爬取結果
        # 缺點: 1.加強了耦合性,
        #      2.開啓進程耗費資源
        # 優勢: 1. 提升處理效率
        
from  concurrent.futures import ProcessPoolExecutor
import time
import random
import os
import requests

def get_html(url):
    response=requests.get(url)

    print(f'{os.getpid()} 正在爬取網頁~~~')

    if response.status_code==200:
        parser_html(response.text)

def parser_html(obj):

    print(f'總字符長度:{len(obj.result()) }')

if __name__ == '__main__':
    url_list = [
        'http://www.taobao.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.baidu.com',
        'https://www.cnblogs.com/jin-xin/articles/11232151.html',
        'https://www.cnblogs.com/jin-xin/articles/10078845.html',
        'http://www.sina.com.cn',
        'https://www.sohu.com',
        'https://www.youku.com',
    ]
    pool=ProcessPoolExecutor(4)         # 開啓了一個進程池 有4個進程資源

    for url in url_list:
        obj=pool.submit(get_html,url)   #  異步的開啓了 10個任務,4個進程並行(併發)執行.

    pool.shutdown(wait=True)    # 必須等待全部的子進程任務執行完畢

下降耦合版本:

# 併發爬取, 串行解析結果
########### 回調函數 + 異步
    #  1. 下降了耦合性, 由回調函數 去通知執行下一個任務(形成這個任務會經歷串行)
    #  2. 處理爬取結果時是串行處理,影響效率
    
import requests
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import time
import random
import os

def get(url):
    response = requests.get(url)
    print(f'{os.getpid()} 正在爬取:{url}')
    # time.sleep(random.randint(1,3))
    if response.status_code == 200:
        
        return response.text


def parse(obj):
    '''
    對爬取回來的字符串的分析
    簡單用len模擬一下.
    :param text:
    :return:
    '''
    time.sleep(1)
    
    ### obj.result() 取得結果
    print(f'{os.getpid()} 分析結果:{len(obj.result())}')

if __name__ == '__main__':

    url_list = [
        'http://www.taobao.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.baidu.com',
        'https://www.cnblogs.com/jin-xin/articles/11232151.html',
        'https://www.cnblogs.com/jin-xin/articles/10078845.html',
        'http://www.sina.com.cn',
        'https://www.sohu.com',
        'https://www.youku.com',
    ]
    start_time = time.time()
    pool = ProcessPoolExecutor(4)
    for url in url_list:
        obj = pool.submit(get, url)
        obj.add_done_callback(parse)  # 增長一個回調函數
        # 如今的進程完成的仍是網絡爬取的任務,拿到了返回值以後,結果丟給回調函數add_done_callback,
        # 回調函數幫助你分析結果
        # 進程繼續完成下一個任務.
    pool.shutdown(wait=True)

    print(f'主: {time.time() - start_time}')

四丶線程隊列

   使用 queue 模塊

先進先出:FIFO

      Queue

# -*-coding:utf-8-*-
# Author:Ds

import queue

q = queue.Queue(3) # 先進先出隊列


q.put(1)
q.put(2)
q.put('123')
# q.put(666)              # 阻塞 卡住了
# q.put(timeout=1)        # 超時1秒報錯 queue.Full
# q.put(1,block=False)        # 非阻塞,直接報錯 queue.Full


print(q.get())
print(q.get())
print(q.get())
# print(q.get())              #阻塞 卡住
# print(q.get(timeout=1))      #超時1秒報錯 queue.Empty
print(q.get(block=False))       #非阻塞,直接報錯queue.Empty

先進後出(後進先出):LIFO

      LifoQueue

# -*-coding:utf-8-*-
# Author:Ds

import queue

q = queue.LifoQueue(3) #後進先出隊列 (棧)

q.put(1)
q.put(2)
q.put('123')
# q.put(666)              # 阻塞 卡住了
## q.put(timeout=1)        # 超時1秒報錯 queue.Full
### q.put(1,block=False)        # 非阻塞,直接報錯 queue.Full


print(q.get())
print(q.get())
print(q.get())
# print(q.get())              #阻塞 卡住
## print(q.get(timeout=1))      #超時1秒報錯 queue.Empty
### print(q.get(block=False))       #非阻塞,直接報錯queue.Empty
# 使用列表數據結構模擬棧
li=[]
li.append(1) # 後進  添加元素到列表末尾
li.pop()     # 先出  移除列表末尾元素

優先級隊列:

      PriorityQueue

# -*-coding:utf-8-*-
# Author:Ds

import queue

q = queue.PriorityQueue(3)  # 優先級隊列

# 放入元組類型()數據, 第一個參數表示優先級別,第二個參數是真實數據
#  數字越低表示優先級越高
q.put((10, '垃圾消息'))
q.put((-9, '緊急消息'))
q.put((3, '通常消息'))
# q.put((3, '我被卡主了 '))              # 卡主了
# q.put((3, '我被卡主了 '),timeout=1)      # 超時報錯: queue.Full
q.put((3, '我被卡主了 '),block=False)        # 不阻塞: queue.Full

print(q.get())
print(q.get())
print(q.get())
print(q.get())              #阻塞 卡住
print(q.get(timeout=1))      #超時1秒報錯 queue.Empty
print(q.get(block=False))       #非阻塞,直接報錯queue.Empty

五丶事件Event

   線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

方法:

   event.isSet():返回event的狀態值;

   event.wait():若是 event.isSet()==False將阻塞線程;

   event.set():設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態,等待操做系統調度;

   event.clear():恢復event的狀態值爲False

import time
from threading import Thread
from threading import current_thread
from threading import Event

event = Event()  # 默認是False
def task():
    print(f'{current_thread().name} 檢測服務器是否正常開啓....')
    time.sleep(3)
    event.set()  # 改爲了True

def task1():
    print(f'{current_thread().name} 正在嘗試鏈接服務器')
    # event.wait()  # 輪詢檢測event是否爲True,當其爲True,繼續下一行代碼. 阻塞.
    event.wait(1)
    # 設置超時時間,若是1s中之內,event改爲True,代碼繼續執行.
    # 設置超時時間,若是超過1s中,event沒作改變,代碼繼續執行.
    print(f'{current_thread().name} 鏈接成功')
if __name__ == '__main__':
    t1 = Thread(target=task1,)
    t2 = Thread(target=task1,)
    t3 = Thread(target=task1,)

    t = Thread(target=task)


    t.start()
    t1.start()
    t2.start()
    t3.start()

紅綠燈Event事件模型:

# _*_coding:utf-8_*_
# Author   :Ds  
# CreateTime   2019/5/30 17:54 

import  threading ,time
event=threading.Event() # 聲明一個event全局變量
def lighter():
    count=0         #計數
    event.set()     #設置有標誌
    while True:        #循環
        if count > 5 and count<10:#    紅燈5秒
            event.clear()# 清空標誌位
            print("\033[41;1mred light is on...\033[0m")
        elif count>10:  #  綠燈5秒
            event.set()#變綠燈
            count=0 #清空count
        else:
            print('\033[42;1mgreen light is on...\033[0m')
        time.sleep(1)
        count+=1

def car(name):
    while True:
        if event.is_set(): #is_set 判斷設置了標誌位沒有
            print('[%s] running ...'%name)
            time.sleep(1)
        else:
            print(' [%s] see red  light waiting '%name)
            event.wait()
            print('\033[43;lm [%s] green light  is on  ,start going ..\033[0m'%name)
            
light=threading.Thread(target=lighter,)
car1=threading.Thread(target=car,args=('特斯拉',))
car2=threading.Thread(target=car,args=('奔馳',))
light.start()
car1.start()
car2.start()

六丶協程

   進程是資源分配的最小單位,線程是CPU調度的最小單

   併發的本質:切換+保存狀態

線程也具備三個狀態:

      cpu正在運行一個任務,會在兩種狀況下切走去執行其餘的任務(切換由操做系統強制控制),一種狀況是該任務發生了阻塞,另一種狀況是該任務計算的時間過長

      ps:在介紹進程理論時,說起進程的三種執行狀態,而線程纔是執行單位,因此也能夠將上圖理解爲線程的三種狀態

img

yield模擬併發:

      1. yiled能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級
      2. send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換

   一:其中第二種狀況並不能提高效率,只是爲了讓cpu可以雨露均沾,實現看起來全部任務都被「同時」執行的效果,若是多個任務都是純計算的,這種切換反而會下降效率。爲此咱們能夠基於yield來驗證。yield自己就是一種在單線程下能夠保存任務運行狀態的方法,咱們來簡單複習一下:

'''
一、協程:
    單線程實現併發
    在應用程序裏控制多個任務的切換+保存狀態
    優勢:
        應用程序級別速度要遠遠高於操做系統的切換
    缺點:
        多個任務一旦有一個阻塞沒有切,整個線程都阻塞在原地
        該線程內的其餘的任務都不能執行了

        一旦引入協程,就須要檢測單線程下全部的IO行爲,
        實現遇到IO就切換,少一個都不行,覺得一旦一個任務阻塞了,整個線程就阻塞了,
        其餘的任務即使是能夠計算,可是也沒法運行了

二、協程序的目的:
    想要在單線程下實現併發
    併發指的是多個任務看起來是同時運行的
    併發=切換+保存狀態
'''

#串行執行
import time

def func1():
    for i in range(10000000):
        i+1

def func2():
    for i in range(10000000):
        i+1

start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)


#基於yield併發執行
import time
def func1():
    while True:
        yield

def func2():
    g=func1()
    for i in range(10000000):
        i+1
        next(g)

start=time.time()
func2()
stop=time.time()
print(stop-start)

#  單純地切換反而會下降運行效率

   二:第一種狀況的切換。在任務一遇到io狀況下,切到任務二去執行,這樣就能夠利用任務一阻塞的時間完成任務二的計算,效率的提高就在於此。

import time
def func1():
    while True:
        print('func1')
        yield

def func2():
    g=func1()
    for i in range(10000000):
        i+1
        next(g)
        time.sleep(3)
        print('func2')
start=time.time()
func2()
stop=time.time()
print(stop-start)

yield不能檢測IO,實現遇到IO自動切換

協程介紹:

   協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。

   一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

須要強調的是:

#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)
#2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)

優勢以下:

   1.協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
​   2.單線程內就能夠實現併發的效果,最大限度地利用cpu

缺點以下:

   1.協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程

​   2.協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

總結協程特色:

   1.必須在只有一個單線程裏實現併發

​   2.修改共享數據不需加鎖

   3.用戶程序裏本身保存多個控制流的上下文棧

   4.附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

Greenlet模塊:手動模擬切換

   安裝 :pip3 install greenlet

      手動實現切換

from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)
    g2.switch('egon')
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')   #能夠在第一次switch時傳入參數,之後都不須要

      效率對比:

         greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。

### 串行執行計算密集型~~  11.37856674194336
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start_time=time.time()
f1()
f2()
print(f'runing time {time.time()-start_time}')  # runing time 11.37856674194336




### 切換執行計算密集型~~  runing time 60.24287223815918
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start_time=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
print(f'runing time {time.time()-start_time}')  # runing time 60.24287223815918

Gevent模塊:

​   安裝:pip3 install gevent

      Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度

###  用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

​   遇到IO阻塞時會自動切換任務

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')

   '打補丁:monkey'

# from gevent import monkey 
# monkey.patch_all()    必須放到被打補丁者的前面,

import threading
from gevent import monkey
monkey.patch_all()      # 打補丁,自動切換

import gevent
import time
def eat():
    print(threading.current_thread().getName()) # 虛擬線程 DummyThread-n
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print(threading.current_thread().getName()) # 虛擬線程 DummyThread-n 
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])     # 執行g1 g2

print(threading.current_thread().getName())     # MainThread 主線程
print('主')

   協程應用:爬蟲:

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))         # 使用協程爬取,計算爬取的時間
相關文章
相關標籤/搜索