python自動化開發-[第十天]-線程、協程、socketserver

今日概要python

  一、線程mysql

  二、協程web

  三、socketserverredis

  四、基於udp的socket(見第八節)sql

1、線程

  一、threading模塊編程

    第一種方法:實例化 數組

import threading
import time

#第一種方法實例化
def sayhi(num):
    print('running on num %s' %(num))
    time.sleep(3)


if __name__ == '__main__':
    t1 = threading.Thread(target=sayhi,args=(1,))  #實例化線程
    t2 = threading.Thread(target=sayhi, args=(1,))

    t1.start()  #執行start方法
    t2.start()
    print(threading.active_count())  #統計線程數量
    print(t1.getName())
    print(t2.getName())

    第二種方法類的繼承安全

import threading
import time

#第二種方法類的繼承

class MyThread(threading.Thread):
    def __init__(self,num):
        super().__init__()
        self.num = num

    def run(self):
        print('running on num %s' % (self.num))
        print('start time : %s'%(time.time()))
        time.sleep(3)
        print('end time : %s'%(time.time()))
if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

  二、在一個進程下開啓多個線程與在一個進程下開啓多個子進程的區別服務器

    誰的開啓速度快? 開啓線程的速度快多線程

from threading import Thread
from multiprocessing import Process


def talk(name):
    print ('%s is talking' %name)



if __name__ == '__main__':
    t = Thread(target=talk,args=('dragon',))
    t.start()
    print ('主線程')

    '''
    輸出:
    dragon is talking
    主線程
    '''
    p = Process(target=talk,args=('dragon',))
    p.start()
    print('主線程')
    '''
    輸出:
    主線程
    dragon is talking
    '''

    看一看pid? 線程的pid和當前進程同樣,進程的pid各自獨立

from threading import Thread
from multiprocessing import Process
import os

def talk():
    print ('%s is talking' %os.getpid())



if __name__ == '__main__':
    t = Thread(target=talk)
    t1 = Thread(target=talk)
    t.start()
    t1.start()
    print('主線程pid: %s' %os.getpid())

    '''
    21592 is talking
    21592 is talking
    主線程pid: 21592
    '''

    p = Process(target=talk)
    p1 = Process(target=talk)
    p.start()
    p1.start()
    print('主線程pid: %s' % os.getpid())
    
    '''
    主線程pid: 21724
    21460 is talking
    20496 is talking
    '''
from socket import *
from threading import Thread

def server(ip,port):
    s = socket(AF_INET,SOCK_STREAM)
    s.bind((ip,port))
    s.listen(5)
    while True:   #連接循環變成多線程
        conn,addr = s.accept()
        t = Thread(target=talk,args=(conn,addr))
        t.start()

def talk(conn,addr):
    '''通訊循環'''
    try:
        while True:
            res = conn.recv(1024)
            if not res :break
            print('client %s:%s msg:%s' % (addr[0], addr[1], res))
            conn.send(res.upper())
    except Exception:
        pass

    finally:
        conn.close()
if __name__ == '__main__':
    server('127.0.0.1',9999)
多線程服務端
#!/usr/bin/python
# -*- coding:utf-8 -*-

from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',9999))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    c.send(msg.encode('utf-8'))
    res=c.recv(1024)
    print(res.decode('utf-8'))
客戶端

 

  例子:

    三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件

from threading import Thread
msg_list = []
format_list = []

def talk(): 
    while True:
        user_input = input('-> : ').strip()
        if len(user_input) == 0 : continue
        msg_list.append(user_input)

def format():
    while True:
        if msg_list:
            res = msg_list.pop()
            res = res.upper()
            format_list.append(res)

def save():
    while True:
        if format_list:
            res = format_list.pop()
            with open('db.txt','a',encoding='utf-8') as f :
                f.write('%s\n' %(res))


if __name__ == '__main__':
    t1 = Thread(target=talk)
    t2 = Thread(target=format)
    t3 = Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
多線程例子

 

  三、線程的join和setdaemon

from threading import Thread
import threading
import time
def talk():
    time.sleep(1)
    print ('%s is talk' %threading.current_thread().getName())


if __name__ == '__main__':
    t = Thread(target=talk)
    t.setDaemon(True)  #設置守護線程的守護進程
    t.start()
    t.join()
    print ('主線程')
    print (t.is_alive())

  四、線程的其餘用法

     thread實例對象的方法

     isAlive(): 返回線程是否活動的。

     getName(): 返回線程名。
     setName(): 設置線程名。
  threading模塊提供的一些方法:
     threading.currentThread(): 返回當前的線程變量。
     threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
     threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啓線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName()) #主線程的名字
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

'''
MainThread
<_MainThread(MainThread, started 23516)>
[<_MainThread(MainThread, started 23516)>, <Thread(Thread-1, started 26208)>]
2
主線程/主進程
Thread-1
'''

  五、python解釋器GIL

    在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點,若是不加GIL鎖,解釋器級別如垃圾回收,就會出現混亂,因爲同一份資源多個線程間的競爭

    首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL

    參考鏈接:http://www.dabeaz.com/python/UnderstandingGIL.pdf  這篇文章透徹的剖析了GIL對python多線程的影響

  

  cpu究竟是用來作計算的,仍是用來作I/O的?

    1. 多cpu,意味着能夠有多個核並行完成計算,因此多核提高的是計算性能

    2. 每一個cpu一旦遇到I/O阻塞,仍然須要等待,因此多核對I/O操做沒什麼用處

    一個工人至關於cpu,此時計算至關於工人在幹活,I/O阻塞至關於爲工人幹活提供所需原材料的過程,工人幹活的過程當中若是沒有原材料了,則工人幹活的過程須要中止,直到等待原材料的到來。

    若是你的工廠乾的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的工人,意義也不大,還不如一我的,在等材料的過程當中讓工人去幹別的活,

    反過來說,若是你的工廠原材料都齊全,那固然是工人越多,效率越高

    結論:

      對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用

      固然對於一個程序來講,不會是純計算或者純I/O,咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,從而進一步分析python的多線程有無用武之地

    分析:

    咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,解決方案能夠是:

      方案一:開啓四個進程

      方案二:一個進程下,開啓四個線程

      單核狀況下,分析結果: 

        若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝

        若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝

      多核狀況下,分析結果:

      若是四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝

      若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

    結論:如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。

 

      測試:計算密集型,多進程勝出

#計算密集型
from threading import Thread
from multiprocessing import Process
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()

    for i in range(100):
        #t=Thread(target=work) #在個人機器上,8核cpu,多線程 7.210500001907349s
        t=Process(target=work) #在個人機器上,8核cpu,多進程 3.642500162124634s
        t_l.append(t)
        t.start()
    for i in t_l:
        i.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    print('主線程')

    測試:io密集型,多線程勝出 

from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模擬I/O操做,能夠打開一個文件來測試I/O,與sleep是一個效果
    # print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(500):
        #t=Thread(target=work) #2.0440001487731934s 耗時
        t=Process(target=work) #11.970999956130981s 耗時
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print ('run time is %s' %(stop_time-start_time))

    多線程用於IO密集型,如socket,爬蟲,web
    多進程用於計算密集型,如金融分析  

  六、同步鎖

import time
import threading

def addNum():
    global num #在每一個線程中都獲取這個全局變量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 對此公共變量進行-1操做

num = 100  #設定一個共享變量

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部線程執行完畢
    t.join()

print('Result: ', num)

  以上是不加鎖的狀況,返回值會是99,同時或者num爲100進行-1的動做

  七、互斥鎖

  不加鎖的狀況下,會致使同一份資源互相搶佔

n = 100
def work():
    global  n
    temp = n
    time.sleep(0.001)
    n = temp-1


if __name__ == '__main__':
        t_l=[]
        for i in range(100):
            t=Thread(target=work)
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
        print(n)

'''
結果不爲0

'''

  加上互斥鎖,犧牲了性能,保護了數據安全

from threading import Thread,Lock
import time
n=100
def work():
    with mutex1:
        global n
        temp=n
        time.sleep(0.01)
        n=temp-1

if __name__ == '__main__':
    mutex1=Lock()
    t_l=[]
    for i in range(100):
        t=Thread(target=work)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)

'''
結果爲0
'''

  八、GIL和LOCK的區別  

    鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據

     而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖。

  GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock    

  python的gil詳解:

  由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這能夠說是Python早期版本的遺留問題。 

  九、死鎖和遞歸鎖

  死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import Thread,Lock,RLock
import time

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutex.acquire()
        print('\033[45m %s 拿到A鎖\033[0m' %(self.name))
        mutex1.acquire()
        print('\033[45m %s 拿到B鎖\033[0m' % (self.name))
        mutex1.release()
        mutex.release()

    def f2(self):
        mutex1.acquire()
        time.sleep(1)
        print('\033[45m %s 拿到B鎖\033[0m' %(self.name))
        mutex.acquire()
        print('\033[45m %s 拿到A鎖\033[0m' % (self.name))
        mutex.release()
        mutex1.release()

if __name__ == '__main__':
    mutex = Lock()     #定義鎖對象
    mutex1 = Lock()
    #mutex = mutex1 = RLock() #定義遞歸鎖對象,遞歸鎖跟計數器一個原理,只有計數器爲0的時候,其餘線程才能進行加鎖
'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
Thread-1 拿到B鎖

卡主死鎖了,線程1拿到B鎖在等A鎖釋放,線程2拿到A鎖在等B鎖釋放,兩個鎖進入相互等待的狀況,產生了死鎖
'''

  遞歸鎖:

  遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

  這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖: 

 mutex1=mutex2=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

  十、信號量Semahpore

    信號量相似一個令牌池

    Semaphore管理一個內置的計數器,
    每當調用acquire()時內置計數器-1;
    調用release() 時內置計數器+1;
    計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

from threading import Thread,Semaphore
import time
def work(id):
    with sem:
        time.sleep(2)
        print('%s say hello' %(id))



if __name__ == '__main__':
    sem = Semaphore(5)
    for i in range(20):
        t = Thread(target=work,args=(i,))
        t.start()

    與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程,每次只能執行4個,其餘的都在阻塞狀態

  十一、事件event

     線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

    參數介紹:

      

event.isSet():返回event的狀態值;

event.wait():若是 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。

    例子:

#!/usr/bin/python
# -*- coding:utf-8 -*-

import threading
from threading import Thread,Event
import time

def conn_mysql(): #一堆執行程序線程
    print('%s is waiting' %(threading.current_thread().getName()))
    e.wait()
    print('%s is starting' % (threading.current_thread().getName()))
    time.sleep(2)

def check_mysql():  #一個檢查mysql狀態的線程
    print('%s is checking' % (threading.current_thread().getName()))
    time.sleep(5)
    e.set()

if __name__ == '__main__':
    e = Event()
    t1 = Thread(target=conn_mysql)
    t2 = Thread(target=conn_mysql)
    t3 = Thread(target=conn_mysql)
    t4 = Thread(target=check_mysql)

    t1.start()
    t2.start()
    t3.start()
    t4.start()


'''
輸出:
Thread-1 is waiting
Thread-2 is waiting
Thread-3 is waiting
Thread-4 is checking
Thread-1 is starting
Thread-2 is starting
Thread-3 is starting 

'''

  

 

 

    能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()
'''
輸出:
(t1        ) Waiting for redis ready...
(t2        ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t1        ) redis ready, and connect to redis server and do some work [Mon Jul  3 19:17:51 2017]
(t2        ) redis ready, and connect to redis server and do some work [Mon Jul  3 19:17:51 2017]

'''
redis示例

 

     threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:

from threading import Event,Thread
import threading
import time
def conn_mysql():
    count = 0
    while not e.is_set():
        print('%s 第 <%s> 次嘗試' % (threading.current_thread().getName(), count))
        count += 1
        e.wait(0.5)
    print('%s ready to conn mysql' % threading.current_thread().getName())
    time.sleep(1)

def check_mysql():
    print('%s checking...' %threading.current_thread().getName())
    time.sleep(4)
    e.set()
if __name__ == '__main__':
    e=Event()

    c1=Thread(target=conn_mysql)
    c2=Thread(target=conn_mysql)
    c3=Thread(target=conn_mysql)

    c4=Thread(target=check_mysql)
    c1.start()
    c2.start()
加上重試mysql
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    while not event.is_set():
        print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName())
        event.wait(0.1)
    print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName())


def check_mysql():
    print('\033[41m正在檢查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql)
    t2=Thread(target=conn_mysql)
    t3=Thread(target=check_mysql)

    t1.start()
    t2.start()
    t3.start()
修改以後的mysql

  應用:鏈接池

  十二、condition(條件)

    使得線程等待,只有知足某條件時,才釋放n個線程

import threading
import time

lock = threading.Condition()

def task(arg):

    time.sleep(1)
    lock.acquire()
    lock.wait()
    print('進程%s'%(arg))
    lock.release()

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()

while True:
    user_input = input('--->').strip()
    if user_input == 'q':break
    lock.acquire()
    lock.notify(int(user_input))
    lock.release()
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
另一種寫法

 

   1三、線程queue

    queue隊列 :使用import queue,用法與進程Queue同樣

import queue

q = queue.Queue(3) #先進先出  --> 隊列

q.put('first')
q.put('second')
q.put((1,2,3,4))


print(q.get())
print(q.get())
print(q.get())


q = queue.LifoQueue() #先進後出 -> 堆棧

q.put('first')
q.put('second')
q.put((1,2,3,4))


print(q.get())
print(q.get())
print(q.get())

q = queue.PriorityQueue()  #按照優先級輸出,數字越小優先級越高
q.put((10,'first'))
q.put((3,'second'))
q.put((5,(1,2,3,4)))


print(q.get())
print(q.get())
print(q.get())

2、協程

  協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

  須要強調的是:

    1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)

    2. 單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換

  對比操做系統控制線程的切換,用戶在單線程內控制協程的切換,優勢以下:

    1.  協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級

    2. 單線程內就能夠實現併發的效果,最大限度地利用cpu

  

  要實現協程,關鍵在於用戶程序本身控制程序切換,切換以前必須由用戶程序本身保存協程上一次調用時的狀態,如此,每次從新調用時,可以從上次的位置繼續執行

  (詳細的:協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧)

  以前已經學過一個在單線程下能夠保存程序的運行狀態,即yield:

    1.yiled能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級

    2.send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換 

 

#!/usr/bin/python
# -*- coding:utf-8 -*-
import time
def consumer(item):
    x =1
    y =2
    c = 3
    d =4
    a = 'sadsaddsdadasd'
    pass

def producer(target,seq):

    for item in seq:
        target(item)

start_time = time.time()

producer(consumer,range(10000000))

stop_time = time.time()
print ('cost time is %s' %(stop_time-start_time))  #1.9440789222717285 s

def consumer():
    x =1
    y =2
    c = 3
    d =4
    a = 'sadsaddsdadasd'
    while True:
        item = yield
def producer(target,seq):

    for item in seq:
        target.send(item)

g = consumer()
next(g)

start_time = time.time()

producer(g,range(10000000))

stop_time = time.time()
print ('cost time is %s' %(stop_time-start_time)) #1.7285699844360352


#用yield協程函數,執行的快的緣由,不用yield須要不斷的開闢內存,垃圾回收, 而yield只開闢一次內存地址,節省了這一塊的時間

'''
cost time is 1.9440789222717285
cost time is 1.7285699844360352
''' 

  缺點:

    協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程

    協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

  協程的定義(知足1,2,3就可稱爲協程):

    1. 必須在只有一個單線程裏實現併發
    2. 修改共享數據不需加鎖
    3. 用戶程序裏本身保存多個控制流的上下文棧
    4. 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

  yield切換在沒有io的狀況下或者沒有重複開闢內存空間的操做,對效率沒有什麼提高,甚至更慢,爲此,能夠用greenlet來爲你們演示這種切換

3、greenlet模塊

  greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

#!/usr/bin/python
# -*- coding:utf-8 -*-
from greenlet import greenlet
 
def test1():
    print('test1,first')
    gr2.switch()
    print('test1,sencod')
    gr2.switch()
def test2():
    print('test2,first')
    gr1.switch()
    print('test2,sencod')
 
 
gr1=greenlet(test1)
gr2=greenlet(test2)
gr1.switch()

 

import time
from greenlet import greenlet
def eat(name):
    print('%s eat food 1' %name)
    gr2.switch('alex飛飛飛')
    print('%s eat food 2' %name)
    gr2.switch()
def play_phone(name):
    print('%s play 1' %name)
    gr1.switch()
    print('%s play 2' %name)

gr1=greenlet(eat)
gr2=greenlet(play_phone)
gr1.switch(name='egon啦啦啦')#能夠在第一次switch時傳入參數,之後都不須要


'''
egon啦啦啦 eat food 1
alex飛飛飛 play 1
egon啦啦啦 eat food 2
alex飛飛飛 play 2


'''
第一次傳入能夠加參數

  單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度

#順序執行
import time
def f1():
    res=0
    for i in range(10000000):
        res+=i

def f2():
    res=0
    for i in range(10000000):
        res*=i


start_time=time.time()
f1()
f2()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #1.7395639419555664


#切換
from greenlet import greenlet
import time
def f1():
    res=0
    for i in range(10000000):
        res+=i
        gr2.switch()


def f2():
    res=0
    for i in range(10000000):
        res*=i
        gr1.switch()

gr1=greenlet(f1)
gr2=greenlet(f2)

start_time=time.time()
gr1.switch()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #7.789067983627319

   greenlet只是提供了一種比generator更加便捷的切換方式,仍然是沒有解決遇到IO自動切換的問題

4、gevent模塊 

  Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。 

  g1=gevent.spawn()建立一個協程對象g1,

  spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的

#!/usr/bin/python
# -*- coding:utf-8 -*-

from gevent import monkey;monkey.patch_all()  #這是gevent給其餘模塊打補丁,否則別的模塊使用gevent就變成串行
import gevent
import time


def eat(name):
    print('%s is eat first' %(name))
    time.sleep(4)  #等飯來
    #gevent.sleep(1)
    print('%s is eat second' %(name))


def play(name):
    print('%s is play 1' %(name))
    time.sleep(3) #模擬網卡了
    #gevent.sleep(2)
    print('%s is play 2' %(name))



if __name__ == '__main__':
    g1= gevent.spawn(eat,'alex')
    g2= gevent.spawn(play,name='alex')
    # 於是也須要join方法,進程或現場的jion方法只能join一個,而gevent的join方法能夠join多個
    g1.join()
    g2.join()
    print ('主線程') 

  上例gevent.sleep模擬的是gevent能夠識別的io阻塞,而time.sleep或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了

  from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前

  或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭

  

from gevent import monkey;monkey.patch_all()

import gevent
import time


def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play_phone():
    print('play phone 1')
    time.sleep(1)
    print('play phone 2')


g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
用第三方的time

  同步或者異步

import gevent
 
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print('Synchronous:')
synchronous()
 
print('Asynchronous:')
asynchronous()


'''
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done

'''

   上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。  

#gevent線程的一些用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

   

#!/usr/bin/python
# -*- coding:utf-8 -*-

from gevent import monkey;monkey.patch_all()
import gevent
import time
import requests


def get_page(url):
    print ('get page %s' %url)
    response = requests.get(url)
    if response.status_code == 200:
        print (response.text)


start_time = time.time()

# get_page('http://www.baidu.com')
# get_page('http://www.python.org')
# get_page('http://www.yahoo.com')

g1 = gevent.spawn(get_page,url='http://www.baidu.com')
g2 = gevent.spawn(get_page,url='http://www.python.org')
g3 = gevent.spawn(get_page,url='http://www.yahoo.com')

gevent.joinall([
    g1,
    g2,
    g3
])

stop_time = time.time()

print ('run time is %s ' %(stop_time-start_time))
協程應用-爬蟲

  經過gevent實現單線程下的socket併發(from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞)

 

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)
單線程實現併發服務端
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM)
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()
多線程併發多個client

5、socketserver實現併發

  基於tcp的套接字,關鍵就是兩個循環,一個連接循環,一個通訊循環socketserver模塊中分兩大類:server類(解決連接問題)和request類(解決通訊問題)

  server類:

  

  request類:

  

  繼承類關係:

      線程繼承關係

   

     進程的繼承關係

    run方法繼承關係

  如下述代碼爲例,分析socketserver源碼:

  ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
  ftpserver.serve_forever()

  查找屬性的順序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer

    1. 實例化獲得ftpserver,先找類ThreadingTCPServer的__init__,在TCPServer中找到,進而執行server_bind,server_active
    2. 找ftpserver下的serve_forever,在BaseServer中找到,進而執行self._handle_request_noblock(),該方法一樣是在BaseServer中
    3. 執行self._handle_request_noblock()進而執行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),而後執行self.process_request(request, client_address)
    4. 在ThreadingMixIn中找到process_request,開啓多線程應對併發,進而執行process_request_thread,執行self.finish_request(request, client_address)
    5. 上述四部分完成了連接循環,本部分開始進入處理通信部分,在BaseServer中找到finish_request,觸發咱們本身定義的類的實例化,去找__init__方法,而咱們本身定義的類沒有該方法,則去它的父類也就是BaseRequestHandler中找....

  源碼分析總結:

  基於tcp的socketserver咱們本身定義的類中的

    1. self.server即套接字對象
    2. self.request即一個連接
    3. self.client_address即客戶端地址

  基於udp的socketserver咱們本身定義的類中的

    1. self.request是一個元組(第一個元素是客戶端發來的數據,第二部分是服務端的udp套接字對象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)  
    2. self.client_address即客戶端地址
    3. self.request 是在UDPServer類下的get_request函數
    4. data, client_addr = self.socket.recvfrom(self.max_packet_size)
#!/usr/bin/python
# -*- coding:utf-8 -*-

import socketserver
#MyHandler(conn, client_address, s)
class MyHandler(socketserver.BaseRequestHandler): #通信循環
    def handle(self):
        while True:
            res=self.request.recv(1024)
            print('client %s msg:%s' %(self.client_address,res))
            self.request.send(res.upper())

if __name__ == '__main__':

    s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyHandler)
    s.serve_forever() #連接循環

  基於udp的socketserver

#!/usr/bin/python
# -*- coding:utf-8 -*-

import socketserver

class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
        client_msg,s=self.request
        s.sendto(client_msg.upper(),self.client_address)

if __name__ == '__main__':
    s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler)
    s.serve_forever()  

socketserver 例子

from SocketServer import TCPServer, ThreadingMixIn, StreamRequestHandler
 
#定義支持多線程的服務類,注意是多繼承
class Server(ThreadingMixIn, TCPServer): pass
 
#定義請求處理類
class Handler(StreamRequestHandler):
	
	def handle(self):
		addr = self.request.getpeername()
		print 'Got connection from ',addr
		self.wfile.write('Thank you for connection')
 
server = Server(('', 1234), Handler)	#實例化服務類
server.serve_forever()	#開啓服務
相關文章
相關標籤/搜索