Python自學第十週(2)

協程
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的好處:
* 無需線程上下文切換的開銷
* 無需原子操做鎖定及同步的開銷
    *   "原子操做(atomic operation)是不須要synchronized",所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。
* 方便切換控制流,簡化編程模型
* 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。
缺點:
* 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
* 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序
 
 
 
一、yield
 1 #Author: Zachary
 2 import time
 3 import queue
 4  
 5 def consumer(name):
 6     print("--->starting eating baozi...")
 7     while True:
 8         new_baozi = yield
 9         print("[%s] is eating baozi %s" % (name, new_baozi))
10         # time.sleep(1)
11  
12 def producer():
13     r = con.__next__()
14     r = con2.__next__()
15     n = 0
16     while n < 5:
17         n += 1
18         print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
19         con.send(n)
20         con2.send(n)
21  
22 if __name__ == '__main__':
23     con = consumer("c1")
24     con2 = consumer("c2")
25     p = producer()
 
 
 
二、greenlet(封裝的協程)
 1 #Author: Zachary
 2 from greenlet import greenlet
 3 def test1():
 4     print(12)
 5     gr2.switch()
 6     print(34)
 7     gr2.switch()
 8  
 9 def test2():
10     print(56)
11     gr1.switch()
12     print(78)
13  
14 gr1 = greenlet(test1)    #啓動一個協程
15 gr2 = greenlet(test2)
16 gr1.switch()
switch會跳轉
 
 
 
三、Gevent
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
 
 1 #Author: Zachary
 2  
 3 import gevent
 4  
 5 def func1():
 6     print('\033[31;1m李闖在跟海濤搞...\033[0m')
 7     gevent.sleep(2)
 8     print('\033[31;1m李闖又回去跟繼續跟海濤搞...\033[0m')
 9  
10 def func2():
11     print('\033[32;1m李闖切換到了跟海龍搞...\033[0m')
12     gevent.sleep(1)
13     print('\033[32;1m李闖搞完了海濤,回來繼續跟海龍搞...\033[0m')
14 def func3():
15     print("running func3")
16     gevent.sleep(0)
17     print("running func3 again")
18  
19 gevent.joinall([
20     gevent.spawn(func1),
21     gevent.spawn(func2),
22     gevent.spawn(func3),
23 ])
 
 
 
 
 
 
協程之爬蟲
 1 #Author: Zachary
 2 from urllib import request
 3 import time
 4 import gevent
 5 from gevent import monkey
 6 monkey.patch_all()
 7 def f(url):
 8     print("GET: %s" %url)
 9     resp = request.urlopen(url)
10     data = resp.read()
11     print("%d bytes received from %s"%(len(data),url))
12 urls = ['https://www.python.org/',
13         'https://www.yahoo.com/',
14         'https://github.com/'
15         ]
16 time_start = time.time()
17 for url in urls:
18     f(url)
19 print("同步cost",time.time() - time_start)
20 async_time_start = time.time()
21 gevent.joinall([
22         gevent.spawn(f, 'https://www.python.org/'),
23         gevent.spawn(f, 'https://www.yahoo.com/'),
24         gevent.spawn(f, 'https://github.com/'),
25 ])
26 print("異步cost",time.time() - async_time_start)
 
 
 
 
 
協程之socket
能夠實現異步
 
服務端
 1 #Author: Zachary
 2 import sys
 3 import socket
 4 import time
 5 import gevent
 6 from gevent import socket, monkey
 7 monkey.patch_all()
 8  
 9 def server(port):
10     s = socket.socket()
11     s.bind(('0.0.0.0', port))
12     s.listen(500)
13     while True:
14         cli, addr = s.accept()
15         gevent.spawn(handle_request, cli)
16  
17 def handle_request(conn):
18     try:
19         while True:
20             data = conn.recv(1024)
21             print("recv:", data)
22             conn.send(data)
23             if not data:
24                 conn.shutdown(socket.SHUT_WR)
25     except Exception as  ex:
26         print(ex)
27     finally:
28         conn.close()
29  
30 if __name__ == '__main__':
31     server(8001)
 
 
客戶端
 1 #Author: Zachary
 2 import socket
 3 HOST = 'localhost'  # The remote host
 4 PORT = 8001  # The same port as used by the server
 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6 s.connect((HOST, PORT))
 7 while True:
 8     msg = bytes(input(">>:"), encoding="utf8")
 9     s.sendall(msg)
10     data = s.recv(1024)
11     # print(data)
12     print('Received', repr(data))
13 s.close()
 
 
 
 
IO多路複用
 
 
 
 
 
能夠多看看上面這兩個
只有在非阻塞模式下才能實現IO多路複用
select代碼實現
 
 
服務端
 1 #Author: Zachary
 2 import select,socket,queue
 3  
 4 server = socket.socket()
 5 server.bind(('localhost',9000))
 6 server.listen(1024)
 7 server.setblocking(False)   #不阻塞
 8 inputs = [server,]
 9 outputs = []
10 while True:
11     readable,writeable,exceptional = select.select(inputs,outputs,inputs)
12     print(readable,writeable,exceptional)
13     for r in readable:
14         if r is server:   #表明來了一個新鏈接,創建鏈接
15             conn, addr = server.accept()
16             print("來了個新鏈接", addr)
17             inputs.append(conn)     #由於這個新創建的鏈接尚未發數據過來,如今就接收的話程序就報錯了
18             #因此想要實現這個客戶端發數據來時server端能知道,就須要讓select再檢測這個conn
19         else:
20             data = r.recv(1024)
21             print("收到數據>>>",data)
22             r.send(data)
23             print("send done")

 

 
 
客戶端
 1 #Author: Zachary
 2 import socket
 3 HOST = 'localhost'  # The remote host
 4 PORT = 9000  # The same port as used by the server
 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6 s.connect((HOST, PORT))
 7 while True:
 8     msg = bytes(input(">>:"), encoding="utf8")
 9     s.sendall(msg)
10     data = s.recv(1024)
11     # print(data)
12     print('Received', repr(data))
13 s.close()
 
能夠開啓多個客戶端
 
 
 
服務端
 1 #Author: Zachary
 2 import select,socket,queue
 3  
 4 server = socket.socket()
 5 server.bind(('localhost',9000))
 6 server.listen(1024)
 7 server.setblocking(False)   #不阻塞
 8 msg_dic = {}
 9 inputs = [server,]
10 outputs = []
11 while True:
12     readable,writeable,exceptional = select.select(inputs,outputs,inputs)
13     print(readable,writeable,exceptional)
14     for r in readable:
15         if r is server:   #表明來了一個新鏈接,創建鏈接
16             conn, addr = server.accept()
17             print("來了個新鏈接", addr)
18             inputs.append(conn)     #由於這個新創建的鏈接尚未發數據過來,如今就接收的話程序就報錯了
19             #因此想要實現這個客戶端發數據來時server端能知道,就須要讓select再檢測這個conn
20             msg_dic[conn] = queue.Queue()    #初始化一個隊列,後面存要返回給客戶端的數據
21         else:
22             data = r.recv(1024)
23             print("收到數據>>>",data)
24             msg_dic[r].put(data)
25             outputs.append(r)    #放入返回的連接隊列裏
26             # r.send(data)
27             # print("send done")
28     for w in writeable:  #要返回給客戶端的連接列表
29         data_to_client = msg_dic[w].get()
30         w.send(data_to_client)
31         outputs.remove(w)   #確保下次循環的時候writeable,不返回這個已經處理完的鏈接了
32     for e in exceptional:
33         if e in outputs:
34             outputs.remove(e)
35         inputs.remove(e)
36         del msg_dic[e]
客戶端不變
 
 
 
selectors模塊
 1 import selectors
 2 import socket
 3 sel = selectors.DefaultSelector()
 4 def accept(sock, mask):
 5     conn, addr = sock.accept()  # Should be ready
 6     print('accepted', conn, 'from', addr)
 7     conn.setblocking(False)
 8     sel.register(conn, selectors.EVENT_READ, read)
 9 def read(conn, mask):
10     data = conn.recv(1000)  # Should be ready
11     if data:
12         print('echoing', repr(data), 'to', conn)
13         conn.send(data)  # Hope it won't block
14     else:
15         print('closing', conn)
16         sel.unregister(conn)
17         conn.close()
18 sock = socket.socket()
19 sock.bind(('localhost', 10000))
20 sock.listen(100)
21 sock.setblocking(False)
22 sel.register(sock, selectors.EVENT_READ, accept)
23 while True:
24     events = sel.select()
25     for key, mask in events:
26         callback = key.data
27         callback(key.fileobj, mask)

 

 
 
客戶端可使用
 1 #Author: Zachary
 2 import socket
 3 import sys
 4 messages = [ b'This is the message. ',
 5              b'It will be sent ',
 6              b'in parts.',
 7              ]
 8 server_address = ('localhost', 9999)
 9 # Create a TCP/IP socket
10 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
11           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
12           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
13           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
14           ]
15 # Connect the socket to the port where the server is listening
16 print('connecting to %s port %s' % server_address)
17 for s in socks:
18     s.connect(server_address)
19 for message in messages:
20     # Send messages on both sockets
21     for s in socks:
22         print('%s: sending "%s"' % (s.getsockname(), message) )
23         s.send(message)
24     # Read responses on both sockets
25     for s in socks:
26         data = s.recv(1024)
27         print( '%s: received "%s"' % (s.getsockname(), data) )
28         if not data:
29             print(sys.stderr, 'closing socket', s.getsockname() )
30  
相關文章
相關標籤/搜索