12.python進程\協程\異步IO

進程

Python中的多線程沒法利用多核優點 , 因此若是咱們想要充分地使用多核CPU的資源 , 那麼就只能靠多進程了html

multiprocessing模塊中提供了Process , Queue , Pipe , Lock , RLock , Event , Condition等組件 , 與threading模塊有不少類似之處python

1.建立進程

from multiprocessing import Process
import time

def func(name):
    time.sleep(2)
    print('hello',name)

if __name__ == '__main__':
    p= Process(target=func,args=('derek',))
    p.start()
    # p.join()
    print('end...')
View Code

2.進程間通信

(1)Queuegit

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換。進程間通訊有兩種主要形式 , 隊列和管道github

from multiprocessing import Process, Queue   #Queue是進程排列

def f(test):
    test.put('22')   #經過建立的子進程往隊列添加數據,實線父子進程交互

if __name__ == '__main__':
    q = Queue()      #父進程
    q.put("11")

    p = Process(target=f, args=(q,))   #子進程
    p.start()
    p.join()

    print("取到:",q.get_nowait())
    print("取到:",q.get_nowait())

#父進程在建立子進程的時候就把q克隆一份給子進程
#經過pickle序列化、反序列化,來達到兩個進程之間的交互



結果:
取到: 11
取到: 22
Queue

(2)Pipe(管道)編程

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).windows

from multiprocessing import Process, Pipe

def f(conn):
    conn.send('11')
    conn.send('22')
    print("from parent:",conn.recv())
    print("from parent:", conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()   #生成管道實例,能夠互相send()和recv()

    p = Process(target=f, args=(child_conn,))
    p.start()

    print(parent_conn.recv())      # prints "11"
    print(parent_conn.recv())      # prints "22"
    parent_conn.send("33")         # parent 發消息給 child
    parent_conn.send("44")
    p.join()
Pipe

3.Manager

進程之間是相互獨立的 ,Queue和pipe只是實現了數據交互,並沒實現數據共享,Manager能夠實現進程間數據共享 。多線程

Manager還支持進程中的不少操做 , 好比Condition , Lock , Namespace , Queue , RLock , Semaphore等併發

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] =os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #{} #生成一個字典,可在多個進程間共享和傳遞

        l = manager.list(range(5))     #生成一個列表,可在多個進程間共享和傳遞
        p_list = []
        for i in range(2):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list: #等待結果
            res.join()
        print(d)
        print(l)
View Code

4.lock

from multiprocessing import Process, Lock

def f(l, i):
    #l.acquire()
    print('hello world', i)
    #l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(100):
        Process(target=f, args=(lock, num)).start()     #要把lock傳到函數的參數l
        
#lock防止在屏幕上打印的時候會亂
lock

5.進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。app

進程池中有如下幾個主要方法:異步

  1. apply:從進程池裏取一個進程並執行
  2. apply_async:apply的異步版本
  3. terminate:馬上關閉線程池
  4. join:主進程等待全部子進程執行完畢,必須在close或terminate以後
  5. close:等待全部進程結束後,才關閉線程池
from  multiprocessing import Process, Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg,os.getpid())

if __name__ == '__main__':    #多進程,必須加這一句(windows系統)
    pool = Pool(processes=3) #容許進程池同時放入3個進程
    print("主進程",os.getpid())
    
    for i in range(10):       
        pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回調,執行完Foo(),接着執行Bar()
        # pool.apply(func=Foo, args=(i,)) #串行
        
    print('end')
    pool.close()
    pool.join()   #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。必須先close(),再join()
Pool

協程

1.簡介

協程(Coroutine) : 是單線程下的併發 , 又稱微線程 , 纖程 . 協程是一種用戶態的輕量級線程 , 即協程有用戶本身控制調度

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態

使用協程的優缺點

優勢 :

  1. 協程的切換開銷更小 , 屬於程序級別的切換 , 更加輕量級
  2. 單線程內就能夠實現併發的效果 , 最大限度利用CPU

缺點 :

  1. 協程的本質是單線程下 , 沒法利用多核 , 能夠是一個程序開啓多個進程 , 每一個進程內開啓多個線程 , 每一個線程內開啓協程
  2. 協程指的是單個線程 , 於是一旦協程出現阻塞 將會阻塞整個線程

2.Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

手動切換

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()      #到這裏切換到gr2,執行test2()
    print(34)
    gr2.switch()      #切換到上次gr2運行的位置

def test2():
    print(56)
    gr1.switch()      #切換到上次gr1運行的位置
    print(78)

gr1 = greenlet(test1)      #啓動一個協程gr1
gr2 = greenlet(test2)      #啓動一個協程gr2

gr1.switch()        #開始運行gr1
greenlet

3.Gevent

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。

(1)IO阻塞自動切換

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(2)
    print('阻塞時間最長,最後運行')

def bar():
    print('running in bar')
    gevent.sleep(1)
    print('foo()還在阻塞,這裏第二個運行')

def func3():
    print("running in func3 ")
    gevent.sleep(0)
    print("其它兩個還在IO阻塞先運行")

#建立協程實例
gevent.joinall([
    gevent.spawn(foo), #生成,
    gevent.spawn(bar),
    gevent.spawn(func3),
])

#遇到IO自動切換




結果:
Running in foo
running in bar
running in func3 
其它兩個還在IO阻塞先運行
foo()還在阻塞,這裏第二個運行
阻塞時間最長,最後運行

Process finished with exit code 0
View Code

 因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:

(2)爬蟲例子:

複製代碼
from urllib import request
import gevent,time
from gevent import monkey
monkey.patch_all() #做用:把當前程序的全部的io操做給我單獨的作上標記

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

#同步須要的時間
urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/' ]
time_start = time.time()
for url in urls:
    f(url)
print("同步cost",time.time() - time_start)

#下面是異步花費的時間
async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
print("異步cost",time.time() - async_time_start)


結果:
GET: https://www.python.org/
48954 bytes received from https://www.python.org/.
GET: https://www.yahoo.com/
491871 bytes received from https://www.yahoo.com/.
GET: https://github.com/
51595 bytes received from https://github.com/.
同步cost 4.928282260894775
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
48954 bytes received from https://www.python.org/.
494958 bytes received from https://www.yahoo.com/.
51599 bytes received from https://github.com/.
異步cost 1.4920852184295654
複製代碼

IO多路複用

詳解:http://www.cnblogs.com/alex3714/articles/5876749.html

selectors模塊

selectors基於select模塊實現IO多路複用,調用語句selectors.DefaultSelector(),特色是根據平臺自動選擇最佳IO多路複用機制,調用順序:epoll > poll > select

作一個socket servers

import selectors
import socket
sel = selectors.DefaultSelector()        # 根據平臺自動選擇最佳IO多路複用機制

def accept(sock, mask):
    conn, addr = sock.accept()           # Should be ready
    # print('accepted', conn, 'from', addr,mask)
    conn.setblocking(False)              #設置爲非阻塞IO
    sel.register(conn, selectors.EVENT_READ, read)
                                         #新鏈接註冊read回調函數
                                         #將conn和read函數註冊到一塊兒,當conn有變化時執行read函數

def read(conn, mask):
    data = conn.recv(1024)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)                  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 9999))
sock.listen(100)
sock.setblocking(False)             #設置爲非阻塞IO
sel.register(sock, selectors.EVENT_READ, accept)
                                    # 將sock和accept函數註冊到一塊兒,當sock有變化時執行accept函數

while True:
    events = sel.select()  #默認阻塞,有活動鏈接就返回活動的鏈接列表,監聽[(key1,mask1),(key2),(mask2)]

    for key, mask in events:
        callback = key.data                 #accept      #1 key.data就是accept   # 2 key.data就是read
        callback(key.fileobj, mask)         #key.fileobj=  文件句柄
                                            # 1 key.fileobj就是sock   # 2 key.fileobj就是conn
server
client
import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 9999)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(5)]
print(socks)
# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print( 'closing socket', s.getsockname() )
mutlti conn socket client
相關文章
相關標籤/搜索