協程 事件

引子:python

   由於 有GIL鎖,因此cpython沒法利用多核CPU的優點,只能使用單核併發的執行,效率明顯不高。json

  對於計算密集型任務而言,無需任何操做就能一直佔用cpu到超時,米辦法提升效率,除非去鎖,讓多核CPU並行執行;服務器

  對於io密集型任務而言,一旦線程遇到io操做CPU就會且到其餘線程,切到的線程不肯定,應用程序沒法控制,就會下降效率;可是若是一個線程可以檢測io操做並將其設置爲非阻塞,那不就自動切換到其餘任務了嗎,也就是在單線程下實現併發           併發

 

1.單線程實現併發         切換任務+保存狀態異步

  1.實現條件:要單線程實現併發得找到一種方案,可以在兩個任務之間切換執行而且保存狀態。函數

  而在python中生成器就具有這麼一個特色那咱們不就能夠利用生成器來實現併發執行,好比每次調用next都會回到生成器函數中執行代碼,這意味着任務間能夠切換,而且是基於上一次運行的結果,這意味着生成器會自動保存執行狀態。測試

def task1():
    while True:
        yield
        print("task1 run")

def task2():
    g = task1()
    while True:
        next(g)
        print("task2 run")
task2()

import time def task1(): a
= 0 for i in range(10000000): a += i yield def task2(): g = task1() b = 0 for i in range(10000000): b += 1 next(g) s = time.time() task2() print("併發執行時間",time.time()-s) # 單線程下串行執行兩個計算任務 效率反而比並發高 由於併發須要切換和保存 def task1(): a = 0 for i in range(10000000): a += i def task2(): b = 0 for i in range(10000000): b += 1 s = time.time() task1() task2() print("串行執行時間",time.time()-s)

  2.greenlet   模塊spa

    因爲yield切換代碼很是混亂,若是任務多,不敢想象,所以有人對yield進行分裝,也就有了greenlet操作系統

def task1(name):
    print("%s task1 run1" % name)
    g2.switch(name) # 切換至任務2
    print("task1 run2") 
    g2.switch() # 切換至任務2

def task2(name):
    print("%s task2 run1" % name)
    g1.switch() # 切換至任務1
    print("task2 run2")

g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch("jerry") # 爲任務傳參數

可是greenlet與yield不能檢測io,遇到io一樣會阻塞,所以咱們須要有一種既可檢測io,又可實現單線程併發的方案,因而就有了geven.monkey(補丁)線程

# gevent 不具有檢測IO的能力  須要爲它打補丁  打上補丁以後就能檢測IO
# 注意補丁必定打在最上面    必須保證導入模塊前就打好補丁
from gevent import monkey
monkey.patch_all()   #

from threading import current_thread
import gevent,time


def task1():
    print(current_thread(),1)
    print("task1 run")
    # gevent.sleep(3)
    time.sleep(3)
    print("task1 over")

def task2():
    print(current_thread(),2)
    print("task2 run")
    print("task2 over")

# spawn 用於建立一個協程任務
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)

# 任務要執行,必須保證主線程沒掛  由於全部協程任務都是主線在執行   ,必須調用join來等待協程任務
# g1.join()
# g2.join()
# 理論上等待執行時間最長的任務就行 , 可是不清楚誰的時間長 能夠所有join

gevent.joinall([g1,g2])
print("over")

2.線程中的隊列

  Queue :與進程中的Joinablequeue 使用方式如出一轍 可是 不具有IPC

   LifoQueue :last in first out 後進先出 先進 後出 模擬堆棧

   PriorityQueue:具有優先級的隊列   能夠存儲一個能夠比較大小的對象 比較越小的優先級越高 自定義對象 不能使用比較運算符 因此不能存儲

from queue import Queue,LifoQueue,PriorityQueue
=====================================================================
q = Queue()
#
# q.put("123")
# q.put("456")
#
# print(q.get())
# print(q.get())
#
# # print(q.get(block=True,timeout=3))
# q.task_done()
# q.task_done()
# q.join()
# print("over")
==========================================================================
lq = LifoQueue()
#
# lq.put("123")
# lq.put("456")
#
# print(lq.get())
# print(lq.get())
====================================================================
class A(object):
    def __init__(self,age):
        self.age = age

    # def __lt__(self, other):
    #     return self.age < other.age
    #
    # def __gt__(self, other):
    #     return self.age > other.age

    def __eq__(self, other):
        return self.age == other.age
a1 = A(50)
a2 = A(50)



print(a1 == a2)

# print(a1 is a1)

# print(pq.get())


# pq = PriorityQueue()
# pq.put("a")
# pq.put("A")
# pq.put("C")

3.協程程的概述

  定義:單線程下的併發,又稱爲微線程,纖程,是一種用戶態的輕量級線程,由用戶程序本身控制調度的

  注:1.python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)

    2.單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)

對比操做系統控制線程的切換,用戶在單線程內控制協程的切換

優勢以下:

1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級

2. 單線程內就能夠實現併發的效果,最大限度地利用cpu

缺點以下:  

1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程來儘量提升效率
2. 協程本質是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

import gevent,sys
from gevent import monkey # 導入monkey補丁
monkey.patch_all() # 打補丁 
import time

print(sys.path)

def task1():
    print("task1 run")
    # gevent.sleep(3)
    time.sleep(3)
    print("task1 over")

def task2():
    print("task2 run")
    # gevent.sleep(1)
    time.sleep(1)
    print("task2 over")

g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
#gevent.joinall([g1,g2])
g1.join()
g2.join()
# 執行以上代碼會發現不會輸出任何消息
# 這是由於協程任務都是以異步方式提交,因此主線程會繼續往下執行,而一旦執行完最後一行主線程也就結束了,
# 致使了協程任務沒有來的及執行,因此這時候必須join來讓主線程等待協程任務執行完畢   也就是讓主線程保持存活
# 後續在使用協程時也須要保證主線程一直存活,若是主線程不會結束也就意味着不須要調用join

須要注意:

1.若是主線程結束了 協程任務也會當即結束。

2.monkey補丁的原理是把原始的阻塞方法替換爲修改後的非阻塞方法,即偷樑換柱,來實現IO自動切換

必須在打補丁後再使用相應的功能,避免忘記,建議寫在最上方

monke補丁案例:

#myjson.py
def dump():
    print("一個被替換的 dump函數")

def load():
    print("一個被替換的 load函數")
# test.py
import myjson
import json
# 補丁函數
def monkey_pacth_json():
    json.dump = myjson.dump
    json.load = myjson.load
    
# 打補丁
monkey_pacth_json()

# 測試 
json.dump()
json.load()
# 輸出:
# 一個被替換的 dump函數
# 一個被替換的 load函數

4.事件

from threading import Thread,Event

import time


# is_running = False
#
# def boot_server():
#     global is_running
#     print("正在啓動服務器......")
#     time.sleep(3)
#     print("服務器啓動成功!")
#     is_running = True
#
#
# def connect_server():
#     while True:
#         if is_running:
#             print("連接服務器成功!")
#             break
#         else:
#             time.sleep(0.1)
#             print("error 服務器未啓動!")
#
#
#
# t1 = Thread(target=boot_server)
# t1.start()
#
#
# t2 = Thread(target=connect_server)
# t2.start()






boot_event = Event()

# boot_event.clear() 回覆事件的狀態爲False
# boot_event.is_set() 返回事件的狀態
# boot_event.wait()等待事件發生 ,就是等待事件被設置爲True
# boot_event.set() 設置事件爲True



def boot_server():
    print("正在啓動服務器......")
    time.sleep(3)
    print("服務器啓動成功!")
    boot_event.set() # 標記事件已經發生了


def connect_server():
    boot_event.wait() # 等待事件發生
    print("連接服務器成功!")

t1 = Thread(target=boot_server)
t1.start()

t2 = Thread(target=connect_server)
t2.start()
相關文章
相關標籤/搜索