在python中multiprocess模塊提供了Process類,實現進程相關的功能。可是,因爲它是基於fork機制的,所以不被windows平臺支持。想要在windows中運行,必須使用if __name__ == '__main__':的方式,顯然這隻能用於調試和學習,不能用於實際環境。python
下面是一個簡單的多進程例子git
from multiprocessing import Process import time def f(name): time.sleep(2) print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
顯示單獨的進程IDgithub
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join()
使用方法跟threading裏的queue差很少編程
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
利用管道來實現數據交換windows
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
使用Managers來共享數據數組
from multiprocessing import Process, Manager import os def f(d, l): d[os.getpid()] = os.getpid() l.append(os.getpid()) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
爲了防止和多線程同樣的出現數據搶奪和髒數據的問題,一樣須要設置進程鎖。與threading相似,在multiprocessing裏也有同名的鎖類RLock, Lock, Event, Condition, Semaphore,連用法都是同樣的!服務器
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。網絡
進程池中有兩個方法:數據結構
from multiprocessing import Process, Pool,freeze_support import time import os def Foo(i): time.sleep(2) print("in process",os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg,os.getpid()) if __name__ == '__main__': #freeze_support() pool = Pool(processes=5) #容許進程池同時放入5個進程 print("主進程",os.getpid()) for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回調 #pool.apply(func=Foo, args=(i,)) #串行 #pool.apply_async(func=Foo, args=(i,)) #串行 print('end') pool.close() pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。.join()
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。多線程
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
使用yield實現協程操做例子
import time import queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n +=1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 gr2.switch() def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
輸出:
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
遇到IO阻塞時會自動切換任務
from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #把當前程序的全部的io操做給我單獨的作上標記 def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) urls = ['https://www.python.org/', 'https://www.yahoo.com/', 'https://github.com/' ] time_start = time.time() for url in urls: f(url) print("同步cost",time.time() - time_start) async_time_start = time.time() gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ]) print("異步cost",time.time() - async_time_start)
import sys import socket import time import gevent from gevent import socket, monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;
(2)每收到一個請求,建立一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。
綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。
poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。
epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
import select import socket import queue server = socket.socket() server.bind(('localhost',9000)) server.listen(1000) server.setblocking(False) #不阻塞 msg_dic = {} inputs = [server,] #inputs = [server,conn] #[conn,] #inputs = [server,conn,conn2] #[conn2,] outputs = [] # #outputs = [r1,] # while True: readable ,writeable,exceptional= select.select(inputs, outputs, inputs ) print(readable,writeable,exceptional) for r in readable: if r is server: #表明來了一個新鏈接 conn,addr = server.accept() print("來了個新鏈接",addr) inputs.append(conn) #是由於這個新創建的鏈接還沒發數據過來,如今就接收的話程序就報錯了, #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn msg_dic[conn] = queue.Queue() #初始化一個隊列,後面存要返回給這個客戶端的數據 else: #conn2 data = r.recv(1024) print("收到數據",data) msg_dic[r].put(data) outputs.append(r) #放入返回的鏈接隊列裏 # r.send(data) # print("send done....") for w in writeable: #要返回給客戶端的鏈接列表 data_to_client = msg_dic[w].get() w.send(data_to_client) #返回給客戶端源數據 outputs.remove(w) #確保下次循環的時候writeable,不返回這個已經處理完的鏈接了 for e in exceptional: if e in outputs: outputs.remove(e) inputs.remove(e) del msg_dic[e]