python小白-day8 線程、進程、協程

Python線程

線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。html

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python
import threading
import time
 
def show(arg):
     time.sleep( 1 )
     print ( 'thread' + str (arg))
 
for i in range ( 10 ):
     t = threading.Thread(target = show, args = (i,))
     t.start()
 
print ( 'main thread stop' )


上述代碼建立了10個「前臺」線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。python

更多方法:git

  • start            線程準備就緒,等待CPU調度程序員

  • setName      爲線程設置名稱github

  • getName      獲取線程名稱算法

  • setDaemon   設置爲後臺線程或前臺線程(默認)
                       若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止
                        若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止編程

  • join              逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義windows

  • run              線程被cpu調度後執行Thread類對象的run方法數組

Python GIL(Global Interpreter Lock) 

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。
緩存

線程鎖(互斥鎖Mutex)

一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading
  
def addNum():
     global num #在每一個線程中都獲取這個全局變量
     print ( '--get num:' ,num )
     time.sleep( 1 )
     num  - = 1 #對此公共變量進行-1操做
  
num = 100  #設定一個共享變量
thread_list = []
for i in range ( 100 ):
     t = threading.Thread(target = addNum)
     t.start()
     thread_list.append(t)
  
for t in thread_list: #等待全部線程執行完畢
     t.join()
  
  
print ( 'final num:' , num )


正常來說,這個num結果應該是0, 但在python 2.7上多運行幾回,會發現,最後打印出來的num結果不老是0,爲何每次運行的結果不同呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。 

*注:不要在3.x上運行,不知爲何,3.x上的結果老是正確的,多是自動加了鎖

加鎖版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
import threading
  
def addNum():
     global num #在每一個線程中都獲取這個全局變量
     print ( '--get num:' ,num )
     time.sleep( 1 )
     lock.acquire() #修改數據前加鎖
     num  - = 1 #對此公共變量進行-1操做
     lock.release() #修改後釋放
  
num = 100  #設定一個共享變量
thread_list = []
lock = threading.Lock() #生成全局鎖
for i in range ( 100 ):
     t = threading.Thread(target = addNum)
     t.start()
     thread_list.append(t)
  
for t in thread_list: #等待全部線程執行完畢
     t.join()
  
print ( 'final num:' , num )

RLock(遞歸鎖)

說白了就是在一個大鎖中還要再包含子鎖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
 
def run1():
     print ( "grab the first part data" )
     lock.acquire()
     global num
     num + = 1
     lock.release()
     return num
def run2():
     print ( "grab the second part data" )
     lock.acquire()
     global  num2
     num2 + = 1
     lock.release()
     return num2
def run3():
     lock.acquire()
     res = run1()
     print ( '--------between run1 and run2-----' )
     res2 = run2()
     lock.release()
     print (res,res2)
 
if __name__ = = '__main__' :
 
     num,num2 = 0 , 0
     lock = threading.RLock()
     for i in range ( 10 ):
         t = threading.Thread(target = run3)
         t.start()
 
while threading.active_count() ! = 1 :
     print (threading.active_count())
else :
     print ( '----all threads done---' )
     print (num,num2)


Semaphore(信號量)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
def run(n):
     semaphore.acquire()
     time.sleep( 1 )
     print ( "run the thread: %s\n" % n)
     semaphore.release()
if __name__ = = '__main__' :
     num = 0
     semaphore  = threading.BoundedSemaphore( 3 ) #最多容許5個線程同時運行
     for i in range ( 20 ):
         t = threading.Thread(target = run,args = (i,))
         t.start()
while threading.active_count() ! = 1 :
     pass #print threading.active_count()
else :
     print ( '----all threads done---' )
     print (num)


event

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

  • clear:將「Flag」設置爲False

  • set:將「Flag」設置爲True

經過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程作交通指揮燈,生成幾個線程作車輛,車輛行駛按紅燈停,綠燈行的規則。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
import random
def light():
     if not event.isSet():
         event. set () #wait就不阻塞 #綠燈狀態
     count = 0
     while True :
         if count < 10 :
             print ( '\033[42;1m--green light on---\033[0m' )
         elif count < 13 :
             print ( '\033[43;1m--yellow light on---\033[0m' )
         elif count < 20 :
             if event.isSet():
                 event.clear()
             print ( '\033[41;1m--red light on---\033[0m' )
         else :
             count = 0
             event. set () #打開綠燈
         time.sleep( 1 )
         count + = 1
 
def car(n): #no bug version
     while 1 :
         time.sleep( 1 )
         if  event.isSet(): #綠燈
             print ( "car [%s] is running.." % n)
         else :
             print ( "car [%s] is waiting for the red light.." % n)
             event.wait()
 
 
def car2(n):
     while 1 :
         time.sleep(random.randrange( 10 ))
         if  event.isSet(): #綠燈
             print ( "car [%s] is running.." % n)
         else :
             print ( "car [%s] is waiting for the red light.." % n)
 
if __name__ = = '__main__' :
     event = threading.Event()
     Light = threading.Thread(target = light)
     Light.start()
     for i in range ( 3 ):
         t = threading.Thread(target = car,args = (i,))
         t.start()


queue隊列 

Python中對隊列和線程的操做,須要使用模塊:Queue 和 threading。其中,Queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現了鎖原語,可以在多線程中直接使用。可使用隊列來實現線程間的同步。

生產者消費者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
 
import queue
import time
import threading
q = queue.Queue()
 
def consumer(num):
     while True :
         time.sleep( 1 )
         print ( 'consumer %s get task:%s' % (num,q.get()))
         q.task_done()
def producer(num):
     count = 1
     while True :
         print ( 'producer %s produced a new task:%s' % (num,count))
         q.put(count)
         count + = 1
         q.join()
         print ( 'all tasks has been consumer by consumers' )
 
 
c1 = threading.Thread(target = consumer,args = [ 1 ,])
c2 = threading.Thread(target = consumer,args = [ 2 ,])
c3 = threading.Thread(target = consumer,args = [ 3 ,])
 
p1 = threading.Thread(target = producer,args = [ 'hetan' ,])
p2 = threading.Thread(target = producer,args = [ 'liuyao' ,])
p3 = threading.Thread(target = producer,args = [ 'xxxx' ,])
 
c1.start()
c2.start()
c3.start()
p1.start()
p2.start()
p3.start()



Python 進程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def f(name):
     time.sleep( 2 )
     print ( 'hello' , name)
 
if __name__ = = '__main__' :
     p = Process(target = f, args = ( 'bob' ,))
     p2 = Process(target = f, args = ( 'bob' ,))
     p.start()
     p2.start()
     p.join()


進程間通信  

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法:

Queues

使用方法跟threading裏的queue差很少

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
from multiprocessing import Process, Queue
 
def f(q):
     q.put([ 42 , None , 'hello' ])
 
if __name__ = = '__main__' :
     q = Queue()
     p = Process(target = f, args = (q,))
     p.start()
     print (q.get())    # prints "[42, None, 'hello']"
     p.join()


Pipes

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Pipe
 
def f(conn):
     conn.send([ 42 , None , 'hello' ])
     conn.close()
 
if __name__ = = '__main__' :
     parent_conn, child_conn = Pipe()
     p = Process(target = f, args = (child_conn,))
     p.start()
     print (parent_conn.recv())   # prints "[42, None, 'hello']"
     p.join()


Managers

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process, Manager
 
def f(d, l):
     d[ 1 ] = '1'
     d[ '2' ] = 2
     d[ 0.25 ] = None
     l.append( 1 )
     print (l)
 
if __name__ = = '__main__' :
     with Manager() as manager:
         d = manager. dict ()
 
         l = manager. list ( range ( 5 ))
         p_list = []
         for i in range ( 10 ):
             p = Process(target = f, args = (d, l))
             p.start()
             p_list.append(p)
         for res in p_list:
             res.join()
 
         print (d)
         print (l)


進程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process, Lock
  
def f(l, i):
     l.acquire()
     try :
         print ( 'hello world' , i)
     finally :
         l.release()
  
if __name__ = = '__main__' :
     lock = Lock()
  
     for num in range ( 10 ):
         Process(target = f, args = (lock, num)).start()


進程池  

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有兩個方法:

  • apply              同步,通常不用

  • apply_async   異步,通常用這個

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from  multiprocessing import Process,Pool,freeze_support
import time
  
def Foo(i):
     time.sleep( 2 )
     return i + 100
  
def Bar(arg):
     print ( '-->exec done:' ,arg)
if __name__ = = '__main__' :
     freeze_support()   #windows系統執行需加上,不然會報錯
  
     pool = Pool( 5 )
  
     for i in range ( 10 ):
         pool.apply_async(func = Foo, args = (i,),callback = Bar)
         #pool.apply(func=Foo, args=(i,))
  
     print ( 'end' )
     pool.close()
     pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。


Python 協程

協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是協程:協程是一種用戶態的輕量級線程

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

 

協程的好處:

  • 無需線程上下文切換的開銷

  • 無需原子操做鎖定及同步的開銷

  • 方便切換控制流,簡化編程模型

  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:

  • 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。

  • 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

使用yield實現協程操做例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
def consumer(name):
     print ( "--->starting eating baozi..." )
     while True :
         new_baozi = yield
         print ( "[%s] is eating baozi %s" % (name,new_baozi))
         time.sleep( 1 )
 
def producer():
 
     r = con.__next__()
     r = con2.__next__()
     n = 0
     while n < 5 :
         n + = 1
         con.send(n)
         con2.send(n)
         print ( "\033[32;1m[producer]\033[0m is making baozi %s" % n )
 
 
if __name__ = = '__main__' :
     con = consumer( "hetan" )
     con2 = consumer( "liuyao" )
     p = producer()


Greenlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env python
 
 
from greenlet import greenlet
 
def test1():
     print ( 12 )
     gr2.switch()
     print ( 34 )
     gr2.switch()
 
 
def test2():
     print ( 56 )
     gr1.switch()
     print ( 78 )
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()


Gevent 

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent
 
def foo():
     print ( 'Running in foo' )
     gevent.sleep( 0 )
     print ( 'Explicit context switch to foo again' )
 
def bar():
     print ( 'Explicit context to bar' )
     gevent.sleep( 0 )
     print ( 'Implicit context switch back to bar' )
 
gevent.joinall([
     gevent.spawn(foo),
     gevent.spawn(bar),
])


同步與異步的性能區別 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gevent
 
def task(pid):
     """
     Some non-deterministic task
     """
     gevent.sleep( 0.5 )
     print ( 'Task %s done' % pid)
 
def synchronous():
     for i in range ( 1 , 10 ):
         task(i)
 
def asynchronous():
     threads = [gevent.spawn(task, i) for i in range ( 10 )]
     gevent.joinall(threads)
 
print ( 'Synchronous:' )
synchronous()
 
print ( 'Asynchronous:' )
asynchronous()

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。  

遇到IO阻塞時會自動切換任務

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from gevent import monkey; monkey.patch_all()
import gevent
from  urllib.request import urlopen
 
def f(url):
     print ( 'GET: %s' % url)
     resp = urlopen(url)
     data = resp.read()
     print ( '%d bytes received from %s.' % ( len (data), url))
 
gevent.joinall([
         gevent.spawn(f, 'https://www.python.org/' ),
         gevent.spawn(f, 'https://www.yahoo.com/' ),
         gevent.spawn(f, 'https://github.com/' ),
])


經過gevent實現單線程下的多socket併發

server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
import gevent
 
import time
import sys
from gevent import socket,monkey
import socket
monkey.patch_all()
 
def server(port):
     s = socket.socket()
     s.bind(( '0.0.0.0' ,port))
     s.listen( 300 )
     while True :
         cli,addr = s.accept()
         gevent.spawn(handle_request,cli)
def handle_request(s):
     try :
         while True :
             data = s.recv( 1024 )
             print ( 'recv' ,data.decode())
             s.send(data)
             if not data:
                 s.shutdown(socket.SHUT_WR)
     except Exception as e:
         print (e)
     finally :
         s.close()
if __name__ = = '__main__' :
     server( 8000 )

client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
import socket
 
Host = 'localhost'
port = 8000
 
s = socket.socket()
s.connect((Host,port))
 
while True :
     msg = bytes( input ( '>>' ),encoding = 'utf8' )
     s.send(msg)
     data = s.recv( 1024 )
 
     print ( 'recevied' , repr (data.decode()))
s.close()

論事件驅動與異步IO

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。


在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

  1. 程序中有許多任務,並且…

  2. 任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…

  3. 在等待事件到來時,某些任務會阻塞。

當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

首先列一下,sellect、poll、epoll三者的區別 
select 
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。

另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。

poll 
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。

poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。

另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。

epoll 
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。

epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

 

 

Python select 

Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。

注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.

接下來經過echo server例子要以瞭解select 是如何經過單進程實現同時處理多個非阻塞的socket鏈接的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import select
import socket
import sys
import Queue
  
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking( 0 )
  
# Bind the socket to the port
server_address = ( 'localhost' , 10000 )
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind(server_address)
  
# Listen for incoming connections
server.listen( 5 )

select()方法接收並監控3個通訊列表, 第一個是全部的輸入的data,就是指外部發過來的數據,第2個是監控和接收全部要發出去的data(outgoing data),第3個監控錯誤信息,接下來咱們須要建立2個列表來包含輸入和輸出信息來傳給select().

1
2
3
4
5
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]

全部客戶端的進來的鏈接和數據將會被server的主循環程序放在上面的list中處理,咱們如今的server端須要等待鏈接可寫(writable)以後才能過來,而後接收數據並返回(所以不是在接收到數據以後就馬上返回),由於每一個鏈接要把輸入或輸出的數據先緩存到queue裏,而後再由select取出來再發出去。

1
2
# Outgoing message queues (socket:Queue)
message_queues = {}

下面是此程序的主循環,調用select()時會阻塞和等待直到新的鏈接和數據進來

1
2
3
4
5
while inputs:
 
     # Wait for at least one of the sockets to be ready for processing
     print >>sys.stderr, '\nwaiting for the next event'
     readable, writable, exceptional = select.select(inputs, outputs, inputs)

 當你把inputs,outputs,exceptional(這裏跟inputs共用)傳給select()後,它返回3個新的list,咱們上面將他們分別賦值爲readable,writable,exceptional, 全部在readable list中的socket鏈接表明有數據可接收(recv),全部在writable list中的存放着你能夠對其進行發送(send)操做的socket鏈接,當鏈接通訊出現error時會把error寫到exceptional列表中。

Readable list 中的socket 能夠有3種可能狀態,第一種是若是這個socket是main "server" socket,它負責監聽客戶端的鏈接,若是這個main server socket出如今readable裏,那表明這是server端已經ready來接收一個新的鏈接進來了,爲了讓這個main server能同時處理多個鏈接,在下面的代碼裏,咱們把這個main server的socket設置爲非阻塞模式。

1
2
3
4
5
6
7
8
9
10
11
12
# Handle inputs
for s in readable:
  
     if s is server:
         # A "readable" server socket is ready to accept a connection
         connection, client_address = s.accept()
         print >>sys.stderr, 'new connection from' , client_address
         connection.setblocking( 0 )
         inputs.append(connection)
  
         # Give the connection a queue for data we want to send
         message_queues[connection] = Queue.Queue()

第二種狀況是這個socket是已經創建了的鏈接,它把數據發了過來,這個時候你就能夠經過recv()來接收它發過來的數據,而後把接收到的數據放到queue裏,這樣你就能夠把接收到的數據再傳回給客戶端了。

1
2
3
4
5
6
7
8
9
else :
      data = s.recv( 1024 )
      if data:
          # A readable client socket has data
          print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
          message_queues[s].put(data)
          # Add output channel for response
          if s not in outputs:
              outputs.append(s)

第三種狀況就是這個客戶端已經斷開了,因此你再經過recv()接收到的數據就爲空了,因此這個時候你就能夠把這個跟客戶端的鏈接關閉了。

1
2
3
4
5
6
7
8
9
10
11
else :
     # Interpret empty result as closed connection
     print >>sys.stderr, 'closing' , client_address, 'after reading no data'
     # Stop listening for input on the connection
     if s in outputs:
         outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉
     inputs.remove(s)    #inputs中也刪除掉
     s.close()           #把這個鏈接關閉掉
  
     # Remove message queue
     del message_queues[s]

對於writable list中的socket,也有幾種狀態,若是這個客戶端鏈接在跟它對應的queue裏有數據,就把這個數據取出來再發回給這個客戶端,不然就把這個鏈接從output list中移除,這樣下一次循環select()調用時檢測到outputs list中沒有這個鏈接,那就會認爲這個鏈接還處於非活動狀態

1
2
3
4
5
6
7
8
9
10
11
# Handle outputs
for s in writable:
     try :
         next_msg = message_queues[s].get_nowait()
     except Queue.Empty:
         # No messages waiting so stop checking for writability.
         print >>sys.stderr, 'output queue for' , s.getpeername(), 'is empty'
         outputs.remove(s)
     else :
         print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
         s.send(next_msg)

最後,若是在跟某個socket鏈接通訊過程當中出了錯誤,就把這個鏈接對象在inputs\outputs\message_queue中都刪除,再把鏈接關閉掉

1
2
3
4
5
6
7
8
9
10
11
# Handle "exceptional conditions"
for s in exceptional:
     print >>sys.stderr, 'handling exceptional condition for' , s.getpeername()
     # Stop listening for input on the connection
     inputs.remove(s)
     if s in outputs:
         outputs.remove(s)
     s.close()
  
     # Remove message queue
     del message_queues[s]

最後服務器端的完整代碼以下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python
import select
import socket
import sys
import queue
 
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  #生成socket句柄
server.setblocking( False )   #非阻塞模式
 
 
server_address = ( 'localhost' , 10000 #服務器端ip和端口
#輸出錯誤信息
print (sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)    #綁定ip和端口
 
server.listen( 5 #監聽端口,最大鏈接數爲5個
 
inputs = [server]    #select接收的輸入信息
 
outputs = []        #select接收的輸出信息
 
message_queue = {}  #socket的連接對象和輸出信息的隊列字典(socket:Queue)
while inputs:  #程序主循環,由於inputs列表中有值因此爲死循環
     print ( '\nwaiting for the next event' )
     '''將inputs和outputs等三個列表放進select中返回三個列表,分別賦值爲readable,writeable,exception,
     全部在readable list中的socket鏈接表明有數據可接收(recv),全部在writable list中的存放着你能夠對其
     進行發送(send)操做的socket鏈接,當鏈接通訊出現error時會把error寫到exceptional列表中。'''
     readable,writeable,exception = select.select(inputs,outputs,inputs)
     for s in readable:  #遍歷輸入信息列表
         if s is server: #若是s是server,那表明這是server端已經ready來接收一個新的鏈接進來了,因此在inputs中有一個server,表明server端已經ready
             connection,client_address = s.accept()  #等待接收數據
             connection.seblocking( False #爲了讓這個server能同時處理多個鏈接,咱們把這個server的socket設置爲非阻塞模式。
             inputs.append(connection) #將這個連接的對象內存地址放入inputs列表
             message_queue[connection] = queue.Queue() #將這個連接對象和其對應的隊列地址組成一個字典,方便之後知道咱們要往哪裏放數據
         else #第二種狀況是已經創建了的鏈接,它把數據發了過來,這個時候你就能夠經過recv()來接收它發過來的數據,而後把接收到的數據放到queue裏,這樣你就能夠把接收到的數據再傳回給客戶端了。
             data = s.recv( 1024 #接收數據
             if data:  #若是接收數據不爲空
                 print (sys.stderr, 'received "%s" from %s' % (data, s.getpeername()))
                 message_queue[s].put(data) #接收數據放入這個連接對應的隊列中
                 if s not in outputs:  #若是s不在輸出列表中就加入到列表
                     outputs.append(s)  #將對象加入信息輸入列表
             #第三種狀況就是這個客戶端已經斷開了,因此你再經過recv()接收到的數據就爲空了,因此這個時候你就能夠把這個跟客戶端的鏈接關閉了。
             else :
                 if s in outputs:  #若是s在outputs列表中就刪除
                     outputs.remove(s)
                 inputs.remove(s)  #把s刪除出inputs列表
                 s.close()  #鏈接關閉
                 del message_queue[s] #在字典中刪除s
     for s in writeable:  #遍歷輸出信息列表
         try #若是這個客戶端連接對應的queue中有數據,就取出來
             next_msg = message_queue[s].get_nowait()
         #沒有數據就將其從outputs列表中移除,這樣下一次循環select()調用時檢測到outputs list中沒有這個鏈接,那就會認爲這個鏈接還處於非活動狀態
         except queue.Empty:
             print ( 'output queue for' , s.getpeername(), 'is empty' )
             outputs.remove(s)
         #將取出的數據在發送回客戶端
         else :
             print ( 'sending "%s" to %s' % (next_msg, s.getpeername()))
             s.send(next_msg)
     #最後,若是在跟某個socket鏈接通訊過程當中出了錯誤,就把這個鏈接對象在inputs\outputs\message_queue中都刪除,再把鏈接關閉掉
     for s in exception:
         print ( 'handling exceptional condition for' , s.getpeername() )
         #刪除inputs中連接對象
         inputs.remove(s)
         if s in outputs:
             #刪除outputs中的連接對象
             outputs.remove(s)
         s.close()  #關閉連接
         del message_queue[s]  #刪除連接字典中的對象

客戶端

下面的這個是客戶端程序展現瞭如何經過select()對socket進行管理並與多個鏈接同時進行交互:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import socket
import sys
  
messages = [ 'This is the message. ' ,
              'It will be sent ' ,
              'in parts.' ,
              ]
server_address = ( 'localhost' , 10000 )
  
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           ]
  
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
     s.connect(server_address)

接下來經過循環經過每一個socket鏈接給server發送和接收數據。

1
2
3
4
5
6
7
8
9
10
11
12
13
for message in messages:
  
     # Send messages on both sockets
     for s in socks:
         print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
         s.send(message)
  
     # Read responses on both sockets
     for s in socks:
         data = s.recv( 1024 )
         print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
         if not data:
             print >>sys.stderr, 'closing socket' , s.getsockname()

客戶端完整代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
__author__ = 'jieli'
import socket
import sys
  
messages = [ 'This is the message. ' ,
              'It will be sent ' ,
              'in parts.' ,
              ]
server_address = ( 'localhost' , 10000 )
  
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
           ]
  
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
     s.connect(server_address)
  
for message in messages:
  
     # Send messages on both sockets
     for s in socks:
         print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
         s.send(message)
  
     # Read responses on both sockets
     for s in socks:
         data = s.recv( 1024 )
         print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
         if not data:
             print >>sys.stderr, 'closing socket' , s.getsockname()
             s.close()

selectors模塊(將select模塊封裝,調用更簡潔)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import selectors
import socket
  
sel = selectors.DefaultSelector()
  
def accept(sock, mask):
     conn, addr = sock.accept()  # Should be ready
     print ( 'accepted' , conn, 'from' , addr)
     conn.setblocking( False )
     sel.register(conn, selectors.EVENT_READ, read)
  
def read(conn, mask):
     data = conn.recv( 1000 # Should be ready
     if data:
         print ( 'echoing' , repr (data), 'to' , conn)
         conn.send(data)  # Hope it won't block
     else :
         print ( 'closing' , conn)
         sel.unregister(conn)
         conn.close()
  
sock = socket.socket()
sock.bind(( 'localhost' , 10000 ))
sock.listen( 100 )
sock.setblocking( False )
sel.register(sock, selectors.EVENT_READ, accept)
  
while True :
     events = sel.select()
     for key, mask in events:
         callback = key.data
         callback(key.fileobj, mask)




相關文章
相關標籤/搜索