Python全棧之路-Day40

1 互斥鎖(同步鎖)

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import time
import threading

def addNum():
    global num     # 在每一個線程中都獲取這個全局變量
    Lock.acquire() # 每次只能有一個線程在運行Lock塊的代碼
    temp = num
    time.sleep(0.01)
    num = temp - 1    # 對此公共變量進行-1操做
    Lock.release()

num = 100  # 設定一個共享變量

Lock = threading.Lock()  # 定義同步鎖
thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部線程執行完畢
    t.join()

print('Result: ', num)

2 死鎖與遞歸鎖

2.1 死鎖

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9


import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()

        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

2.2 遞歸鎖

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9


import threading
import time

RLock = threading.RLock()  # 得到遞歸鎖,能夠acquire屢次

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        RLock.acquire()  # acquire計數加1
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        RLock.acquire()  # acquire計數加1
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        RLock.release()  # acquire計數減1
        RLock.release()  # acquire計數減1  計數爲0後其餘線程就能夠競爭這把鎖

    def fun2(self):
        RLock.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)
        RLock.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        RLock.release()
        RLock.release()

if __name__ == "__main__":
    print("start---------------------------%s"%time.time())
    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

3 event對象

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行python

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()  # 完成線程間的通訊 至關於標誌位
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()

# event.isSet():返回event的狀態值;
# event.wait():若是 event.isSet()==False將阻塞線程;
# event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
# event.clear():恢復event的狀態值爲False。

4 進程對象Process

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)

def foo(name):

    info(name)

if __name__ == '__main__':

    info('main process line')


    p1 = Process(target=info, args=('alvin',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")

5 協程簡介

5.1 yield與協程

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

import time

"""
傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但一不當心就可能死鎖。
若是改用協程,生產者生產消息後,直接經過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。
"""
# 注意到consumer函數是一個generator(生成器):
# 任何包含yield關鍵字的函數都會自動成爲生成器(generator)對象

def consumer():
    r = ''
    while True:
        # 三、consumer經過yield拿到消息,處理,又經過yield把結果傳回;
        #    yield指令具備return關鍵字的做用。而後函數的堆棧會自動凍結(freeze)在這一行。
        #    當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,
        #    就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。經過這種方式,迭代器能夠實現無限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    # 一、首先調用c.next()啓動生成器
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 二、而後,一旦生產了東西,經過c.send(n)切換到consumer執行;
        cr = c.send(n)
        # 四、produce拿到consumer處理的結果,繼續生產下一條消息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 五、produce決定不生產了,經過c.close()關閉consumer,整個過程結束。
    c.close()
if __name__=='__main__':
    # 六、整個流程無鎖,由一個線程執行,produce和consumer協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。
    c = consumer()
    produce(c)


'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''

5.2 greentlet

greelet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍後使用next()或send()操做進行恢復爲止。可使用一個調度器循環在一組生成器函數之間協做多個任務。greentlet是python中實現咱們所謂的"Coroutine(協程)"的一個基礎庫.git

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from greenlet import greenlet

def test1():
    print (12)
    gr2.switch()
    print (34)
    gr2.switch()

def test2():
    print (56)
    gr1.switch()
    print (78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

5.3 gevent

Python經過yield提供了對協程的基本支持,可是不徹底。而第三方的gevent爲Python提供了比較完善的協程支持。github

gevent是第三方庫,經過greenlet實現協程,其基本思想是:redis

當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。網絡

因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:app

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9


import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)

固然,實際代碼裏,咱們不會用gevent.sleep()去切換協程,而是在執行到IO操做時,gevent自動切換,代碼以下:函數

#!/usr/bin/env python
# __Author__: "wanyongzhen"
# Date: 2017/5/9

from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time

import ssl
ssl._create_default_https_context = ssl._create_unverified_context # 解決Mac上的報錯
def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

start=time.time()

gevent.joinall([
        gevent.spawn(f, 'https://itk.org/'),
        gevent.spawn(f, 'https://www.github.com/'),
        gevent.spawn(f, 'https://zhihu.com/'),
])

# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')

print(time.time()-start)
相關文章
相關標籤/搜索