多進程、協程、事件驅動及select poll epoll

目錄css

-多線程使用場景python

-多進程linux

 --簡單的一個多進程例子程序員

 --進程間數據的交互實現方法web

   ---經過Queues和Pipe能夠實現進程間數據的傳遞,可是不能實現數據的共享編程

   ---Queueswindows

    ---Pipe數組

    ---經過Manager能夠不一樣進程間實現數據的共享緩存

 --進程同步,即進程鎖安全

 --進程池

-協程

 --先用yield實現簡單的協程

 --Greenlet

 --Gevent

 --用協程gevent寫一個簡單併發爬網頁

-事件驅動

 --IO多路複用

   ---用戶空間和內核空間

    ---文件描述符fd

    ---緩存IO

 --IO模式

   ---阻塞I/O(blocking IO)

    ---非阻塞I/O

    ---I/O多路複用(IO multiplexing)

    ---異步I/O(asynchronous IO)

-關於select poll epoll

 --select

 --poll

 --epoll

 --以select方法爲例子進行理解

多線程的使用場景

IO操做不佔用CPU

計算佔用cpu

python多線程不適合cpu密集型操做的任務,適合IO操做密集型的任務

多進程

簡單的一個多進程例子:(用於理解對多線程方法的使用)

和線程的方法相似,下面是一個簡單的多進程代碼

 1 #AUTHOR:FAN
 2 import time,multiprocessing
 3 
 4 def run(name):
 5     time.sleep(2)
 6     print("hello",name)
 7 
 8 if __name__ =="__main__":
 9     for i in range(6):
10         p = multiprocessing.Process(target=run,args=("dean",))
11       p.start()

和以前學習的多線程結合在一塊兒使用,代碼以下:

 1 #AUTHOR:FAN
 2 
 3 import time,threading
 4 import multiprocessing
 5 
 6 def thread_run():
 7     print(threading.get_ident())   #這裏表示獲取線程id
 8 
 9 
10 def run(name):
11     time.sleep(2)
12     print("hello",name)
13     t=threading.Thread(target=thread_run)
14     t.start()
15 
16 if __name__ =="__main__":
17     for i in range(6):
18         p = multiprocessing.Process(target=run,args=("dean",))
19         p.start()

運行結果以下:

 1 D:\python35\python.exe D:/python培訓/s14/day10/進程與線程結合使用.py
 2 hello dean
 3 10008
 4 hello dean
 5 9276
 6 hello dean
 7 8096
 8 hello dean
 9 1308
10 hello dean
11 hello dean
12 10112
13 8032
14 
15 Process finished with exit code 0
View Code

接着咱們查看下面代碼:

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process
 4 import os
 5 
 6 
 7 def info(title):
 8     print(title)
 9     print('module name:', __name__)
10     print('parent process:', os.getppid())
11     print('process id:', os.getpid())
12     print("\n\n")
13 
14 
15 def f(name):
16     info('\033[31;1mcalled from child process function f\033[0m')
17     print('hello', name)
18 
19 if __name__ == '__main__':
20     info('\033[32;1mmain process line\033[0m')

運行結果以下:

 1 D:\python35\python.exe D:/python培訓/s14/day10/獲取進程id.py
 2 main process line
 3 module name: __main__
 4 parent process: 8368
 5 process id: 7664
 6 
 7 
 8 
 9 
10 Process finished with exit code 0
View Code

咱們這裏能夠看到父進程id:8368,而且會發現不管程序運行多少次都是這個,而後咱們在windows任務管理器查看發現這個是pycharm的進程id,以下圖:

這裏要記住:每個子進程都是由父進程啓動的

咱們將上面代碼中if __name__=」__main__」進行修改,以下:

1 if __name__ == '__main__':
2     info('\033[32;1mmain process line\033[0m')
3     p = Process(target=f, args=('bob',))
4     p.start()

運行結果以下:

進程間數據的交互,實現方法

經過Queues和Pipe能夠實現進程間數據的傳遞,可是不能實現數據的共享

 

不一樣進程間內存不是共享的,要想實現兩個進程間的數據交換,有一下方法:

Queues

使用方法和threading裏的queue使用差很少

先回憶一下線程之間的數據共享,經過下面代碼理解:

 

 1 #AUTHOR:FAN
 2 import threading
 3 import queue
 4 
 5 def func():
 6     q.put([22,"dean",'hello'])
 7 
 8 if __name__=="__main__":
 9     q = queue.Queue()
10     t = threading.Thread(target=func)
11     t.start()
12     print(q.get(q))

 

運行結果:

1 D:\python35\python.exe D:/python培訓/s14/day10/線程之間數據的共享.py
2 [22, 'dean', 'hello']
3 
4 Process finished with exit code 0
View Code

從上述代碼能夠看出線程之間的數據是共享的:父線程能夠訪問子線程放入的數據

若是是多進程之間呢?

將代碼進行修改以下,讓子進程調用父進程數據:

 

 1 from multiprocessing import Process
 2 import queue
 3 
 4 
 5 
 6 def f():
 7     q.put([11,None,"hello"])
 8 
 9 
10 if __name__=="__main__":
11     q = queue.Queue()
12     p = Process(target=f)
13     p.start()
14     print(q.get())

 

運行結果以下:

從這裏咱們也能夠看出子進程是訪問不到父進程的數據

咱們再次將代碼進行修改,寫f方法的時候直接將q給線程傳入,也就是,只有啓動線程,就自動傳入線程q,代碼以下:

 

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process
 4 import queue
 5 
 6 
 7 def f(data):
 8     data.put([11,None,"hello"])
 9 
10 if __name__=="__main__":
11     q = queue.Queue()   #切記這裏是線程q
12     p = Process(target=f,args=(q,))
13     p.start()
14     print(q.get())

 

運行結果以下:

這裏咱們須要知道:進程不能訪問線程q

因此咱們須要改爲進程,代碼以下:

 

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process,Queue
 4 
 5 
 6 def f(data):
 7     data.put([11,None,"hello"])
 8 
 9 if __name__=="__main__":
10     q = Queue() 這裏的q是進程q 11     p = Process(target=f,args=(q,))
12     p.start()
13     print(q.get())

 

運行結果以下:

1 D:\python35\python.exe D:/python培訓/s14/day10/子進程訪問父進程數據.py
2 [11, None, 'hello']
3 
4 Process finished with exit code 0
View Code

此次咱們就發如今父進程裏就能夠調用到子進程放入的數據

這裏咱們須要明白:這裏的q實際上是被克隆了一個q,而後將子線程序列化的內容傳入的克隆q,而後再反序列化給q,從而實現了進程之間數據的傳遞

 

Pipe

實現代碼例子:

 1 #AUTHOR:FAN
 2 
 3 from multiprocessing import Process,Pipe
 4 
 5 def f(conn):
 6     conn.send([22,None,"hello from child"])
 7     conn.send([22,None,"hello from child2"])
 8     print(conn.recv())
 9     conn.close()
10 
11 if __name__=="__main__":
12     left_conn,right_conn = Pipe()
13     p = Process(target=f,args=(right_conn,))
14     p.start()
15     print(left_conn.recv())
16     print(left_conn.recv())
17     left_conn.send("我是left_conn")

運行結果以下:

1 D:\python35\python.exe D:/python培訓/s14/day10/經過pipes實現進程間數據傳遞.py
2 [22, None, 'hello from child']
3 [22, None, 'hello from child2']
4 我是left_conn
5 
6 Process finished with exit code 0
View Code

對上面代碼分析:pip()會生成兩個值,上面的left_conn和right_conn,這就如同一條網線的兩頭,兩頭均可以發送和接收數據

 

經過Manager能夠不一樣進程間實現數據的共享

經過下面代碼進行理解:

 1 #AUTHOR:FAN
 2 from multiprocessing import Manager,Process
 3 import os
 4 
 5 def f(d,l):
 6     d[1]="1"
 7     d["2"] = 2
 8     d[0.25] = None
 9     l.append(os.getpid())
10     print(l)
11 
12 if __name__ == "__main__":
13     with Manager() as manager:  #這種方式和直接manager=Manager()同樣
14         d = manager.dict()  #生成一個字典,能夠在多個進程間共享
15         l = manager.list(range(5))  #生成一個列表,能夠在多個進程間共享
16         p_list = []
17         for i in range(10):
18             p = Process(target=f,args=(d,l))
19             p.start()
20             p_list.append(p)
21         for res in p_list:
22             res.join()
23 
24         print(d)
25         print(l)

運行結果以下:

 1 D:\python35\python.exe D:/python培訓/s14/day10/Manager實現進程間數據的共享.py
 2 [0, 1, 2, 3, 4, 9756]
 3 [0, 1, 2, 3, 4, 9756, 3352]
 4 [0, 1, 2, 3, 4, 9756, 3352, 9220]
 5 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736]
 6 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724]
 7 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860]
 8 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084]
 9 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452]
10 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376]
11 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376, 9952]
12 {0.25: None, 1: '1', '2': 2}
13 [0, 1, 2, 3, 4, 9756, 3352, 9220, 9736, 9724, 9860, 7084, 7452, 7376, 9952]
14 
15 Process finished with exit code 0
View Code

經過結果能夠看出已經實現了不一樣進程間數據的共享

進程同步,即進程鎖

 1 #AUTHOR:FAN
 2 from multiprocessing import Process, Lock
 3 
 4 
 5 def f(l, i):
 6     l.acquire()
 7     print('hello world', i)
 8     l.release()
 9 
10 if __name__ == '__main__':
11     lock = Lock()
12     for num in range(10):
13         Process(target=f, args=(lock, num)).start()

打印結果以下:

 1 D:\python35\python.exe D:/python培訓/s14/day10/進程鎖.py
 2 hello world 3
 3 hello world 2
 4 hello world 1
 5 hello world 0
 6 hello world 7
 7 hello world 6
 8 hello world 4
 9 hello world 5
10 hello world 9
11 hello world 8
12 
13 Process finished with exit code 0
View Code

可能會以爲這個加鎖沒有上面做用,實際上是這樣的,當在屏幕上打印這些內容的時候,不一樣進程之間是共享這個屏幕的,鎖的做用在於當一個進程開始打印的時候,其餘線程不能打印,從而防止打印亂內容

在windows上可能看不到效果,當不一樣進程打印的東西比較多的時候,就能夠看到打印數據出現亂的狀況

 

進程池

 

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

 

進程池中有兩個方法:

 

apply

 

apply_async(這個就表示異步

 

從下面代碼一點一點分析

 

 1 #AUTHOR:FAN
 2 
 3 from  multiprocessing import Process, Pool
 4 import time
 5 import os
 6 
 7 
 8 def Foo(i):
 9     time.sleep(2)
10     print("in the process",os.getpid())
11     return i + 100
12 
13 
14 def Bar(arg):
15     print('-->exec done:', arg)
16 
17 if __name__ == "__main__":
18     pool = Pool(5)
19 
20     for i in range(10):
21         pool.apply(func=Foo, args=(i,))
22     print('end')
23     pool.close()
24     pool.join()  # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。

 

這樣運行結果發現,程序變成了串行了。

將上述代碼中的:

pool.apply(func=Foo, args=(i,))

 

替換爲:

pool.apply_async(func=Foo,args=(i,))

 

以後就解決了以前的的問題

 

這個時候咱們再次將

pool.apply_async(func=Foo,args=(i,))

 

替換爲,這裏的callback叫作回調函數

pool.apply_async(func=Foo, args=(i,), callback=Bar)

 

運行結果以下:

 1 D:\python35\python.exe D:/python培訓/s14/day10/進程池.py
 2 end
 3 in the process 10876
 4 -->exec done: 100
 5 in the process 5084
 6 -->exec done: 101
 7 in the process 9648
 8 -->exec done: 102
 9 in the process 11028
10 -->exec done: 103
11 in the process 8528
12 -->exec done: 104
13 in the process 10876
14 -->exec done: 105
15 in the process 5084
16 -->exec done: 106
17 in the process 9648
18 -->exec done: 107
19 in the process 11028
20 -->exec done: 108
21 in the process 8528
22 -->exec done: 109
23 
24 Process finished with exit code 0
View Code

下面將代碼進行修改,肯定回調函數是由子進程仍是主進程調用

 1 #AUTHOR:FAN
 2 
 3 from  multiprocessing import Process, Pool
 4 import time
 5 import os
 6 
 7 
 8 def Foo(i):
 9     time.sleep(2)
10     print("in the process",os.getpid())
11     return i + 100
12 
13 
14 def Bar(arg):
15     print('-->exec done:', arg,os.getpid())
16 
17 if __name__ == "__main__":
18     pool = Pool(5)
19     print(os.getpid())
20     for i in range(5):
21         pool.apply_async(func=Foo, args=(i,), callback=Bar)
22         #pool.apply(func=Foo, args=(i,))
23         #pool.apply_async(func=Foo,args=(i,))
24 
25     print('end')
26     pool.close()
27     pool.join()  # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。

運行結果以下,能夠看出回調函數的pid和主進程是同樣的

協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。

 

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

 

協程的好處:

無需線程上下文切換的開銷

無需原子操做鎖定及同步的開銷

方便切換控制流,簡化編程模型

高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

 

缺點:

沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。

進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

 

先用yield實現簡單的協程

 1 #AUTHOR:FAN
 2 
 3 import time
 4 import queue
 5 
 6 
 7 def consumer(name):
 8     print("--->starting eating baozi...")
 9     while True:
10         new_baozi = yield
11         print("[%s] is eating baozi %s" % (name, new_baozi))
12         time.sleep(1)
13 def producer():
14     r = con.__next__()
15     r = con2.__next__()
16     n = 0
17     while n < 5:
18         n += 1
19         con.send(n)
20         con2.send(n)
21         print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
22 if __name__ == '__main__':
23     con = consumer("c1")
24     con2 = consumer("c2")
25     p = producer()

運行結果以下:

 1 D:\python35\python.exe D:/python培訓/s14/day10/yield實現協程.py
 2 --->starting eating baozi...
 3 --->starting eating baozi...
 4 [c1] is eating baozi 1
 5 [c2] is eating baozi 1
 6 [producer] is making baozi 1
 7 [c1] is eating baozi 2
 8 [c2] is eating baozi 2
 9 [producer] is making baozi 2
10 [c1] is eating baozi 3
11 [c2] is eating baozi 3
12 [producer] is making baozi 3
13 [c1] is eating baozi 4
14 [c2] is eating baozi 4
15 [producer] is making baozi 4
16 [c1] is eating baozi 5
17 [c2] is eating baozi 5
18 [producer] is making baozi 5
19 
20 Process finished with exit code 0

Greenlet

 1 #AUTHOR:FAN
 2 
 3 from greenlet import greenlet
 4 
 5 def test1():
 6     print(10)
 7     gr2.switch()
 8     print(11)
 9     gr2.switch()
10 
11 
12 def test2():
13     print(12)
14     gr1.switch()
15     print(13)
16 
17 
18 gr1 = greenlet(test1) #啓動一個協程
19 gr2 = greenlet(test2)
20 gr1.switch()

這裏的gr1.switch()是手動切換

Gevent

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度.

經過下面代碼進行理解:

 1 import gevent
 2 
 3 
 4 def foo():
 5     print('Running in foo1')
 6     gevent.sleep(2)
 7     print('Running in foo2')
 8 
 9 def bar():
10     print('Running in bar1')
11     gevent.sleep(1)
12     print('Running in bar2')
13 
14 def func3():
15     print("running in func1")
16     gevent.sleep(0)
17     print("running in func2")
18 
19 
20 gevent.joinall([
21     gevent.spawn(foo),
22     gevent.spawn(bar),
23     gevent.spawn(func3),
24 ])

執行結果以下:

1 D:\python35\python.exe D:/python培訓/s14/day10/自動IO切換.py
2 Running in foo1
3 Running in bar1
4 running in func1
5 running in func2
6 Running in bar2
7 Running in foo2
8 
9 Process finished with exit code 0
View Code

從運行結果能夠看出,經過gevent.sleep()模擬執行IO操做,從而實現自動切換,程序最終花費的時間仍是2秒

 

用協程gevent寫一個簡單併發爬網頁

 1 #AUTHOR:FAN
 2 
 3 from urllib import request
 4 import gevent,time
 5 
 6 def f(url):
 7     print("get:%s" %url)
 8     resp = request.urlopen(url)
 9     data = resp.read()
10     print("%d bytes received from %s" %(len(data),url))
11 
12 
13 urls = ["http://sina.com.cn",
14         "http://www.cnblogs.com/",
15         "https://news.cnblogs.com/"
16 ]
17 
18 time_start = time.time()
19 for url in urls:
20     f(url)
21 
22 print("同步串行cost:",time.time()-time_start)
23 
24 async_time = time.time()
25 gevent.joinall([
26     gevent.spawn(f,"http://sina.com.cn"),
27     gevent.spawn(f,"http://www.cnblogs.com/"),
28     gevent.spawn(f,"https://news.cnblogs.com/")
29 ])
30 print("異步cost:",time.time()-async_time)

這樣的運行結果:

這裏能夠看出異步的時候和串行執行的時間基本同樣,其實這裏的異步並無起做用,由於這裏的gevent並不能識別出urllib執行時的IO操做,想要是gevent實現異步的方法是導入模塊:from gevent import monkey

將代碼進行修改以下:

 1 #AUTHOR:FAN
 2 
 3 from urllib import request
 4 import gevent,time
 5 from gevent import monkey  6 
 7 monkey.patch_all()  8 def f(url):
 9     print("get:%s" %url)
10     resp = request.urlopen(url)
11     data = resp.read()
12     print("%d bytes received from %s" %(len(data),url))
13 
14 
15 urls = ["http://sina.com.cn",
16         "http://www.cnblogs.com/",
17         "https://news.cnblogs.com/"
18 ]
19 
20 time_start = time.time()
21 for url in urls:
22     f(url)
23 
24 print("同步串行cost:",time.time()-time_start)
25 
26 async_time = time.time()
27 gevent.joinall([
28     gevent.spawn(f,"http://sina.com.cn"),
29     gevent.spawn(f,"http://www.cnblogs.com/"),
30     gevent.spawn(f,"https://news.cnblogs.com/")
31 ])
32 print("異步cost:",time.time()-async_time)

而後執行,結果以下:

事件驅動

一般,咱們寫服務器處理模型的程序時,有如下幾種模型:

(1)每收到一個請求,建立一個新的進程,來處理該請求;

(2)每收到一個請求,建立一個新的線程,來處理該請求;

(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求

上面的幾種方式,各有千秋,

第(1)中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。

第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。

第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。

綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式

目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:

1. 有一個事件(消息)隊列;

2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);

3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;

4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數

 

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。

在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

 

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。

 

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

 

(1)程序中有許多任務

(2)任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)

(3)在等待事件到來時,某些任務會阻塞。

當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

IO多路複用

用戶空間和內核空間

操做系統都是採用虛擬存儲器,對於32位操做系統,它的尋址空間(虛擬存儲空間)爲4G。操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護內存空間,也有訪問底層硬件設備的全部權限,爲了保證用戶進程不能直接操做內核,保證內核的安全,操做系統將虛擬空間分爲兩部分:一部分爲內核空間,一部分是用戶空間,針對linux系統而言,將最高的1G字節給內核使用,稱爲內核空間,將3G字節的供各個進程使用,稱爲用戶空間

文件描述符fd

文件描述符是一個用於表述指向文件的引用的抽象化概念

文件描述符在形式上是一個非負整數,實際上,它是一個索引值,指內核爲每個進程所維護的進程打開文件的記錄的記錄表,當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。

緩存IO

緩存IO,也被稱爲標準IO,大多數文件系統默認IO操做都是緩存IO,在Linux的緩存IO機制中,操做系統會將IO的數據緩存在文件系統的頁緩存(page cache)中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間

緩存IO的缺點:

數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的CPU以及內存開銷是很是大的

IO模式

對於一次IO訪問(以read爲例子),數據會先拷貝到操做系統內核的緩衝區中,而後會從操做系統內核的緩衝區拷貝到應用程序的地址空間,也就是說當一個read操做發生時,它會經歷兩個階段:

1. 等待數據準備

2. 經數據從內核拷貝到進程

正是由於這兩個階段,linux系統產生了五種網絡模式的方案

1. 阻塞I/O(blocking IO)

2. 非阻塞I/O(nonblocking IO)

3. I/O多路複用(IO multiplexing)

4. 信號驅動I/O(signal driven IO)

5. 異步I/O(asynchromous IO)

注意:信號驅動I/O(signal driven IO)在實際中不經常使用

阻塞I/O(blocking IO)

在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據(對於網絡IO來講,不少時候數據在一開始尚未到達。好比,尚未收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程須要等待,也就是說數據被拷貝到操做系統內核的緩衝區中是須要一個過程的。而在用戶進程這邊,整個進程會被阻塞(固然,是進程本身選擇的阻塞)。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。

 

因此,blocking IO的特色就是在IO執行的兩個階段都被block了

非阻塞I/O

linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:

當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。

 

因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。

I/O多路複用(IO multiplexing)

IO multiplexing就是咱們說的select,poll,epoll,有些地方也稱這種IO方式爲event driven IO。select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。

 

當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。

 

因此,I/O 多路複用的特色是經過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就能夠返回。

 

這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。

 

因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)

 

在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

異步I/O(asynchronous IO)

Linux下的asynchronous IO其實用得不多。先看一下它的流程:

 

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

關於select poll epoll

select

sekect是經過一個select()系統調用來監視多個文件描述符,當select()返回後,該數組中就緒的文件描述符便會被該內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做

select的優勢就是支持跨平臺

缺點在於單個進程可以監視的文件描述符的數量存在最大限制

另外select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。

poll

和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制

poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。

另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。

 

epoll

epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。

 

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

 

另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知

 

以select方法爲例子進行理解

Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。

接下來經過echo server例子要以瞭解select 是如何經過單進程實現同時處理多個非阻塞的socket鏈接的

代碼以下:

 1 #AUTHOR:FAN
 2 
 3 import select
 4 import socket
 5 import queue
 6 server = socket.socket()
 7 server.bind(('127.0.0.1',9999))
 8 server.listen()
 9 
10 server.setblocking(False)#不阻塞
11 msg_dict = {}
12 inputs=[server,]
13 outputs=[]
14 
15 while True:
16     readable, writeable, exceptional = select.select(inputs, outputs, inputs)
17     print(readable, writeable, exceptional)
18     for r in readable:
19         if r is server:   #表明來了一個新鏈接
20             conn,addr = server.accept()
21             print("來了一個新鏈接:",addr)
22             inputs.append(conn)  #是由於這個新創建的鏈接還沒發數據過來,如今就接收的話程序就報錯了
23             #因此要想要實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn
24             msg_dict[conn] = queue.Queue() #初始化一個隊列,後面須要返回給這個客戶端的數據
25         else:
26             data = r.recv(1024)
27             print("收到數據:",data)
28             msg_dict[r].put(data)
29             outputs.append(r)  #放入返回的鏈接隊列裏
30 
31     for w in writeable:    #要返回給客戶端的鏈接列表
32         data_to_client = msg_dict[w].get()
33         w.send(data_to_client)  #返回給客戶端源數據
34         outputs.remove(w)   #確保下次循環的時候writeable,不能返回這個已經處理完的鏈接了
35     for e in exceptional:
36         if e in outputs:
37             outputs.remove(e)
38         inputs.remove(e)
39         del msg_dict[e]

其實上述的代碼相對來講是比較麻煩,python已經封裝了selectors模塊,而且這個模塊中包含了select和epoll,會根據系統自動識別(windows只支持select,linux是兩者都支持),默認用epoll

若是將上述代碼用selectors模塊的方式寫,代碼以下:

 

 1 #AUTHOR:FAN
 2 
 3 
 4 import selectors
 5 import socket
 6 
 7 sel = selectors.DefaultSelector()
 8 def accept(server,mask):
 9     conn,addr = server.accept()
10     print("一個新的鏈接",addr)
11     print(conn)
12     conn.setblocking(False)
13     sel.register(conn,selectors.EVENT_READ,read)  #新鏈接註冊read回調函數
14     print("done")
15 
16 def read(conn,mask):
17     print("ccc")
18     print("mask:",mask)
19     data = conn.recv(1024)
20     if data:
21         print(data)
22         conn.send(data)
23     else:
24         print("客戶端斷開鏈接")
25         sel.unregister(conn)
26         conn.close()
27 
28 server = socket.socket()
29 server.bind(('127.0.0.1',9999))
30 server.listen()
31 server.setblocking(False)
32 sel.register(server,selectors.EVENT_READ,accept)
33 
34 while True:
35     print("cccccccsssssss")
36     events = sel.select() #默認阻塞,有活動鏈接,有活動鏈接就返回活動的鏈接列表
37     print(events)
38     for key,mask in events:
39         print("key:%s    mask:%s"%(key,mask))
40         callback = key.data  #這裏就是回調函數及上述的accept
41         print("key.data:",key.data)
42         print("key.fileobj:",key.fileobj)
43         callback(key.fileobj,mask) #key.fileobj

 

咱們用客戶端模擬同時併發一萬去鏈接服務端

客戶端代碼以下:

 1 #AUTHOR:FAN
 2 
 3 
 4 import socket
 5 import sys
 6 
 7 messages = [ b'This is the message. ',
 8              b'It will be sent ',
 9              b'in parts.',
10              ]
11 server_address = ('192.168.8.102', 10000)
12 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(10000)
13           ]
14 print('connecting to %s port %s' % server_address)
15 for s in socks:
16     s.connect(server_address)
17 
18 for message in messages:
19     for s in socks:
20         print('%s: sending "%s"' % (s.getsockname(), message) )
21         s.send(message)
22     for s in socks:
23         data = s.recv(1024)
24         print( '%s: received "%s"' % (s.getsockname(), data) )
25         if not data:
26             print(sys.stderr, 'closing socket', s.getsockname() )

將服務端放到linux服務端,在本機執行客戶端,從而實現了上萬的併發

相關文章
相關標籤/搜索