Python Day10

進程

在python中multiprocess模塊提供了Process類,實現進程相關的功能。可是,因爲它是基於fork機制的,所以不被windows平臺支持。想要在windows中運行,必須使用if __name__ == '__main__':的方式,顯然這隻能用於調試和學習,不能用於實際環境。python

下面是一個簡單的多進程例子git

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

顯示單獨的進程IDgithub

from multiprocessing import Process
import os
 
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")
 
def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)
 
if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

進程間通信

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法
Queues

使用方法跟threading裏的queue差很少編程

from multiprocessing import Process, Queue
 
def f(q):
    q.put([42, None, 'hello'])
 
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
Pipes

利用管道來實現數據交換windows

from multiprocessing import Process, Pipe
 
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
Managers

使用Managers來共享數據數組

from multiprocessing import Process, Manager
import os
 
def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)
 
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
 
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)

進程同步

爲了防止和多線程同樣的出現數據搶奪和髒數據的問題,一樣須要設置進程鎖。與threading相似,在multiprocessing裏也有同名的鎖類RLock, Lock, Event, Condition, Semaphore,連用法都是同樣的!服務器

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
 
if __name__ == '__main__':
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。網絡

進程池中有兩個方法:數據結構

  • apply
  • apply_async
from  multiprocessing import Process, Pool,freeze_support
import time
import os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg,os.getpid())

if __name__ == '__main__':
    #freeze_support()
    pool = Pool(processes=5) #容許進程池同時放入5個進程
    print("主進程",os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回調
        #pool.apply(func=Foo, args=(i,)) #串行
        #pool.apply_async(func=Foo, args=(i,)) #串行
    print('end')
    pool.close()
    pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。.join()

協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。多線程

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

協程的好處:
  • 無需線程上下文切換的開銷
  • 無需原子操做鎖定及同步的開銷
  • 方便切換控制流,簡化編程模型
  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。
缺點:
  • 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

使用yield實現協程操做例子 

import time
import queue
def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        #time.sleep(1)
 
def producer():
 
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n +=1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
 
 
if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

Greenlet

from greenlet import greenlet
  
  
def test1():
    print 12
    gr2.switch()
    print 34
    gr2.switch()
  
  
def test2():
    print 56
    gr1.switch()
    print 78
  
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Gevent

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

import gevent
 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

輸出:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

遇到IO阻塞時會自動切換任務

from urllib import request
import gevent,time
from gevent import monkey
monkey.patch_all() #把當前程序的全部的io操做給我單獨的作上標記

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/' ]
time_start = time.time()
for url in urls:
    f(url)
print("同步cost",time.time() - time_start)
async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
print("異步cost",time.time() - async_time_start)
經過gevent實現單線程下的多socket併發
import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)

論事件驅動與異步IO

一般,咱們寫服務器處理模型的程序時,有如下幾種模型:

(1)每收到一個請求,建立一個新的進程,來處理該請求;

(2)每收到一個請求,建立一個新的線程,來處理該請求;

(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求

上面的幾種方式,各有千秋,

第(1)中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。

第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。

第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。

綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式

Select\Poll\Epoll異步IO 

select

select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。

另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。

poll

poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。

poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。

另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。

epoll

直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。

epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

select 多併發socket 例子
import select
import socket
import queue


server = socket.socket()
server.bind(('localhost',9000))
server.listen(1000)


server.setblocking(False) #不阻塞

msg_dic = {}

inputs = [server,]
#inputs = [server,conn] #[conn,]
#inputs = [server,conn,conn2] #[conn2,]
outputs = [] #
#outputs = [r1,] #
while True:
    readable ,writeable,exceptional= select.select(inputs, outputs, inputs )
    print(readable,writeable,exceptional)
    for r in readable:
        if r is server: #表明來了一個新鏈接
            conn,addr = server.accept()
            print("來了個新鏈接",addr)
            inputs.append(conn) #是由於這個新創建的鏈接還沒發數據過來,如今就接收的話程序就報錯了,
            #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn
            msg_dic[conn] = queue.Queue() #初始化一個隊列,後面存要返回給這個客戶端的數據
        else: #conn2
            data = r.recv(1024)
            print("收到數據",data)
            msg_dic[r].put(data)

            outputs.append(r) #放入返回的鏈接隊列裏
            # r.send(data)
            # print("send done....")

    for w in writeable: #要返回給客戶端的鏈接列表
        data_to_client = msg_dic[w].get()
        w.send(data_to_client) #返回給客戶端源數據

        outputs.remove(w) #確保下次循環的時候writeable,不返回這個已經處理完的鏈接了

    for e in exceptional:
        if e in outputs:
            outputs.remove(e)

        inputs.remove(e)

        del msg_dic[e]
相關文章
相關標籤/搜索