本節內容:python
啓動多個進程linux
進程中啓進程git
父進程與子進程github
進程間通訊web
不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法:編程
a) queueswindows
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Queue import queue import threading def f(qq): qq.put("hahaha123") if __name__ == '__main__': #q = queue.Queue() # 線程queue不能直接傳給子進程 q = Queue() p = Process(target=f, args=(q,)) #p = threading.Thread(target=f, args=(q,)) p.start() print(q.get()) p.join()
父進程克隆了一個Queue,將克隆的Queue交給了子進程,當一個Queue對數據進行修改時,會將修改後的Queue數據序列化到某一位置,另外一個Queue會從這個位置反序列化獲取數據,實現進程間的通訊緩存
b) Pipes安全
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def f(conn): conn.send("qqqqqq") conn.send("qqqqqq2") print("from parent:", conn.recv()) conn.close() if __name__ == '__main__': parent_conn, chile_conn = Pipe() p = Process(target=f, args=(chile_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) parent_conn.send("hehehhe") p.join()
c) Managers 實現進程間數據的共享,能夠同時修改,而不是數據的傳遞服務器
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: zhoujunlong from multiprocessing import Process, Manager import os def f(d, l): d[os.getpid()] = os.getpid() l.append(os.getpid()) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() # 生成一個字典可在多個進程間共享和傳遞 l = manager.list()# 生成一個列表可在多個進程間共享和傳遞 p_list = [] for i in range(10): p = Process(target=f, args=(d, l,)) p.start() p_list.append(p) for res in p_list:# 等待結果 res.join() print(d) print(l)
進程同步
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process,Lock def f(l, i): l.acquire() print("hello world", i) l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
進程池(生產中經常使用)
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Pool, freeze_support import time, os def Foo(i): time.sleep(2) print("in process ", os.getpid()) return i def Bar(args): print("--->", args, os.getpid()) if __name__ == '__main__': #freeze_support() pool = Pool(processes=5) print("main_process:", os.getpid()) for i in range(10): #pool.apply(func=Foo, args=(i,)) #pool.apply_async(func=Foo, args=(i,)) pool.apply_async(func=Foo, args=(i,),callback=Bar) print('end') pool.close() pool.join() #進程池中進程執行完畢後再關閉,註釋後程序不等進程執行我那後就直接關閉了
協程,微線程
協程的好處:
缺點:
經過yield實現簡單的協程(單線程實現多併發效果):
#!/usr/bin/env python # -*- coding:utf-8 -*- def consumer(name): print("----->starting") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name, new_baozi)) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n += 1 print("\033[32;1m[producer]\033[0m is making baozi %s" % n) con.send(n) # con2.send(n) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
greenlet實現協程手動切換
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: zhoujunlong from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1.switch() gr1 = greenlet(test1)#啓動一個協程 gr2 = greenlet(test2) gr1.switch()
gevent實現協程自動切換
#!/usr/bin/env python # -*- coding:utf-8 -*- import gevent def foo(): print("1") gevent.sleep(1) # 模仿IO切換 print("2") def bar(): print("3") gevent.sleep(0) print("4") gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar) ])
協程大併發下載網頁(urllib模塊):
經過gevent調用urllib默認是阻塞的,加入monkey模塊,把全部的io操做加上標記實現並行操做
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: zhoujunlong from gevent import monkey from urllib import request import gevent monkey.patch_all() #把當前程序的全部的io操做單獨作上標記 def f(url): print("Get: %s" % url) res = request.urlopen(url) data = res.read() print("%d bytes received from %s" % (len(data), url)) gevent.joinall([ gevent.spawn(f, "https://www.python.org/"), gevent.spawn(f, "https://www.yahoo.com/"), gevent.spawn(f, "https://github.com/") ])
經過gevent實現單線程下的socket併發
Server:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: zhoujunlong import sys import socket import time import gevent from gevent import socket, monkey monkey.patch_all() host = "0.0.0.0" def server(port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((host, port)) s.listen(500) while True: conn, addr = s.accept() gevent.spawn(handle_request, conn) #前邊是函數,後邊是函數所需的參數 def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data.decode()) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as e: print("\033[31;1merr\033[0m", e) finally: conn.close() if __name__ == '__main__': server(5566)
Client:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: zhoujunlong import socket host = "localhost" port = 5566 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((host, port)) while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) print("Recv:", repr(data)) s.close()
一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;
(2)每收到一個請求,建立一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。
綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式
事件驅動模型:
事件驅動大致思路:
a)有一個事件(消息)隊列
b)鼠標按下時, 往這個隊列中增長一個點擊事件(消息)
c)有個循環,部隊從隊列取出事件,根據不一樣的事件,調用不一樣的函數
d)事件(消息)通常各自保存各自的處理函數指針,這樣,每一個事件都有獨立的處理函數
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:
當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。
網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。
Select\Poll\Epoll異步IO
select 1)最多能維護1024個socket 2)不知道具體哪一個socket反回了數據
poll 去掉了默認文件連接數的限制
epoll 依然是io多路複用,(tornado)、(twisted)都用epoll, windows不支持epoll 能夠知道哪一個socket有數據,省去循環,此時數據還存放在內核態,須要用戶主動調用接收數據操做,若是此次用戶沒取數據,下次還繼續通知數據來了(水平觸發)
Linux環境下的network IO
在進行解釋以前,首先要說明幾個概念:
- 用戶空間和內核空間
- 進程切換
- 進程的阻塞
- 文件描述符
- 緩存 I/O
如今操做系統都是採用虛擬存儲器,那麼對32位操做系統而言,它的尋址空間(虛擬存儲空間)爲4G(2的32次方)。操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操心繫統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。針對linux操做系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。
爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換。所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。
從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化:
正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的
。
文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。
緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。
緩存 I/O 的缺點:
數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。
剛纔說了,對於一次IO訪問(以read舉例),數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。因此說,當一個read操做發生時,它會經歷兩個階段:
正式由於這兩個階段,linux系統產生了下面五種網絡模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路複用( IO multiplexing)
- 信號驅動 I/O( signal driven IO)
- 異步 I/O(asynchronous IO)
注:因爲signal driven IO在實際中並不經常使用,因此我這隻說起剩下的四種IO Model。
在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據(對於網絡IO來講,不少時候數據在一開始尚未到達。好比,尚未收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程須要等待,也就是說數據被拷貝到操做系統內核的緩衝區中是須要一個過程的。而在用戶進程這邊,整個進程會被阻塞(固然,是進程本身選擇的阻塞)。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。
因此,blocking IO的特色就是在IO執行的兩個階段都被block了。
linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:
當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。
因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。
IO multiplexing就是咱們說的select,poll,epoll,有些地方也稱這種IO方式爲event driven IO。select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。
當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
因此,I/O 多路複用的特色是經過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就能夠返回。
這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。
因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)
在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
linux下的asynchronous IO其實用得不多。先看一下它的流程:
用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。
select,poll,epoll都是IO多路複用的機制。I/O多路複用就是經過一種機制,一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。(這裏囉嗦下)
1 |
|
select 函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用後select函數會阻塞,直到有描述副就緒(有數據 可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠 經過遍歷fdset,來找到就緒的描述符。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢。select的一 個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制,但 是這樣也會形成效率的下降。
int poll (struct pollfd *fds, unsigned int nfds, int timeout);
不一樣與select使用三個位圖來表示三個fdset的方式,poll使用一個 pollfd的指針實現。
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events to watch */
short revents; /* returned events witnessed */
};
pollfd結構包含了要監視的event和發生的event,再也不使用select「參數-值」傳遞的方式。同時,pollfd並無最大數量限制(可是數量過大後性能也是會降低)。 和select函數同樣,poll返回後,須要輪詢pollfd來獲取就緒的描述符。
從上面看,select和poll都須要在返回後,經過遍歷文件描述符來獲取已經就緒的socket。事實上,同時鏈接的大量客戶端在一時刻可能只有不多的處於就緒狀態,所以隨着監視的描述符數量的增加,其效率也會線性降低。
epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。
一 epoll操做過程
epoll操做過程須要三個接口,分別以下:
int epoll_create(int size);//建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
1)int epoll_create(int size);
建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。
當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。
2)int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操做。
- epfd:是epoll_create()的返回值。
- op:表示op操做,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
- fd:是須要監聽的fd(文件描述符)
- epoll_event:是告訴內核須要監聽什麼事
3)int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。
select方法(實現socketserver)
傳遞給 select 的參數是這麼幾個列表,分別表示讀事件、寫事件和錯誤事件。 select 方法返回三個列表,其中包含知足條件的對象(讀、寫和異常)。
select 多併發socket 例子
server
#!/usr/bin/python # -*- coding:utf-8 -*- # Author:zhoujunlong import select import socket import queue server = socket.socket() server.setblocking(0) server_addr = ('localhost',10000) print('starting up on %s port %s' % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] #本身也要監測呀,由於server自己也是個fd outputs = [] message_queues = {} while True: print("waiting for next event...") readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏 for s in readable: #每一個s就是一個socket if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了, #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀 #新鏈接進來了,接受這個鏈接 conn, client_addr = s.accept() print("new connection from",client_addr) conn.setblocking(0) inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接 #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到 #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的 message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送 else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了 #客戶端的數據過來了,在這接收 data = s.recv(1024) if data: print("收到來自[%s]的數據:" % s.getpeername()[0], data) message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端 if s not in outputs: outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端 else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀 print("客戶端斷開了",s) if s in outputs: outputs.remove(s) #清理已斷開的鏈接 inputs.remove(s) #清理已斷開的鏈接 del message_queues[s] ##清理已斷開的鏈接 for s in writeable: try : next_msg = message_queues[s].get_nowait() except queue.Empty: print("client [%s]" %s.getpeername()[0], "queue is empty..") outputs.remove(s) else: print("sending msg to [%s]"%s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exeptional: print("handling exception for ",s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
client
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)
selectors模塊
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)