http://www.cnblogs.com/phennry/p/5645369.htmlhtml
接着上篇博客咱們繼續介紹socket網絡編程,今天主要介紹的內容:IO多路複用、多線程、補充知識點。python
IO多路複用是指內核一旦發現進程指定的一個或者多個IO條件準備讀取,它就通知該進程。IO多路複用適用於如下場合:面試
當客戶端處理多個描述符時(通常是交互式輸入和網絡套接字),必須使用IO複用;編程
當一個客戶經過處理過個套接字時,而這種狀況是可能的,但不多出現;數組
若是一個TCP服務器既要處理監聽套接字,又要處理已鏈接套接字,通常也要用到IO複用;緩存
若是服務器紀要處理TCP,又要處理UDP時;服務器
若是一個服務器要處理多個服務或多個協議時,使用IO複用。網絡
IO多路複用的事件方式有三種,分別是:select、poll、epoll。數據結構
下面咱們就介紹下這三種事件方式:多線程
首先select是能夠跨平臺的,select函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用後select函數會阻塞,知道有描述符就緒(有數據可讀、可寫或者有except異常),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠經過遍歷fdset,來找到就緒的描述符。具體用法,請看下面代碼:
服務器端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import
socket
import
select
#select的監聽是有個數限制1024
sk
=
socket.socket()
sk.bind((
'127.0.0.1'
,
9999
,))
sk.listen(
5
)
inputs
=
[sk,]
while
True
:
rlist,w,x,
=
select.select(inputs,[],[],
1
)
print
(
len
(inputs),
len
(rlist))
#監聽sk(服務端)對象若是sk對象發生變化,表示有客戶端來鏈接了,此時rlist值爲[sk,]
#監聽conn對象,若是conn發生變化時,表示客戶端有新消息發送過來,此時rlist的值爲[客戶端]
#當s1向服務端發送消息時,rlist =[s1]
for
r
in
rlist:
if
r
=
=
sk:
#判斷新客戶來鏈接
conn,address
=
r.accept()
#conn也是socket的對象
inputs.append(conn)
conn.sendall(bytes(
'hello'
,encoding
=
'utf-8'
))
else
:
r.recv(
1024
)
#等待接收客戶端發來消息
|
客戶端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
socket
sk
=
socket.socket()
sk.connect((
"127.0.0.1"
,
9999
,))
data
=
sk.recv(
1024
)
print
(data)
while
True
:
inp
=
input
(
'>>>:'
)
sk.sendall(bytes(inp,encoding
=
'utf-8'
))
print
(sk.recv(
1024
))
sk.close()
|
select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。上面的例子只用來監視sk對象和conn對象。
從上面的例子咱們能夠判斷出若是同時多個客戶端鏈接過來,某一個斷開的話,服務器端會報錯,爲了解決這個問題咱們將代碼修改以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import
socket
import
select
sk
=
socket.socket()
sk.bind((
'127.0.0.1'
,
9999
,))
sk.listen(
5
)
inputs
=
[sk,]
while
True
:
rlist,w,x
=
select.select(inputs,[],[],
1
)
print
(
len
(inputs),
len
(rlist))
for
r
in
rlist:
if
r
=
=
sk:
conn,address
=
r.accept()
inputs.append(conn)
conn.sendall(bytes(
'hello'
,encoding
=
'utf-8'
))
else
:
print
(
'====================='
)
try
:
ret
=
r.recv(
1024
)
r.sendall(ret)
if
not
ret:
#若是接收的數據Wie空的話,主動觸發下面的raise錯誤
raise
Exception(
'斷開鏈接!!!'
)
except
Exception as e:
inputs.remove(r)
#若是客戶端斷開的話,移除監聽的鏈接
|
下面咱們就使用select來實現一下socketserver服務端的功能,具體代碼以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
import
socket
import
select
sk
=
socket.socket()
#建立套接字
sk.bind((
'127.0.0.1'
,
9999
,))
#綁定套接字
sk.listen(
5
)
#等待鏈接隊列長度
inputs
=
[sk,]
#初始化讀取數據的監聽列表,最開始時但願從sk這個套接字上讀取數據
outputs
=
[]
#初始化寫入數據的監聽列表,最開始時並無客戶端鏈接進來,因此列表爲空
messages
=
{}
#建立字典,用來記錄發往客戶端的數據
while
True
:
rlist,wlist,elist
=
select.select(inputs,outputs,[],
1
)
#調用select監聽全部列表中的套接字,並將準備好的套接字加入到對應的列表中
print
(
len
(inputs),
len
(rlist),
len
(wlist),
len
(outputs))
for
r
in
rlist:
if
r
=
=
sk:
conn,address
=
r.accept()
inputs.append(conn)
messages[conn]
=
[]
conn.sendall(bytes(
'hello'
,encoding
=
'utf-8'
))
else
:
print
(
'====================='
)
try
:
ret
=
r.recv(
1024
)
if
not
ret:
#若是接收的數據爲空的話,主動觸發下面的raise錯誤
raise
Exception(
'斷開鏈接!!!'
)
else
:
outputs.append(r)
messages[r].append(ret)
except
Exception as e:
inputs.remove(r)
#若是客戶端斷開的話,移除監聽的鏈接
del
messages[r]
#全部給我發過消息的人
for
w
in
wlist:
msg
=
messages[w].pop()
resp
=
msg
+
bytes(
'response'
,encoding
=
'utf-8'
)
w.sendall(resp)
outputs.remove(w)
|
在上面的例子中監控文件句柄有某一處發生了變化,可寫、可讀、異常屬於Linux中的網絡編程,屬於同步I/O操做,屬於I/O複用模型的一種:
rlist-->等待到準備好讀;
wlist-->等待到準備好寫;
xlist-->等待到一種異常。
若是sk這個套接字可讀,則說明有新連接到來,此時在sk套接字上調用accept,生成一個與客戶端通信的套接字,並將與客戶端通信的套接字加入到inputs列表,下一次能夠經過select檢查連接是否可讀,而後在發往客戶端的緩衝加入一項,鍵名爲:與客戶端通信的套接字,鍵值爲空隊列,select系統調用是用來讓咱們的程序監視多個文件句柄(file descriptor)的狀態變化的。程序會停在select這裏等待,知道被監視的文件句柄有某一個會多個發生了狀態改變。
若可讀的套接字不是sk套接字,有兩種狀況:一種是有數據到來,另外一種是連接斷開。
若是有數據到來,先接收數據,而後將收到的數據填入往客戶端的緩存區中的對應位置,最後將於客戶端通信的套接字加入到寫數據的監聽列表;
若是套接字可讀,但沒有接收到數據,則說明客戶端已經斷開,這時須要關閉與客戶端連接的套接字,進行資源清理。
select本質上是經過設置或者檢查存放fd標誌位的數據結構來進行下一步處理的,這樣所帶來的缺點是:
select最大的缺陷就是單個進程所打開的FD是有必定限制的,它由FD_SIZE設置,默認值是1024。通常來講這個數目和系統內存關係很大,具體數目能夠cat /proc/sys/fs/file-max查看。32位的系統默認爲1024,64位的系統默認爲2048。
對socket進行掃描是採用的輪詢的方法,效率較低當套接字比較多的時候,無論哪一個socket是活躍的,都要遍歷一遍,這樣會浪費CPU時間。
須要維護一個用來存放大量fd的數據結構,這樣會使得用戶空間和內核空間在傳遞該結構時複製開銷大。
poll本質上和select沒有區別,它將用戶傳入的數組拷貝到內核空間,而後查詢每一個fd對應的設備狀態,若是設備就緒則在設備等待隊列中加入一項並繼續遍歷,若是遍歷完全部fd後沒有發現就緒設備,則掛起當前進程,直到設備就緒或者主動超時,被喚醒後它又要再次遍歷fd,這個過程經歷了屢次無謂的遍歷。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
import
socket
import
select
import
Queue
server
=
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(
False
)
#設置成非阻塞
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
1
)
server_address
=
(
"127.0.0.1"
,
9999
)
server.bind(server_address)
server.listen(
5
)
print
"服務器啓動成功,監聽IP:"
, server_address
message_queues
=
{}
#超時,毫秒
timeout
=
5000
#監聽哪些事件
READ_ONLY
=
( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE
=
(READ_ONLY|select.POLLOUT)
#新建輪詢事件對象
poller
=
select.poll()
#註冊本機監聽socket到等待可讀事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket
=
{server.fileno():server,}
while
True
:
print
"等待活動鏈接......"
#輪詢註冊的事件集合
events
=
poller.poll(timeout)
if
not
events:
print
"poll超時,無活動鏈接,從新poll......"
continue
print
"有"
,
len
(events),
"個新事件,開始處理......"
for
fd ,flag
in
events:
s
=
fd_to_socket[fd]
#可讀事件
if
flag & (select.POLLIN | select.POLLPRI) :
if
s
is
server :
#若是socket是監聽的server表明有新鏈接
connection , client_address
=
s.accept()
print
"新鏈接:"
, client_address
connection.setblocking(
False
)
fd_to_socket[connection.fileno()]
=
connection
#加入到等待讀事件集合
poller.register(connection,READ_ONLY)
message_queues[connection]
=
Queue.Queue()
else
:
#接收客戶端發送的數據
data
=
s.recv(
1024
)
if
data:
print
"收到數據:"
, data ,
"客戶端:"
, s.getpeername()
message_queues[s].put(data)
#修改讀取到消息的鏈接到等待寫事件集合
poller.modify(s,READ_WRITE)
else
:
# Close the connection
print
" closing"
, s.getpeername()
# Stop listening for input on the connection
poller.unregister(s)
s.close()
del
message_queues[s]
#鏈接關閉事件
elif
flag & select.POLLHUP :
print
" Closing "
, s.getpeername() ,
"(HUP)"
poller.unregister(s)
s.close()
#可寫事件
elif
flag & select.POLLOUT :
try
:
msg
=
message_queues[s].get_nowait()
except
Queue.Empty:
print
s.getpeername() ,
" queue empty"
poller.modify(s,READ_ONLY)
else
:
print
"發送數據:"
, data ,
"客戶端:"
, s.getpeername()
s.send(msg)
#異常事件
elif
flag & select.POLLERR:
print
" exception on"
, s.getpeername()
poller.unregister(s)
s.close()
del
message_queues[s]
|
epoll是在2.6內核中提出的,是以前的select和poll的加強版本。先對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理過個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。
epoll支持水平觸發和邊緣觸發,最大的特色在於邊緣觸發,它只告訴進程哪些fd剛剛變爲就緒狀態,而且只會通知一次。還有一個特色是,epoll使用"事件"的就緒通知方式,經過epoll_ctl註冊fd,一旦該fd就緒,內核就會採用相似callback的回調機制來激活該fd,epoll_wait即可以收到通知。
使用epoll的優勢:
沒有最大併發鏈接的限制,能打開的FD的上限遠大於1024(1G的內存上能監聽約10萬個端口);
效率提高,不是輪詢的方式,不會隨着FD數目的增長效率降低。只有活躍可用的FD纔會調用callback函數;即epoll最大的優勢就在於它只管你"活躍"的鏈接,而跟鏈接總數無關,所以在實際的網絡環境中,epoll的效率就會遠遠高於select和poll;
內存拷貝,利用mmap()文件映射內存加速與內核空間的消息傳遞,即epoll使用mmap減小
epoll對文件描述符的操做有兩種模式:LT(level trigger)和ET(edge trigger)。LT模式是默認的模式,LT模式與ET模式的區別以下:
LT模式(缺省工做模式):當epoll_wait檢測到描述符事件發生並將此事件通知應用程序,應用程序能夠不當即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件;
ET模式(高速工做模式):當epoll_wait檢測到描述符事件發生將此事件通知應用程序,應用程序必須當即處理該事件。若是不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。
下面舉一個epoll事件處理的方式,來監聽socket套接字的變化,請看下面代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
#!/usr/bin/python
# -*- coding: utf-8 -*-
import
socket, select
import
Queue
serversocket
=
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
1
)
server_address
=
(
"127.0.0.1"
, 9999
)
serversocket.bind(server_address)
serversocket.listen(
1
)
print
"服務器啓動成功,監聽IP:"
, server_address
serversocket.setblocking(
0
)
timeout
=
10
#新建epoll事件對象,後續要監控的事件添加到其中
epoll
=
select.epoll()
#添加服務器監聽fd到等待讀事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues
=
{}
fd_to_socket
=
{serversocket.fileno():serversocket,}
while
True
:
print
"等待活動鏈接......"
#輪詢註冊的事件集合
events
=
epoll.poll(timeout)
if
not
events:
print
"epoll超時無活動鏈接,從新輪詢......"
continue
print
"有"
,
len
(events),
"個新事件,開始處理......"
for
fd, event
in
events:
socket
=
fd_to_socket[fd]
#可讀事件
if
event & select.EPOLLIN:
#若是活動socket爲服務器所監聽,有新鏈接
if
socket
=
=
serversocket:
connection, address
=
serversocket.accept()
print
"新鏈接:"
, address
connection.setblocking(
0
)
#註冊新鏈接fd到待讀事件集合
epoll.register(connection.fileno(), select.EPOLLIN)
fd_to_socket[connection.fileno()]
=
connection
message_queues[connection]
=
Queue.Queue()
#不然爲客戶端發送的數據
else
:
data
=
socket.recv(
1024
)
if
data:
print
"收到數據:"
, data ,
"客戶端:"
, socket.getpeername()
message_queues[socket].put(data)
#修改讀取到消息的鏈接到等待寫事件集合
epoll.modify(fd, select.EPOLLOUT)
#可寫事件
elif
event & select.EPOLLOUT:
try
:
msg
=
message_queues[socket].get_nowait()
except
Queue.Empty:
print
socket.getpeername() ,
" queue empty"
epoll.modify(fd, select.EPOLLIN)
else
:
print
"發送數據:"
, data ,
"客戶端:"
, socket.getpeername()
socket.send(msg)
#關閉事件
elif
event & select.EPOLLHUP:
epoll.unregister(fd)
fd_to_socket[fd].close()
del
fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()
|
多線程,多進程:
1,一個應用程序,能夠有多進程和多線程
2,默認:單進程,單線程
3,單進程,多線程下:
python多線程:IO操做是不會佔用CPU,多線程會提升併發
計算性操做,佔用CPU,多進程提升併發
4,GIL,全局解釋器鎖
首先咱們先看一個多線程的例子,而後在詳細介紹,請看代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import
time
#多線程的程序
def
f1(args):
time.sleep(
5
)
print
(args)
import
threading
t
=
threading.Thread(target
=
f1,args
=
(
123
,))
#建立子線程
t.setDaemon(
True
)
#True表示主線程不等子線程
t.start()
#不表明當前線程會被當即執行
t.join(
2
)
#表示主線程到此,等待...直到子線程執行完畢,
#參數,表示主線程在此最多等N秒
print
(
'end'
)
|
上面這個例子是開啓一個線程,主線程最多等子線程兩分鐘的時間而後執行。下面咱們一塊兒看下threading的更多方法:
start 線程準備就緒,等待CPU調度;
setName 爲線程設置名稱;
getName 獲取線程名稱;
setDaemon 設置爲後臺線程或前臺線程(默認),是否等待子線程,值爲True或False;
join 逐個執行每一個線程,執行完畢後據需往下執行,該方法使得多線程變得無心義;
run 線程被CPU調度後自動執行線程對象的run方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import
threading
import
time
class
MyThread(threading.Thread):
def
__init__(
self
,num):
threading.Thread.__init__(
self
)
self
.num
=
num
def
run(
self
):
#定義每一個線程要運行的函數
print
(
'running on number:%s'
%
self
.num)
time.sleep(
2
)
if
__name__
=
=
'__main__'
:
t1
=
MyThread(
1
)
t2
=
MyThread(
2
)
t1.start()
t2.start()
#結果:
running on number:
1
running on number:
2
|
多線程先介紹到這裏,下篇博客在詳細介紹,多線程,多進程和協程的用法。
經過兩個簡單的代碼咱們在來補充一下python做用域的問題,請看代碼:
1
2
3
4
5
6
7
|
if
1
=
=
1
:
name
=
'jack1'
#一個代碼塊
print
(name)
def
func():
name
=
'eric'
print
(name)
|
1
2
3
4
5
6
7
8
|
name
=
'jack'
def
f1():
print
(name)
def
f2():
name
=
'eric'
return
f1
ret
=
f2()
ret()
|
分析一下上面代碼的執行結果,在第一個例子中函數中的eric是沒法輸出的,由於python中函數爲做用域的,在Python中無塊級做用域,python中以函數爲做用域,而在Java或C#中存在塊級做用域。
python做用域鏈,由內向外找,直到找不到報錯,而且python做用域在代碼執行以前已經肯定,原定義的那個做用域,就去那個做用域裏找。
python和JavaScript的做用域是相似的,武sir大神給咱們總結了五句話,方便理解做用域,詳細介紹請參考連接:
http://www.cnblogs.com/wupeiqi/p/5649402.html
1
2
3
4
5
6
7
8
9
|
li
=
[
lambda
:x
for
x
in
range
(
10
)]
#li列表
#li列表中的元素:[函數、函數、函數.....]
#函數在沒有執行前,內部代碼不執行
#li[0]是個函數
#執行第一個函數()
#返回值是???
r
=
li[
0
]()
print
(r)
|
咱們一塊兒分析一下這個程序的結果是什麼,這用到了咱們上面補充的做用域知識,咱們能夠先將lambda函數修改爲正常函數的方式,在一步一步分析:
1
2
3
4
5
6
7
8
9
|
#!/usr/bin/env python
#-*- coding:utf-8 -*-
for
x
in
range
(
10
):
def
test():
return
x
ret
=
test()
print
ret
|
由於python的做用域爲函數,在函數中爲局部做用域,定義在函數外的爲全局做用域,由於python做用域在代碼執行以前已經肯定,原定義的那個做用域,就去那個做用域裏找,這裏的結果爲9。
今天就介紹到這裏,咱們今天主要介紹了I/O多路複用的知識和多線程的定義,雖然I/O多路複用在咱們日常寫代碼的時候用的比較少,但咱們理解了後,方便咱們之後去讀懂源碼。