Python Day 36 線程隊列、線程事件Event、協程、Gevent 模塊實現單線程併發

  ##內容回顧python

#一、什麼是GIL
全局解釋器鎖,本質就是一把互斥鎖,是加到解釋器身上的,每個python進程內都有這麼一把鎖

#二、有了GIL會對單進程下的多個線程形成什麼樣的影響
多線程要想執行,首先須要爭搶GIL,對全部待執行的線程來講,GIL就至關於執行權限,同一時刻只有一個線程爭搶成功,即單進程下的多個線程同一時刻只有一個在運行
意味着單進程下的多線程沒有並行的效果,可是有併發的效果

ps:分散於不一樣進程內的線程不會去爭搶同一把GIL,只有同一個進程的多個線程才爭搶同一把GIL
#三、爲何要有GIL
python解釋器的內存管理機制不是線程安全的
#四、GIL與自定義互斥鎖的區別,多個線程爭搶GIL與自定義互斥鎖的過程分析
相同:
    都是互斥鎖
不一樣點:
    GIL是加到解釋器上的,做用於全局
    自定義互斥鎖做用於局部

    單進程內的全部線程都會去搶GIL
                單進程內的只有一部分線程會去搶自定義的互斥鎖
        5、何時用python的多線程,何時用多進程,爲何?
            單進程下的多個線程是沒法並行,沒法並行意味着不能利用多核優點

            cpu乾的計算的活,多個cpu意味提高了計算性能,
            cpu是沒法作IO操做,多個cpu在IO操做面前毫無用處

            當咱們的程序是IO密集型的狀況下,多核對性能的提高微不足道,此時可使用python的多線程
            當咱們的程序是計算密集型的狀況下,必定要用上多核優點,此時可使用python的多進程
    2、進程池與線程池
        1、池的用途,爲什麼要用它
            池:裝固定數目的東西,東西指的是進程或者線程,讓機器在本身可承受的範圍內去保證一個高效的工做

  ##本身如何直接使用Thread的話 如何完成回調案例mysql

"""
#回調函數:直接使用add_done_callback方法直接調用就好
在線程池/進程池 每次提交任務 都會返回一個表示任務的對象 Future對象 Future對象具有一個綁定方法 add_done_callback 用於指定回調函數 add 意味着能夠添加多個回調函數
#使用線程Thread 自定義一個回調函數
""" from threading import Thread import time # res = None def call_back(res): print("任務結果拿到了:%s" % res) def parser(res): print("任務結果拿到了:%s" % res) def task(callback): # global res print("run") time.sleep(1) # # return 100 res = 100 # 表示任務結果 callback(res) # 執行回調函數 並傳入任務結果 t = Thread(target=task,args=(parser,)) t.start() # t.join() # print(res) print("over")

  ##線程隊列面試

#1.Queue 先進先出隊列

與多進程中的Queue使用方式徹底相同,區別僅僅是不能被多進程共享。

from queue import Queue,LifoQueue,PriorityQueue

# q = Queue()
#
# q.put("123")
# q.put("456")
#
# print(q.get())
# print(q.get())
#
# # print(q.get(block=True,timeout=3))
# q.task_done()
# q.task_done()
# q.join()
# print("over")

#2.LifoQueue 後進先出隊列

該隊列能夠模擬堆棧,實現先進後出,後進先出

# 除順序之外別的都同樣
# lq = LifoQueue()
#
# lq.put("123")
# lq.put("456")
#
# print(lq.get())
# print(lq.get())

#3.PriorityQueue 優先級隊列

該隊列能夠爲每一個元素指定一個優先級,這個優先級能夠是數字,字符串或其餘類型,可是必須是能夠比較大小的類型,取出數據時會按照從小到大的順序取出

pq = PriorityQueue()
# 數字優先級
pq.put((10,"a"))
pq.put((11,"a"))
pq.put((-11111,"a"))

print(pq.get())
print(pq.get())
print(pq.get())
# 字符串優先級
pq.put(("b","a"))
pq.put(("c","a"))
pq.put(("a","a"))

print(pq.get())
print(pq.get())
print(pq.get())

#優先級:對象比較
class A(object):
    def __init__(self,age):
        self.age = age

    # def __lt__(self, other):
    #     return self.age < other.age
    #
    # def __gt__(self, other):
    #     return self.age > other.age

    def __eq__(self, other):
        return self.age == other.age

a1 = A(50)
a2 = A(50)



print(a1 == a2)

# print(a1 is a1)


# pq = PriorityQueue()
# pq.put("a")
# pq.put("A")
# pq.put("C")
#
#



# print(pq.get())

   ##線程事件Eventsql

### 一、什麼是事件

事件表示在某個時間發生了某個事情的通知信號,用於線程間協同工做。

由於不一樣線程之間是獨立運行的狀態不可預測,因此一個線程與另外一個線程間的數據是不一樣步的,當一個線程須要利用另外一個線程的狀態來肯定本身的下一步操做時,就必須保持線程間數據的同步,Event就能夠實現線程間同步

###二、 Event介紹

Event象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

###三、可用方法:
event.isSet():返回event的狀態值;
event.wait():將阻塞線程;知道event的狀態爲True
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。

###四、使用案例
#4-1 在連接mysql服務器前必須保證mysql已經啓動,而啓動須要花費一些時間,因此客戶端不能當即發起連接 須要等待msyql啓動完成後當即發起連接
from threading import Event,Thread
import time

boot = False
def start():
    global boot
    print("正正在啓動服務器.....")
    time.sleep(5)
    print("服務器啓動完成!")
    boot = True
    
def connect():
    while True:
        if boot:
            print("連接成功")
            break
        else:
            print("連接失敗")
        time.sleep(1)

Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()


#4-2使用Event改造後:
「」「
from threading import Event,Thread
import time

e = Event()
def start():
    global boot
    print("正正在啓動服務器.....")
    time.sleep(3)
    print("服務器啓動完成!")
    e.set()

def connect():
    e.wait()
    print("連接成功")
    
Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()

」「」
#4-3增長需求,每次嘗試連接等待1秒,嘗試次數爲3次
「」「
from threading import Event,Thread
import time

e = Event()
def start():
    global boot
    print("正正在啓動服務器.....")
    time.sleep(5)
    print("服務器啓動完成!")
    e.set()

def connect():
    for i in range(1,4):
        print("第%s次嘗試連接" % i)
        e.wait(1)
        if e.isSet():
            print("連接成功")
            break
        else:
            print("第%s次連接失敗" % i)
    else:
        print("服務器未啓動!")

Thread(target=start).start()
Thread(target=connect).start()
# Thread(target=connect).start()
」「」

  ##協程編程

###協程理論基礎###
###一、什麼是協程
協程:本質是單線程實現併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

###二、爲何使用協程,如何可以實現併發呢
在Cpython多線程沒法並行執行,在IO密集任務中,而且併發量較大,沒法開啓更多的線程時,形成後續的任務沒法處理,即便前面都在等待IO

協程就可使用單線程實現併發,當某一個任務處於IO阻塞時,就能夠切換到其餘的任務來執行
併發 = 切換任務+保存狀態,只要找到一種方案,可以在兩個任務之間切換執行而且保存狀態,那就能夠實現單線程併發 python中的生成器就具有這樣一個特色,每次調用next都會回到生成器函數中執行代碼,這意味着任務之間能夠切換,而且是基於上一次運行的結果,這意味着生成器會自動保存執行狀態! ###三、對比操做系統控制線程的切換,用戶在單線程內控制協程的切換 1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行) 2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關) ###四、優缺點 #優勢: 1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級 2. 單線程內就能夠實現併發的效果,最大限度地利用cpu時間片,能夠佔用CPU直到超時 #缺點: 1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程 2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程 ###五、總結協程特色: 必須在只有一個單線程裏實現併發 修改共享數據不需加鎖 用戶程序裏本身保存多個控制流的上下文棧 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制)) ###六、yiled實現併發效果 # 使用生成器來實現 單線 併發多個任務 import time def func1(): a = 1 for i in range(10000000): a += 1 print("a run") time.sleep(10) yield def func2(): res = func1() a = 1 for i in range(10000000): a += 1 print("b run") next(res) st = time.time() func2() print(time.time() - st) ### 單線程下串行執行兩個計算任務 效率反而比並發高 由於併發須要切換和保存 # def func1(): # a = 1 # for i in range(10000000): # a += 1 # # # def func2(): # a = 1 # for i in range(10000000): # a += 1 # # # st = time.time() # func1() # func2() # print(time.time() - st) ###七、greentlet模塊實現併發 「」「 使用yield來切換是的代碼結構很是混亂,若是十個任務須要切換呢,不敢想象!所以就有人專門對yield進行了封裝,這便有了greenlet模塊該模塊簡化了yield複雜的代碼結構,實現了單線程下多任務併發,可是不管直接使用yield仍是greenlet都不能檢測IO操做,遇到IO時一樣進入阻塞狀態,因此此時的併發是沒有任何意義的。 如今咱們須要一種方案 便可檢測IO 又可以實現單線程併發,因而gevent閃亮登場 」「」 def task1(name): print("%s task1 run1" % name) g2.switch(name) # 切換至任務2 print("task1 run2") g2.switch() # 切換至任務2 def task2(name): print("%s task2 run1" % name) g1.switch() # 切換至任務1 print("task2 run2") g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch("jerry") # 爲任務傳參數 ###八、Gevent 模塊實現單線程併發 「」「 #8-1 Gevent模塊的介紹 Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。 #8-2Gevent模塊的用法 #用法 g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合做一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值 #8-3 Gevent模塊默認不識別IO阻塞的如(time.sleep(3)),只能默認是識別自身的gevent.sleep(2),咱們就引入了猴子補丁用法 import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('') ###8-4 Gevent模塊:猴子補丁,就能夠識別IO阻塞,注意:猴子補丁語句只能放在文件開頭,咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程 #總結 1.若是主線程結束了 協程任務也會當即結束。 2.monkey補丁的原理是把原始的阻塞方法替換爲修改後的非阻塞方法,即偷樑換柱,來實現IO自動切換 ​ 必須在打補丁後再使用相應的功能,避免忘記,建議寫在最上方 ##8-4 示例一 # gevent 不具有檢測IO的能力 須要爲它打補丁 打上補丁以後就能檢測IO # 注意補丁必定打在最上面 必須保證導入模塊前就打好補丁 from gevent import monkey;monkey.patch_all() import gevent from threading import current_thread import time def task1(): print(current_thread(),1) print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print(current_thread(),2) print("task2 run") print("task2 over") # spawn 用於建立一個協程任務 g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) # 任務要執行,必須保證主線程沒掛 由於全部協程任務都是主線在執行 ,必須調用join來等待協程任務 # g1.join() # g2.join() # 理論上等待執行時間最長的任務就行 , 可是不清楚誰的時間長 能夠所有join gevent.joinall([g1,g2]) print("over") ##8-4 示例二 import gevent,sys from gevent import monkey # 導入monkey補丁 monkey.patch_all() # 打補丁 import time print(sys.path) def task1(): print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print("task2 run") # gevent.sleep(1) time.sleep(1) print("task2 over") g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) #gevent.joinall([g1,g2]) g1.join() g2.join() # 執行以上代碼會發現不會輸出任何消息 # 這是由於協程任務都是以異步方式提交,因此主線程會繼續往下執行,而一旦執行完最後一行主線程也就結束了, # 致使了協程任務沒有來的及執行,因此這時候必須join來讓主線程等待協程任務執行完畢 也就是讓主線程保持存活 # 後續在使用協程時也須要保證主線程一直存活,若是主線程不會結束也就意味着不須要調用join 」「」#####最終的解決方案多進程+單線程+ 協程面試問題:如何是實現高併發?答:先分析是什麼任務,若是是計算密集型,開多進程,那爲何開多進程,由於Cpython存在GIL鎖,集羣(全部服務器乾的都是一件事)或者分佈式(每一個服務器就幹一種活)
相關文章
相關標籤/搜索