day9-協程

生產者和消費者模型:

#!/usr/bin/env  python
#coding:utf8

import threading,Queue
import time
import random


def producer(name,n):#生產者
    # while True:#無限循環生產包子
    time.sleep(random.randrange(3))#random生成一個3之內不包含3的隨機數,經過隨機數決定等待多長時間,主要便於測試
    if q.qsize()<4:#判斷隊列個數若是小於4程序繼續往下執行,#包子剩餘的個數若是小於4個才生產包子,避免浪費.每一個廚師在生產包子都會看還剩餘幾個
        for i in range(2):#每一個廚師生產2個包子上傳到隊列
            print '%s生產了[%d]個包子\n'%(name,n)
            q.put(n)
        q.join()#隊列的個數爲空則阻塞.#由於消費者每吃完一個包子都會告訴廚師,當全部包子都吃完廚師繼續生產包子.#就是繼續下一次循環
        print '包子已賣光了,[%d]'%q.qsize()


def consumer(name,n):#消費者
    while True:#無限循環吃包子
        print '%s 吃了[%d]個包子\n'%(name,n)
        q.get()#吃掉一個包子從隊列減1
        time.sleep(1)#每一個消費者吃掉一個包子的時候等待1秒
        q.task_done()#每一個消費者吃掉一個包子通知隊列(廚師)



if __name__=='__main__':
    q=Queue.Queue()
    c_name=['z1','z2','z3','z4']#4個消費者
    p_name=['p1','p2']#2個廚師

    for name in p_name:
        p=threading.Thread(target=producer,args=[name,1,])#開啓2個線程調用producer函數,#2個廚師同時生產包子
        p.start()#開啓線程,線程的開關

    for name in c_name:
        c=threading.Thread(target=consumer,args=[name,1,])#開啓4個線程調用consumer函數,#4個消費者同時吃包子
        c.start()#開啓線程,線程的開關

 執行結果:python

 
 

p1生產了[1]個包子git

p1生產了[1]個包子程序員

z1 吃了[1]個包子github

z4 吃了[1]個包子
z1 吃了[1]個包子編程

包子已賣光了,[0]

數組

 

 

協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程安全

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:服務器

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。網絡

 

協程的好處:多線程

  • 無需線程上下文切換的開銷
  • 無需原子操做鎖定及同步的開銷
  • 方便切換控制流,簡化編程模型
  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:

  • 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

使用yield實現協程操做例子    

import time
import queue
def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        #time.sleep(1)
 
def producer():
 
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n +=1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
 
 
if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

 

Greenlet

#!/usr/bin/env python
# -*- coding:utf-8 -*-
  
  
from greenlet import greenlet
  
  
def test1():
    print 12
    gr2.switch()
    print 34
    gr2.switch()
  
  
def test2():
    print 56
    gr1.switch()
    print 78
  
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

 

  

Gevent 

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

import gevent
 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

 

輸出:

Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar 

同步與異步的性能區別 

import gevent
 
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print('Synchronous:')
synchronous()
 
print('Asynchronous:')
asynchronous()

 

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。  

遇到IO阻塞時會自動切換任務

from gevent import monkey; monkey.patch_all()
import gevent
from  urllib.request import urlopen
 
def f(url):
    print('GET: %s' % url)
    resp = urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))
 
gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

 

 

經過gevent實現單線程下的多socket併發

server side 

import sys
import socket
import time
import gevent
 
from gevent import socket,monkey
monkey.patch_all()
def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)
def handle_request(s):
    try:
        while True:
            data = s.recv(1024)
            print("recv:", data)
            s.send(data)
            if not data:
                s.shutdown(socket.SHUT_WR)
 
    except Exception as  ex:
        print(ex)
    finally:
 
        s.close()
if __name__ == '__main__':
    server(8001)

 

client side   

import socket
 
HOST = 'localhost'    # The remote host
PORT = 8001           # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"),encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    #print(data)
 
    print('Received', repr(data))
s.close()

 

  

論事件驅動與異步IO

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。

 

在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

  1. 程序中有許多任務,並且…
  2. 任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…
  3. 在等待事件到來時,某些任務會阻塞。

當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

 select-server端代碼:

#!/usr/bin/env  python
#coding:utf8
import select
import socket
import time
import sys

import Queue#用於存放客服端發送過來的消息

server_ip=('0.0.0.0',9003)#定義元組,服務器IP,端口
sk=socket.socket()#實例化socket模塊的socket類建立一個對象爲sk
sk.bind(server_ip)#調用sk對象中的bind方法,傳入參數.綁定IP和端口
sk.listen(20)#server端容許的最大鏈接數
sk.setblocking(False)#遇到IO的時候不阻塞
inputs=[sk,]#定義一個列表存放服務端和客服端socket對象
outputs=[]#定義一個列表存放客服端socket對象
message={}#定義一個字典存放"{客服端socket對象:隊列}",隊列中是放的客服端發送過來的消息



while True:
    """
    select一共能夠設置4個參數
    rList=inputs=[sk,客服端socket對象]
    wList=outputs[客服端socket對象]
    第三個參數異常信息
    第四個參數是超時時間,若是客服端沒有鏈接server端,0.5秒超時,程序會繼續往下執行
    程序第一次啓動的時候 rList=sk 感知server的變化,只有客服端鏈接過來server纔會變化
    若是客服端已經鏈接進來此時 inputs列表中至少存在2個元素 server的socket對象和client端的socket對象,select會遍歷列表中的每個元素並感知時候是否有變化
    若是有變化那麼知足條件select不會阻塞程序繼續向下執行
    若是select感知到rList發生變化,好比客服端給服務端發送消息,程序向下執行
    """
    rList,wList,error=select.select(inputs,outputs,inputs)#讀,寫,錯誤,超時時間
    #客服端鏈接過來,rList [<socket._socketobject object at 0x101445750>]
    for r in  rList:
        #若是rList有變化進入for循環,判斷r == <socket._socketobject object at 0x101445750>
        if r == sk:#
            conn,address=r.accept()#監聽客服端socket對象
            inputs.append(conn)#把客服端socket對象放入inputs列表中
            message[conn]=Queue.Queue()#message={socket對象:隊列}
            print address#打印客服端IP
        ####注視中客服端鏈接指的是客服端socket對象#####
        else:
            #若是r==客服端鏈接,前提條件是已經感知到客服端socket對象發生變化,程序纔會執行到此處
            #監聽客服端發送過來的數據
            data=r.recv(1024)
            if data:
                #若是有數據把客服端socket對象放入outputs列表中讓select感知wList的變化,也就是感知客服端是否發送過來消息,用於讀寫分離
                print data  #打印客服端發送過來的消息
                outputs.append(r)#把客服端鏈接添加到outputs列表
                message[r].put(data)#message[客服端鏈接].put(接受的數據),  message{客服端socket對象:客服端發送過來的消息上次到隊列}
            else:
                inputs.remove(r)#若是客服端異常斷開,刪除inputs列表中客服端socket對象
                del message[w]#若是客服端異常斷開,刪除message字典中客服端socket對象和客服端的消息隊列
                #客服端異常斷開的時候會發送空數據,此時在inputs列表中刪除客服端鏈接

    for w in wList:
        #select遍歷wList的時候感知到了變化,也就是服務端已經接受到客服端已經發送過來的消息了
        try:
            data=message[w].get_nowait()#message[客服端socket對象]獲取到消息隊列,最後獲得發送過來的消息,get_nowait若是從隊列中沒有獲取到數據也不會阻塞
            w.sendall(data)#發送數據給客服端
        #給客服端發送數據
        except Queue.Empty as e:#捕捉隊列是否爲空
            if message[w]:
                del message[w]#刪除message字典中客服端socket對象和客服端的消息隊列
        outputs.remove(w)#刪除outputs列表中客服端socket對象
        #刪除客服端鏈接

 

 

select-client端代碼:

import socket

server_ip=('127.0.0.1', 9003)
sk=socket.socket()
sk.connect(server_ip)


while True:
    data=raw_input('Please:').strip()
    if len(data) ==0:continue
    sk.sendall(data)
    server_response=sk.recv(1024)
    print server_response
相關文章
相關標籤/搜索