Python網絡編程—socket(二)

http://www.cnblogs.com/phennry/p/5645369.htmlhtml

接着上篇博客咱們繼續介紹socket網絡編程,今天主要介紹的內容:IO多路複用、多線程、補充知識點。python

1、IO多路複用

    IO多路複用是指內核一旦發現進程指定的一個或者多個IO條件準備讀取,它就通知該進程。IO多路複用適用於如下場合:面試

  • 當客戶端處理多個描述符時(通常是交互式輸入和網絡套接字),必須使用IO複用;編程

  • 當一個客戶經過處理過個套接字時,而這種狀況是可能的,但不多出現;數組

  • 若是一個TCP服務器既要處理監聽套接字,又要處理已鏈接套接字,通常也要用到IO複用;緩存

  • 若是服務器紀要處理TCP,又要處理UDP時;服務器

  • 若是一個服務器要處理多個服務或多個協議時,使用IO複用。網絡

IO多路複用的事件方式有三種,分別是:select、poll、epoll。數據結構

下面咱們就介紹下這三種事件方式:多線程

一、select

    首先select是能夠跨平臺的,select函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用後select函數會阻塞,知道有描述符就緒(有數據可讀、可寫或者有except異常),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠經過遍歷fdset,來找到就緒的描述符。具體用法,請看下面代碼:

 服務器端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import  socket
import  select                #select的監聽是有個數限制1024
 
sk  =  socket.socket()
sk.bind(( '127.0.0.1' 9999 ,))
sk.listen( 5 )
 
inputs  =  [sk,]
while  True :
     rlist,w,x,  =  select.select(inputs,[],[], 1 )
     print ( len (inputs), len (rlist))
     #監聽sk(服務端)對象若是sk對象發生變化,表示有客戶端來鏈接了,此時rlist值爲[sk,]
     #監聽conn對象,若是conn發生變化時,表示客戶端有新消息發送過來,此時rlist的值爲[客戶端]
     #當s1向服務端發送消息時,rlist =[s1]
 
     for  in  rlist:
         if  = =  sk:                      #判斷新客戶來鏈接
             conn,address  =  r.accept()    #conn也是socket的對象
             inputs.append(conn)
             conn.sendall(bytes( 'hello' ,encoding = 'utf-8' ))
         else :
             r.recv( 1024 )                  #等待接收客戶端發來消息

客戶端:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import  socket
 
sk  =  socket.socket()
 
sk.connect(( "127.0.0.1" , 9999 ,))
data  =  sk.recv( 1024 )
print (data)
 
while  True :
     inp  =  input ( '>>>:' )
     sk.sendall(bytes(inp,encoding = 'utf-8' ))
     print (sk.recv( 1024 ))
sk.close()

select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。上面的例子只用來監視sk對象和conn對象。

從上面的例子咱們能夠判斷出若是同時多個客戶端鏈接過來,某一個斷開的話,服務器端會報錯,爲了解決這個問題咱們將代碼修改以下:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import  socket
import  select
 
sk  =  socket.socket()
sk.bind(( '127.0.0.1' 9999 ,))
sk.listen( 5 )
 
inputs = [sk,]
while  True :
     rlist,w,x  =  select.select(inputs,[],[], 1 )
     print ( len (inputs), len (rlist))
     for  in  rlist:
         if  = =  sk:
             conn,address  =  r.accept()
             inputs.append(conn)
             conn.sendall(bytes( 'hello' ,encoding = 'utf-8' ))
         else :
             print ( '=====================' )
             try :
                 ret  =  r.recv( 1024 )
                 r.sendall(ret)
                 if  not  ret:         #若是接收的數據Wie空的話,主動觸發下面的raise錯誤
                     raise   Exception( '斷開鏈接!!!' )
             except  Exception as e:
                 inputs.remove(r)    #若是客戶端斷開的話,移除監聽的鏈接

下面咱們就使用select來實現一下socketserver服務端的功能,具體代碼以下:

 

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import  socket
import  select
 
sk  =  socket.socket()              #建立套接字
sk.bind(( '127.0.0.1' 9999 ,))     #綁定套接字
sk.listen( 5 )                      #等待鏈接隊列長度
 
inputs = [sk,]                      #初始化讀取數據的監聽列表,最開始時但願從sk這個套接字上讀取數據
outputs = []                        #初始化寫入數據的監聽列表,最開始時並無客戶端鏈接進來,因此列表爲空
messages  =  {}                     #建立字典,用來記錄發往客戶端的數據
 
while  True :
     rlist,wlist,elist  =  select.select(inputs,outputs,[], 1 )     #調用select監聽全部列表中的套接字,並將準備好的套接字加入到對應的列表中
     print ( len (inputs), len (rlist), len (wlist), len (outputs))
     for  in  rlist:
         if  = =  sk:
             conn,address  =  r.accept()
             inputs.append(conn)
             messages[conn]  =  []
             conn.sendall(bytes( 'hello' ,encoding = 'utf-8' ))
         else :
             print ( '=====================' )
             try :
                 ret  =  r.recv( 1024 )
                 if  not  ret:                            #若是接收的數據爲空的話,主動觸發下面的raise錯誤
                     raise   Exception( '斷開鏈接!!!' )
                 else :
                     outputs.append(r)
                     messages[r].append(ret)
             except  Exception as e:
                 inputs.remove(r)                       #若是客戶端斷開的話,移除監聽的鏈接
                 del  messages[r]
 
#全部給我發過消息的人
     for  in  wlist:
         msg  =  messages[w].pop()
         resp  =  msg  +  bytes( 'response' ,encoding = 'utf-8' )
         w.sendall(resp)
         outputs.remove(w)

在上面的例子中監控文件句柄有某一處發生了變化,可寫、可讀、異常屬於Linux中的網絡編程,屬於同步I/O操做,屬於I/O複用模型的一種:

  • rlist-->等待到準備好讀;

  • wlist-->等待到準備好寫;

  • xlist-->等待到一種異常。

     若是sk這個套接字可讀,則說明有新連接到來,此時在sk套接字上調用accept,生成一個與客戶端通信的套接字,並將與客戶端通信的套接字加入到inputs列表,下一次能夠經過select檢查連接是否可讀,而後在發往客戶端的緩衝加入一項,鍵名爲:與客戶端通信的套接字,鍵值爲空隊列,select系統調用是用來讓咱們的程序監視多個文件句柄(file descriptor)的狀態變化的。程序會停在select這裏等待,知道被監視的文件句柄有某一個會多個發生了狀態改變。

若可讀的套接字不是sk套接字,有兩種狀況:一種是有數據到來,另外一種是連接斷開。

    若是有數據到來,先接收數據,而後將收到的數據填入往客戶端的緩存區中的對應位置,最後將於客戶端通信的套接字加入到寫數據的監聽列表;

    若是套接字可讀,但沒有接收到數據,則說明客戶端已經斷開,這時須要關閉與客戶端連接的套接字,進行資源清理。

 select本質上是經過設置或者檢查存放fd標誌位的數據結構來進行下一步處理的,這樣所帶來的缺點是:

  • select最大的缺陷就是單個進程所打開的FD是有必定限制的,它由FD_SIZE設置,默認值是1024。通常來講這個數目和系統內存關係很大,具體數目能夠cat /proc/sys/fs/file-max查看。32位的系統默認爲1024,64位的系統默認爲2048。

  • 對socket進行掃描是採用的輪詢的方法,效率較低當套接字比較多的時候,無論哪一個socket是活躍的,都要遍歷一遍,這樣會浪費CPU時間。

  •  須要維護一個用來存放大量fd的數據結構,這樣會使得用戶空間和內核空間在傳遞該結構時複製開銷大。

二、poll

   poll本質上和select沒有區別,它將用戶傳入的數組拷貝到內核空間,而後查詢每一個fd對應的設備狀態,若是設備就緒則在設備等待隊列中加入一項並繼續遍歷,若是遍歷完全部fd後沒有發現就緒設備,則掛起當前進程,直到設備就緒或者主動超時,被喚醒後它又要再次遍歷fd,這個過程經歷了屢次無謂的遍歷。

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import  socket
import  select
import  Queue
   
server  =  socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking( False )                     #設置成非阻塞
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,  1 )
server_address  =  ( "127.0.0.1" 9999 )
server.bind(server_address)
server.listen( 5 )
print  "服務器啓動成功,監聽IP:"  , server_address
message_queues  =  {}
  #超時,毫秒
timeout  =  5000  
#監聽哪些事件
READ_ONLY  =  ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE  =  (READ_ONLY|select.POLLOUT)
#新建輪詢事件對象
poller  =  select.poll()
#註冊本機監聽socket到等待可讀事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket  =  {server.fileno():server,}
while  True :
     print  "等待活動鏈接......"
     #輪詢註冊的事件集合
     events  =  poller.poll(timeout)
     if  not  events:
       print  "poll超時,無活動鏈接,從新poll......"
       continue
     print  "有"  len (events),  "個新事件,開始處理......"
     for  fd ,flag  in  events:
         =  fd_to_socket[fd]
         #可讀事件
         if  flag & (select.POLLIN | select.POLLPRI) :
             if  is  server :
                 #若是socket是監聽的server表明有新鏈接
                 connection , client_address  =  s.accept()
                 print  "新鏈接:"  , client_address
                 connection.setblocking( False )
                   
                 fd_to_socket[connection.fileno()]  =  connection
                 #加入到等待讀事件集合
                 poller.register(connection,READ_ONLY)
                 message_queues[connection]  =  Queue.Queue()
             else  :
                 #接收客戶端發送的數據
                 data  =  s.recv( 1024 )
                 if  data:
                     print  "收到數據:"  , data ,  "客戶端:"  , s.getpeername()
                     message_queues[s].put(data)
                     #修改讀取到消息的鏈接到等待寫事件集合
                     poller.modify(s,READ_WRITE)
                 else  :
                     # Close the connection
                     print  " closing"  , s.getpeername()
                     # Stop listening for input on the connection
                     poller.unregister(s)
                     s.close()
                     del  message_queues[s]
         #鏈接關閉事件
         elif  flag & select.POLLHUP :
             print  " Closing " , s.getpeername() , "(HUP)"
             poller.unregister(s)
             s.close()
         #可寫事件
         elif  flag & select.POLLOUT :
             try :
                 msg  =  message_queues[s].get_nowait()
             except  Queue.Empty:
                 print  s.getpeername() ,  " queue empty"
                 poller.modify(s,READ_ONLY)
             else  :
                 print  "發送數據:"  , data ,  "客戶端:"  , s.getpeername()
                 s.send(msg)
         #異常事件
         elif  flag & select.POLLERR:
             print  " exception on"  , s.getpeername()
             poller.unregister(s)
             s.close()
             del  message_queues[s]

三、epoll

   epoll是在2.6內核中提出的,是以前的select和poll的加強版本。先對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理過個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。

   epoll支持水平觸發和邊緣觸發,最大的特色在於邊緣觸發,它只告訴進程哪些fd剛剛變爲就緒狀態,而且只會通知一次。還有一個特色是,epoll使用"事件"的就緒通知方式,經過epoll_ctl註冊fd,一旦該fd就緒,內核就會採用相似callback的回調機制來激活該fd,epoll_wait即可以收到通知。

使用epoll的優勢:

  • 沒有最大併發鏈接的限制,能打開的FD的上限遠大於1024(1G的內存上能監聽約10萬個端口);

  • 效率提高,不是輪詢的方式,不會隨着FD數目的增長效率降低。只有活躍可用的FD纔會調用callback函數;即epoll最大的優勢就在於它只管你"活躍"的鏈接,而跟鏈接總數無關,所以在實際的網絡環境中,epoll的效率就會遠遠高於select和poll;

  • 內存拷貝,利用mmap()文件映射內存加速與內核空間的消息傳遞,即epoll使用mmap減小

epoll對文件描述符的操做有兩種模式:LT(level trigger)和ET(edge trigger)。LT模式是默認的模式,LT模式與ET模式的區別以下:

  • LT模式(缺省工做模式):當epoll_wait檢測到描述符事件發生並將此事件通知應用程序,應用程序能夠不當即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件;

  • ET模式(高速工做模式):當epoll_wait檢測到描述符事件發生將此事件通知應用程序,應用程序必須當即處理該事件。若是不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。

下面舉一個epoll事件處理的方式,來監聽socket套接字的變化,請看下面代碼:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket, select
import Queue
  
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
server_address = ( "127.0.0.1" , 9999 )
serversocket.bind(server_address)
serversocket.listen( 1 )
print "服務器啓動成功,監聽IP:" , server_address
serversocket.setblocking( 0 )
timeout = 10
#新建epoll事件對象,後續要監控的事件添加到其中
epoll = select.epoll()
#添加服務器監聽fd到等待讀事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues = {}
  
fd_to_socket = {serversocket.fileno():serversocket,}
while True :
   print "等待活動鏈接......"
   #輪詢註冊的事件集合
   events = epoll.poll(timeout)
   if not events:
      print "epoll超時無活動鏈接,從新輪詢......"
      continue
   print "有" , len (events), "個新事件,開始處理......"
   for fd, event in events:
      socket = fd_to_socket[fd]
      #可讀事件
      if event & select.EPOLLIN:
          #若是活動socket爲服務器所監聽,有新鏈接
          if socket = = serversocket:
             connection, address = serversocket.accept()
             print "新鏈接:" , address
             connection.setblocking( 0 )
             #註冊新鏈接fd到待讀事件集合
             epoll.register(connection.fileno(), select.EPOLLIN)
             fd_to_socket[connection.fileno()] = connection
             message_queues[connection] = Queue.Queue()
          #不然爲客戶端發送的數據
          else :
             data = socket.recv( 1024 )
             if data:
                print "收到數據:" , data , "客戶端:" , socket.getpeername()
                message_queues[socket].put(data)
                #修改讀取到消息的鏈接到等待寫事件集合
                epoll.modify(fd, select.EPOLLOUT)
      #可寫事件
      elif event & select.EPOLLOUT:
         try :
            msg = message_queues[socket].get_nowait()
         except Queue.Empty:
            print socket.getpeername() , " queue empty"
            epoll.modify(fd, select.EPOLLIN)
         else :
            print "發送數據:" , data , "客戶端:" , socket.getpeername()
            socket.send(msg)
      #關閉事件
      elif event & select.EPOLLHUP:
         epoll.unregister(fd)
         fd_to_socket[fd].close()
         del fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

2、多線程

 

多線程,多進程:
1,一個應用程序,能夠有多進程和多線程
2,默認:單進程,單線程
3,單進程,多線程下:
python多線程:IO操做是不會佔用CPU,多線程會提升併發
計算性操做,佔用CPU,多進程提升併發
    4,GIL,全局解釋器鎖

首先咱們先看一個多線程的例子,而後在詳細介紹,請看代碼:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import time                                  #多線程的程序
 
def f1(args):
     time.sleep( 5 )
     print (args)
 
import threading
 
t = threading.Thread(target = f1,args = ( 123 ,))  #建立子線程
t.setDaemon( True )                           #True表示主線程不等子線程
t.start()                                   #不表明當前線程會被當即執行
t.join( 2 )                                   #表示主線程到此,等待...直到子線程執行完畢,
                                             #參數,表示主線程在此最多等N秒
print ( 'end' )

上面這個例子是開啓一個線程,主線程最多等子線程兩分鐘的時間而後執行。下面咱們一塊兒看下threading的更多方法:

  • start    線程準備就緒,等待CPU調度;

  • setName  爲線程設置名稱;

  • getName 獲取線程名稱;

  • setDaemon  設置爲後臺線程或前臺線程(默認),是否等待子線程,值爲True或False;

  • join   逐個執行每一個線程,執行完畢後據需往下執行,該方法使得多線程變得無心義;

  • run  線程被CPU調度後自動執行線程對象的run方法。

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
import threading
import time
 
class MyThread(threading.Thread):
     def __init__( self ,num):
         threading.Thread.__init__( self )
         self .num = num
     def run( self ):         #定義每一個線程要運行的函數
         print ( 'running on number:%s' % self .num)
         time.sleep( 2 )
 
if __name__ = = '__main__' :
     t1 = MyThread( 1 )
     t2 = MyThread( 2 )
     t1.start()
     t2.start()
 
#結果:
running on number: 1
running on number: 2

多線程先介紹到這裏,下篇博客在詳細介紹,多線程,多進程和協程的用法。

3、知識點補充

1,python做用域

經過兩個簡單的代碼咱們在來補充一下python做用域的問題,請看代碼:

 

1
2
3
4
5
6
7
if 1 = = 1 :
     name = 'jack1'   #一個代碼塊
print (name)
 
def func():
     name = 'eric'
print (name)

 

1
2
3
4
5
6
7
8
name = 'jack'
def f1():
     print (name)
def f2():
     name = 'eric'
     return f1
ret = f2()
ret()

    ​分析一下上面代碼的執行結果,在第一個例子中函數中的eric是沒法輸出的,由於python中函數爲做用域的,在Python中無塊級做用域,python中以函數爲做用域,而在Java或C#中存在塊級做用域。

    python做用域鏈,由內向外找,直到找不到報錯,而且python做用域在代碼執行以前已經肯定,原定義的那個做用域,就去那個做用域裏找。

python和JavaScript的做用域是相似的,武sir大神給咱們總結了五句話,方便理解做用域,詳細介紹請參考連接:

 

http://www.cnblogs.com/wupeiqi/p/5649402.html

2,XX公司面試題

1
2
3
4
5
6
7
8
9
li = [ lambda :x for x in range ( 10 )]
#li列表
#li列表中的元素:[函數、函數、函數.....]
#函數在沒有執行前,內部代碼不執行
#li[0]是個函數
#執行第一個函數()
#返回值是???
r = li[ 0 ]()
print (r)

咱們一塊兒分析一下這個程序的結果是什麼,這用到了咱們上面補充的做用域知識,咱們能夠先將lambda函數修改爲正常函數的方式,在一步一步分析:

 

1
2
3
4
5
6
7
8
9
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
for x in range ( 10 ):
     def test():
         return x
 
ret = test()
print ret

     由於python的做用域爲函數,在函數中爲局部做用域,定義在函數外的爲全局做用域,由於python做用域在代碼執行以前已經肯定,原定義的那個做用域,就去那個做用域裏找,這裏的結果爲9。

 

    今天就介紹到這裏,咱們今天主要介紹了I/O多路複用的知識和多線程的定義,雖然I/O多路複用在咱們日常寫代碼的時候用的比較少,但咱們理解了後,方便咱們之後去讀懂源碼。​

相關文章
相關標籤/搜索