同步,異步,阻塞,非阻塞,異步加回調機制,線程隊列,event事件

1. 同步,異步,阻塞,非阻塞,

  • 站在任務發佈的角度:
    • 同步:任務發出去以後,等待結果,直到這個任務最終結束後,返回結果,再發步下一個任務.
    • 異步:全部任務同時發出,不會在原地等待結果返回
  • 程序運行中表現得狀態:阻塞.運行,就緒
    • 阻塞:程序遇到IO阻塞,程序遇到IO立馬會中止(掛起),cpu立刻切換,等IO結束後再執行python

    • 非阻塞:程序沒有IO或者遇到IO經過某種手段讓CPU去執行同一個線程裏面的其餘的任務,儘量的佔用CPUgit

      # 異步回收任務的方式一: 將全部任務的結果統一收回
      from concurrent.futures import ProcessPoolExecutor
      import os
      import time
      import random
      def task():
          print(f'{os.getpid()} is running')
          time.sleep(random.randint(1, 3))
          return f'{os.getpid()} is finish'
      
      if __name__ == '__main__':
          p = ProcessPoolExecutor(4)
          lst = []
          for i in range(10):
              res = p.submit(task)   # 異步發出
              lst.append(res)
              # print(res.result())  # 在這裏result()就會變成同步
          p.shutdown(wait=True)
          # 1.阻止再向進程池投放新的任務
          # 2.wait=True 一個任務完成了就減一,直至爲0才執行下一行
          for res in lst:
              print(res.result())

2.異步加回調機制,

# 瀏覽器作的事情很簡單,封裝一些頭部,發一個請求到服務器,服務器拿到請求信息,分析信息,分析正確以後,給瀏覽器返回一個文件,瀏覽器將這個文件的代碼渲染就成了網頁
# 爬蟲: 利用requests模塊,模擬瀏覽器,封裝頭給服務器發送請求,騙過服務器,服務器也給你返回一個文件,
# 爬蟲拿到文件進行數據清洗,獲取想要的信息
# 爬蟲: 分兩步
#   第一步: 爬取服務端的文件(IO阻塞)
#   第二步: 拿到文件,進行數據清洗(非IO,極少IO)

# 版本一
from concurrent.futures import ProcessPoolExecutor
import requests
import time
import os
import random
def get(url):  # 爬取文件
    response = requests.get(url)
    print(os.getpid(), '正在爬取:', url)
    time.sleep(random.randint(1, 3))
    if response.status_code == 200:
        return response.text

def parse(text): # 對爬取回來的字符串的分析,用len模擬一下
    print('分析結果:', len(text))

if __name__ == '__main__':
    url_list = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
        'https://www.cnblogs.com/'
    ]
    pool = ProcessPoolExecutor(4)
    obj_list = []
    for url in url_list:
        obj = pool.submit(get, url)
        obj_list.append(obj)
    pool.shutdown(wait=True)
    for obj in obj_list:
        text = obj.result()
        parse(text)
# 問題出在哪裏?
# 1.分析結果的過程是串行,效率低
# 2.將全部的結果所有爬取成功以後,放在一個列表中
-------------------------------------------------------
# 版本二:異步處理,獲取結果的第二種方式
# 完成一個任務,返回一個結果,併發的獲取結果
from concurrent.futures import ProcessPoolExecutor
import requests
import time
import os
import random
def get(url):  # 爬取文件
    response = requests.get(url)
    print(os.getpid(), '正在爬取:', url)
    time.sleep(random.randint(1, 3))
    if response.status_code == 200:
        parse(response.text)
        # return response.text

def parse(text): # 對爬取回來的字符串的分析,用len模擬一下
    print('分析結果:', len(text))

if __name__ == '__main__':
    url_list = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
        'https://www.cnblogs.com/'
    ]
    pool = ProcessPoolExecutor(4)
    for url in url_list:
        obj = pool.submit(get, url)
    pool.shutdown(wait=True)
# 問題,加強了耦合性
------------------------------------------------------
# 版本三: 版本二,兩個任務有耦合性.在上一個基礎上,對其進行解耦
from concurrent.futures import ProcessPoolExecutor
import requests
import time
import os
import random
def get(url):  # 爬取文件
    response = requests.get(url)
    print(os.getpid(), '正在爬取:', url)
    time.sleep(random.randint(1, 3))
    if response.status_code == 200:
        return response.text

def parse(obj): # 對爬取回來的字符串的分析,用len模擬一下
    print(f'{os.getpid()}分析結果:', len(obj.result()))

if __name__ == '__main__':
    url_list = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
        'https://www.cnblogs.com/'
    ]
    pool = ProcessPoolExecutor(4)
    for url in url_list:
        obj = pool.submit(get, url)
        obj.add_done_callback(parse)  # 增長一個回調函數
        # 如今的進程完成的仍是網絡爬取的任務,拿到返回值以後,丟給回調函數,
        # 進程繼續完成下一個任務,回調函數進行分析結果
    pool.shutdown(wait=True)
# 回調函數是主進程實現的,回調函數幫咱們進行分析任務
# 明確了進程的任務就是網絡爬取,分析任務交給回調函數執行,對函數之間解耦

# 極值狀況: 若是回調函數是IO任務,那麼因爲回調函數是主進程作的,因此有可能影響效率
# 回調不是萬能的,若是回調的任務是IO,那麼異步+回調機制很差,此時若是須要效率,只能再開一個線程或進程池

# 異步就是回調?
# 這個論點是錯的,異步,回調是兩個概念

# 若是多個任務,多進程多線程處理的IO任務
# 1. 剩下的任務 非IO阻塞   異步+回調機制
# 2. 剩下的任務有 IO  遠小於  多個任務的IO   異步+回調機制
# 3. 剩下的任務 IO 大於等於 多個任務的IO  第二種解決方式,或者開啓兩個進程/線程池

3.線程隊列(進程相同)

  • FIFO: 先進先出github

    import queue
    #不須要經過threading模塊裏面導入,直接import queue就能夠了,這是python自帶的
    #用法基本和咱們進程multiprocess中的queue是同樣的
    q = queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    結果(先進先出):
    first
    second
    third
    '''
  • LIFO: 後進先出(棧)數組

    import queue
    q = queue.LifoQueue() # 隊列,相似於棧
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    結果(後進先出):
    third
    second
    first
    '''
  • Priority: 優先級隊列瀏覽器

    import queue
    
    q = queue.PriorityQueue()
    # put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
    q.put((-10, 'a'))
    q.put((-5, 'a'))  #負數也能夠
    # q.put((20, 'ws'))  #若是兩個值的優先級同樣,那麼按照後面的值的acsii碼順序來排序,若是字符串第一個數元素相同,比較第二個元素的acsii碼順序
    # q.put((20, 'wd'))
    # q.put((20, {'a': 11})) #TypeError: unorderable types: dict() < dict() 不能是字典
    # q.put((20, ('w', 1)))  #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,能夠是元祖,也是經過元素的ascii碼順序來排序
    
    q.put((20, 'b'))
    q.put((20, 'a'))
    q.put((0, 'b'))
    q.put((30, 'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    結果(數字越小優先級越高,優先級高的優先出隊):
    (-10, 'a')
    (-5, 'a')
    (0, 'b')
    (20, 'a')
    (20, 'b')
    (30, 'c')
    '''

4. Event事件

  • 線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測.若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手.爲了解決這些問題,咱們須要使用threading庫中的Event對象. 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生.在初始狀況下,Event對象中的信號標誌被設置爲假.若是有線程等待一個Event對象,而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真.一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程.若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件,繼續執行(進程也同樣)

    1564147865524

  • 方法服務器

    event.isSet(): 返回event的狀態值
    
    event.wait(): 若是 event.isSet() == False將阻塞線程
    
    event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度
    
    event.clear(): 恢復event的狀態值爲False
  • 示例網絡

    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time
    event = Event()  # 默認False
    def task():
        print(f'{current_thread().name}檢測服務器是否正常開啓....')
        time.sleep(3)
        event.set()  # 改爲True
    
    def task1():
        print(f'{current_thread().name}正在嘗試鏈接服務器')
        event.wait()  # 阻塞,輪詢檢測event是否爲True,當其爲True,繼續下一行代碼
        # event.wait(1) # 超時時間,超過期間不管是否爲True都繼續下一行代碼
        print(f'{current_thread().name}鏈接成功')
    
    if __name__ == '__main__':
        t1 = Thread(target=task1)
        t2 = Thread(target=task1)
        t3 = Thread(target=task1)
        t = Thread(target=task)
        t1.start()
        t2.start()
        t3.start()
        t.start()

    協程

    1. 概念多線程

    協程本質就是一條線程,多個任務在一條線程上來回切換 協程是操做系統不可見的 協程的概念自己並 沒有規避I/O操做,可是咱們能夠利用協程這個概念來實現規避I/O操做,進而達到了咱們將一條線程中 的I/O操做降到最低的目的 協程可以實現的大部分I/O操做都在網絡併發

    2. 相關模塊概覽和協程的應用app

    gevent:利用了greenlet底層模塊(C語言寫的)完成的切換 + 自動規避io的功能

    3. gevent模塊

    import gevent
    def eat(name):
        print('%s eat 1' %name)
        gevent.sleep(2)
        print('%s eat 2' %name)
    
    def play(name):
        print('%s play 1' %name)
        gevent.sleep(1)
        print('%s play 2' %name)
    
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,name='egon')
    g1.join()
    g2.join()
    #或者gevent.joinall([g1,g2])
    print('主')
    
    遇到I/O切換

    上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,

      而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了

      from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前

      或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭

    from gevent import monkey;monkey.patch_all() #必須寫在最上面,這句話後面的全部阻塞所有可以識別了
    
    import gevent  #直接導入便可
    import time
    def eat():
        #print()  
        print('eat food 1')
        time.sleep(2)  #加上mokey就可以識別到time模塊的sleep了
        print('eat food 2')
    
    def play():
        print('play 1')
        time.sleep(1)  #來回切換,直到一個I/O的時間結束,這裏都是咱們個gevent作得,再也不是控制不了的操做系統了。
        print('play 2')
    
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play_phone)
    gevent.joinall([g1,g2])
    print('主')

    咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程,虛擬線程,其實都在一個線程裏面

      進程線程的任務切換是由操做系統自行切換的,你本身不能控制

      協程是經過本身的程序(代碼)來進行切換的,本身可以控制,只有遇到協程模塊可以識別的IO操做的時候,程序纔會進行任務切換,實現併發效果,若是全部程序都沒有IO操做,那麼就基本屬於串行執行了。

Gevent之同步與異步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
#上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

協程:同步異步對比
相關文章
相關標籤/搜索