1、以客戶端的形式同HTTP服務交互html
(1)使用urllib.request模塊發送HTTP GET請求node
from urllib import request,parse url = 'http://httpbin.org/get' parms = { 'name1':'value1', 'name2':'value2', } querystring = parse.urlencode(parms) u = request.urlopen(url+'?'+querystring) resp = u.read()
(2)發送HTTP POST請求python
url = 'http://httpbin.org/post' u = request.urlopen(url, querystring.encode('ascii')) resp = u.read()
(3)使用自定義HTTP頭web
headers = { 'User-agent':'none/ofyourbusiness', 'Spam':'Eggs' } req = request.Request(url, querystring.encode('ascii'), headers=headers) u = request.urlopen(req) resp = u.read()
(4)使用request庫,以多種方式從請求中返回響應結果的內容apache
import requests resp = requests.post(url, data=parms, headers=headers) text = resp.text
resp.text 獲得以Unicode解碼的響應文本;resp.content 獲得原始的二進制數據;resp.json 獲得JSON格式的響應內容json
(5)request庫發起HEAD請求,提供HTTP頭數據windows
requests.head('http://www.python.org/index.html') status = resp.status_code last_modified = resp.headers['last-modifued'] content_type = resp.headers['content-type'] content_length = resp.headers['content-length']
(6)request庫將第一個請求獲得的 http cookies傳遞給下一個請求數組
resp1 = requests.get(url)
requests.get(url, cookies=resp1.cookies)
(7)request庫實現內容上傳服務器
files = {'file':('data.csv',open('data.csv','rb'))} requests.post(url, files=files)
(8)用底層http.client模塊實現HEAD請求cookie
from http.client import HTTPConnection c = HTTPConnection('www.python.org', 80) c.request('HEAD', '/index.html') resp = c.getresponse() print('Status',resp.status) for name, value in resp.getheaders(): print(name, value)
(9)http://httpbin.org 這個站點會接受發出的請求,而後以JSON的形式將響應信息回傳回來。
import requests r = requests.get('http://httpbin.org/get?name=Dave&n=37', headers={'User-agent':'goaway/1.0'}) resp = r.json() print(resp['headers']) {'Accept': '*/*', 'User-Agent': 'goaway/1.0', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org'} print(resp['args']) {'name': 'Dave', 'n': '37'}
2、建立TCP服務器
利用socketserver庫實現
from socketserver import BaseRequestHandler, TCPServer class EchoHandle(BaseRequestHandler, TCPServer): def handle(self): print(' Got connenction from ', self.client_address) while True: msg = self.request.recv(8192) if not msg: break self.request.send(msg) if __name__ == '__main__': serv = TCPServer(('',20000), EchoHandle) serv.serve_forever()
request屬性就表明着底層的客戶端socket,而client_address中包含了客戶端的地址。
客戶端測試代碼:
>>> from socket import socket, AF_INET, SOCK_STREAM >>> s = socket(AF_INET, SOCK_STREAM) >>> s.connect(('localhost',20000)) >>> s.send(b'HEllo') 5 >>> s.recv(8192) b'HEllo'
使用StreamRequestHandler做爲基類,給底層的socket加上了文件類型的接口:
from socketserver import StreamRequestHandler, TCPServer class EchoHandle(StreamRequestHandler, TCPServer): def handle(self): print(' Got connection from ', self.client_address) for line in self.rfile: self.wfile.write(line) if __name__ == '__main__': serv = TCPServer(('', 20000), EchoHandle) serv.serve_forever()
實例化ForkingTCPServer或者ThreadingTCPServer對象處理多個客戶端
from socketserver import ThreadingTCPServer if __name__ == '__main__': serv = ThreadingTCPServer(('', 20000), EchoHandle) serv.serve_forever()
建立一個預先分配好的工做者線程或進程池。
if __name__ == '__main__': from threading import Thread NWORKERS = 16 serv = TCPServer(('', 20000), EchoHandle) for n in range(NWORKERS): t = Thread(target=serv.serve_forever) t.daemon = True t.start() serv.serve_forever()
TCPServer在實例化時會綁定並激活底層的socket。提供bind_and_activate參數調整底層socket的行爲。
if __name__ == '__main__': serv = TCPServer(('',20000), EchoHandle, bind_and_activate=False) serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) serv.server_bind() serv.server_activate() serv.serve_forever()
設置容許服務器從新對以前使用過的端口號進行綁定。
if __name__ == '__main__': TCPServer.allow_reuse_address = True serv = TCPServer(('', 20000), EchoHandle) serv.serve_forever()
使用socket庫實現服務器
from socket import socket, AF_INET, SOCK_STREAM def echo_handler(address, client_sock): print(' Got connection from {}'.format(address)) while True: msg = socket.recv(8192) if not msg: break socket.send(msg) socket.close() def echo_server(address, backlog=5): sock = socket(AF_INET, SOCK_STREAM) sock.bind(address) sock.listen(backlog) while True: client_sock, client_addr = sock.accept() echo_handler(client_addr, client_sock) if __name__ == '__main__': echo_server(('',20000))
3、建立一個UDP服務器
UDP通訊底層不須要簡歷鏈接,所以UDP是不可靠鏈接。處理消息丟失要引入序列號、重傳、超時以及其餘的機制來確保傳輸的可靠性。
UDP經常使用在對可靠性傳輸要求不高的應用中,如多媒體流應用以及遊戲中經常使用到UDP,由於應用中不會倒退回去試圖重傳某個丟失的數據包。
利用socketserver庫建立出UDP服務器
from socketserver import BaseRequestHandler, UDPServer import time class TimeHandler(BaseRequestHandler, UDPServer): def handle(self): print(' GOt connection from ',self.client_address) msg, sock = self.request resp = time.ctime() sock.sendto(resp.encode('ascii'), self.client_address) if __name__ == '__main__': serv = UDPServer(('',20000), TimeHandler) serv.serve_forever()
測試客戶端:
from socket import socket, AF_INET, SOCK_DGRAM s = socket(AF_INET, SOCK_DGRAM) s.sendto(b'', ('localhost',20000)) s.recvfrom(8192)
在UDP通訊中應該使用sendto()和recvfrom()方法。
UDPServer類也是單線程的,能夠實例化ForkingUDPServer或者ThreadingUDPServer
直接經過socket來實現UDP服務器:
from socket import socket, AF_INET, SOCK_DGRAM import time def time_server(address): sock = socket(AF_INET, SOCK_DGRAM) sock.bind(address) while True: sock.recvfrom(8192) print(' Got message from ',address) resp = time.ctime() sock.sendto(resp.encode('ascii'), address) if __name__ == '__main__': time_server(('', 20000))
4、從CIDR地址中生成IP地址範圍
(1)利用ipaddress模塊
>>> import ipaddress >>> net = ipaddress.ip_network('123.45.67.64/27') >>> net IPv4Network('123.45.67.64/27') >>> for a in net: ... print(a) ... 123.45.67.64 123.45.67.65 ... 123.45.67.95
(2)支持數組那樣的索引操做
>>> net[0]
IPv4Address('123.45.67.64')
>>> net[-1]
IPv4Address('123.45.67.95')
(3)檢查成員歸屬
>>> a = ipaddress.ip_address('123.45.67.69')
>>> a in net
True
(4)ip地址加上網絡號能夠用來指定一個ip接口interface
>>> inet = ipaddress.ip_interface('123.45.67.73/27')
>>> inet.network
IPv4Network('123.45.67.64/27')
>>> inet.ip
IPv4Address('123.45.67.73')
▲ ipaddress模塊同其餘網絡相關的模塊socket庫交互是由侷限性,須要經過str()轉換爲字符串。
5、基於REST風格的簡單接口
根據WSGI規範,建立一個小型庫
import cgi def notfound_404(environ, start_response): start_response('404 Not Found',[('Content-type', 'text/plain')]) return [b'Not Found'] class PathDispatcher: def __init__(self): self.pathmap = {} def __call__(self, environ, start_response): path = environ['PATH_INFO'] params = cgi.FieldStorage(environ['wsgi.input'], environ=environ) method = environ['REQUEST_METHOD'].lower() environ['params'] = {key:params.getvalue(key) for key in params} handler = self.pathmap.get((method,path), notfound_404) return handler(environ, start_response) def register(self, method, path, function): self.pathmap[method.lower(), path] = function return function import time _hello_resp = ''' <html> <head> <title>Hello {name}</title> </head> <body> <h1>Hello {name}!</h1> </body> </html> ''' def hello_world(environ, start_response): start_response('200 OK', [ ('Content-type','text/html')]) params = environ['params'] resp = _hello_resp.format(name=params.get('name')) yield resp.encode('utf-8') if __name__ == '__main__': from wsgiref.simple_server import make_server dispatcher = PathDispatcher() dispatcher.register('GET', '/hello', hello_world) httpd = make_server('', 8080, dispatcher) print('Serving on port 8080 ....') httpd.serve_forever()
在WSGI中,應用程序是以一個接受以下調用形式的可調用對象來實現的:
import cgi def wsgi_app(environ, start_response): ...
參數environ是一個字典,其中須要包含的值參考了許多web服務器好比apache所提供的CGI接口啓發。
def wsgi_app(environ, start_response): method = environ['REQUEST_METHOD'] path = environ['PATH_INFO'] params = cgi.FieldStorage(environ['wsgi.input'], environ=environ) ...
environ['REQUEST_METHOD'],表示請求的類型(GET、POST、HEAD等)
environ['PATH_INFO'],所請求資源的路徑
調用cgi.FieldStorage()能夠從請求中提取出所提供的查詢參數,並將它們放入到一個相似於字典的對象中以供稍後使用。
參數start_response是個函數,必須調用它才能發起響應。start_response的第一個參數是HTTP結果狀態。第二個參數是一個元組序列,以(name,value)這樣的形式組成響應的HTTP頭。
def wsgi_app(environ, start_response): ... start_response('200 OK', [ ('Content-type','text/html')])
要返回數據,知足WSGI規範的應用程序必須返回字節串序列。
def wsgi_app(environ, start_response): ... start_response('200 OK', [ ('Content-type','text/html')]) resp = [] resp.append(b'Hello World\n') return resp
還可使用yield做爲替代方案:
def wsgi_app(environ, start_response): ... start_response('200 OK', [ ('Content-type','text/html')]) yield b'Hello World\n'
返回的結果必須使用字節串的形式。
儘管遵循WSGI規範的應用程序一般都被定義爲函數,就如咱們的示例那樣,可是類實例一樣也是可行的,只要它實現了合適的__cal__()方法便可。
class WSGIApplication: def __init__(self): ... def __call__(self, environ, start_response): ...
6、利用XML-RPC實現簡單的遠端過程調用
在遠端機器上運行的Python程序中執行函數或者方法
from xmlrpc.server import SimpleXMLRPCServer class KeyValueServer: _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys'] def __init__(self, address): self._data = {} self._serv = SimpleXMLRPCServer(address, allow_none=True) for name in self._rpc_methods_: self._serv.register_function(getattr(self, name)) def get(self, name): return self._data[name] def set(self, name, value): self._data[name] = value def delete(self, name): del self._data[name] def exists(self, name): return name in self._data def keys(self): return list(self._data) def serve_forever(self): self._serv.serve_forever() if __name__ == '__main__': kvserv = KeyValueServer(('', 15000)) kvserv.serve_forever()
客戶端遠程訪問服務器:
>>> from xmlrpc.client import ServerProxy >>> s = ServerProxy('http://localhost:15000',allow_none=True) >>> s.set('foo', 'bar') >>> s.set('spam', [1,2,3]) >>> s.keys() ['spam', 'foo'] >>> s.get('spam') [1, 2, 3] >>> s.get('foo') 'bar' >>> s.delete('spam') >>> s.exists('spam') False
配置一個簡單的遠端過程調用服務器,能夠用XML-RPC實現。
建立一個服務器實例,經過register_function()方法註冊處理函數,而後經過serve_forever()方法加載便可。
from xmlrpc.server import SimpleXMLRPCServer def add(x,y): return x+y serv = SimpleXMLRPCServer(('', 15000)) serv.register_function(add) serv.serve_forever()
經過XML-RPC傳遞一個實例
class Point: def __init__(self, x, y): self.x = x self.y = y >>> p = Point(2, 3) >>> s.set('foo', p) >>> s.get('foo') {'x':2, 'y':3}
對二進制數據的處理:
>>> s.set('foo', b'Hello World') >>> s.get('foo') <xmlrpc.client.Binary object at 0xxxxx> >>> _.data b'Hello World'
XML-RPC的缺點在於它的性能。SimpleXMLRPCServer是以單線程來實現的,因爲XML-RPC會將全部的數據序列化爲XML格式,所以就會比其餘的方法要慢一些。
7、在不一樣解釋間進行通訊
正在運行的多個Python解釋器實例,有可能仍是在不一樣的機器上。
使用multiprocessing.connection模塊。
from multiprocessing.connection import Listener import traceback def echo_client(conn): try: while True: msg = conn.recv() conn.send(msg) except EOFError: print('Connection closed') def echo_server(address, authkey): serv = Listener(address, authkey=authkey) while True: try: client = serv.accept() echo_client(client) except Exception: traceback.print_exc() echo_server(('', 25000), authkey=b'peekaboo')
客戶端鏈接到服務器上併發送各類消息:
>>> from multiprocessing.connection import Client >>> c = Client(('localhost', 25000), authkey=b'peekaboo') >>> c.send('hello') >>> c.recv() 'hello' >>> c.send(42) >>> c.recv() 42 >>> c.send([1,2,3,4,5]) >>> c.recv() [1, 2, 3, 4, 5]
這裏全部的消息都是完整的,對象都是經過pickle來進行序列化的。
所以,任何同pickle兼容的對象均可以再鏈接之間傳遞和接收。
若是,知道解釋器運行在同一臺機器上,那麼能夠利用網絡做爲替代方案,好比UNIX域socket或者windows上的命名管道。
要經過UNIX域socket建立鏈接,只要簡單地將地址改成文件名便可。
s = Listener('/tmp/myconn', authkey=b'peekboo') s = Listener(r'\\.\pipe\myconn', authkey=b'peekboo')
multiprocessing.connection模塊最好適用於能長時間運行的鏈接,而不是大量的短鏈接。
8、實現遠端過程調用
在Socket、mutiprocessing.connection或者ZeroMQ這樣的消息傳遞層之上實現簡單的遠端過程調用(RPC)
import pickle class RPCHandle: def __init__(self): self._function = {} def register_functions(self, func): self._function[func.__name__] = func def handle_connection(self, connection): try: func_name, args, kwargs = pickle.loads(connection.recv()) try: result = self._function[func_name](*args,**kwargs) connection.send(pickle.dumps(result)) except Exception as e: connection.send(pickle.dumps(e)) except EOFError: pass from multiprocessing.connection import Listener from threading import Thread def rpc_server(handler, address, authkey): sock = Listener(address, authkey=authkey) while True: client= sock.accept() t = Thread(target=handler.handle_connection, args=(client,)) t.daemon = True t.start() def add(x, y): return x + y def sub(x, y): return x - y handler = RPCHandle() handler.register_functions(add) handler.register_functions(sub) rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')
要在遠端的客戶端中訪問這個服務器須要建立一個相應的RPC代理類來轉發請求。
from multiprocessing.connection import Listener import pickle class RPCProxy: def __init__(self, connection): self._connection = connection def __getattr__(self, name): def do_rpc(*args,**kwargs): self._connection.send(pickle.dumps((name,args,kwargs))) result = pickle.loads(self._connection.recv()) if isinstance(result, Exception): raise result return result return do_rpc
客戶端測試:
>>> from multiprocessing.connection import Client >>> c = Client(('localhost', 17000), authkey=b'peekaboo') >>> proxy = RPCProxy(c) >>> proxy.add(2,3) 5
客戶端想要調用一個遠端函數,好比foo(1,2,z=3),代理類將建立出一個包含了函數名和參數的元組('foo',(1,2),{'z':3})。這個元組經pickle序列化處理後經過鏈接發送出去。
服務器端接收到消息後執行反序列化處理,而後檢查函數名是否已經註冊過了。若是是註冊過的函數,就用給定的參數調用該函數。
把獲得的結果(或者異常)進行pickle序列化處理而後再發送回去。
9、以簡單的方式驗證客戶端身份
利用hmac模塊實現一個我收鏈接來達到簡單且搞笑的身份驗證目的
import hmac import os def client_authenticate(connection, secret_key): message = connection.recv(32) hash = hmac.new(secret_key, message) digest = hash.digest() connection.send(digest) def server_authenticate(connection, secret_key): message = os.urandom(32) connection.send(message) hash = hmac.new(secret_key, message) digest = hash.digest() response = connection.recv(len(digest)) return hmac.compare_digest(digest,response)
在發起鏈接時,服務器將一段由隨機字節組成的消息發送給客戶端。客戶端和服務器經過hmac模塊以及雙方事先都知道的密鑰計算出隨機數據的加密hash。
客戶端發送它計算出的摘要值(digest)給服務器,而服務器對摘要值進行比較,以此來決定是要接受仍是拒絕這個鏈接。
對摘要值進行比較須要使用hmac.compare_digest()函數。
合併到socket服務器中使用
import hmac import os def client_authenticate(connection, secret_key): message = connection.recv(32) hash = hmac.new(secret_key, message) digest = hash.digest() connection.send(digest) def server_authenticate(connection, secret_key): message = os.urandom(32) connection.send(message) hash = hmac.new(secret_key, message) digest = hash.digest() response = connection.recv(len(digest)) return hmac.compare_digest(digest,response) from socket import socket, AF_INET, SOCK_STREAM authkey = b'peekaboo' def echo_handler(client_sock): print(client_sock) if not server_authenticate(client_sock, authkey): client_sock.close() return while True: msg = client_sock.recv(8192) if not msg: break client_sock.sendall(msg) def echo_server(address): s = socket(AF_INET, SOCK_STREAM) s.bind(address) s.listen(5) while True: c,a = s.accept() echo_handler(c) echo_server(('', 18000))
客戶端測試代碼:
from socket import socket, AF_INET, SOCK_STREAM secret_key = b'peekaboo' s = socket(AF_INET, SOCK_STREAM) s.connect(('localhost',18000)) client_authenticate(s, secret_key) s.send(b'Hello World') resp=s.recv(1024) print(resp)
在內部消息系統以及進程間通訊中經常會用到hmac來驗證身份。如實現跨集羣的多進程間通訊,確保只有得到許可的進程才能互相通訊。
在multiprocessing庫中,當同子進程創建通訊時在內部也是使用基於hmac的身份驗證方式。
在通過驗證的鏈接上後續的通訊都是以明文發送的。
10、爲網絡服務增長SSL支持
經過socket實現一個網絡服務,要求服務器端和客戶端能夠經過SSL實現身份驗證,而且對傳輸的數據進行加密。
ssl模塊爲底層的socket鏈接添加對SSL的支持。ssl.wrap_socket()函數可接受一個已有的socket,併爲其包裝一個SSL層。
from socket import socket, AF_INET, SOCK_STREAM import ssl KEYFILE = 'server_key.pem' # private key of the server CERTFILE = 'server_cert.pem' # server certificate (given to client) def echo_client(s): while True: msg = s.recv(8192) if not msg: break s.sendall(msg) s.close() print('Connection closed ///') def echo_server(address): s = socket(AF_INET, SOCK_STREAM) s.bind(address) s.listen(1) s_ssl = ssl.wrap_socket(s, keyfile=KEYFILE, certfile=CERTFILE, server_side=True) while True: try: c,a = s_ssl.accept() print('Got connection', c, a) echo_client(c) except Exception as e: print('{} {}'.format(e.__class__.__name__, e)) echo_server(('', 17899))
客戶端鏈接到服務器:客戶端要求服務器初始本身的整數並完成驗證。
>>> from socket import socket, AF_INET, SOCK_STREAM >>> import ssl >>> s = socket(AF_INET, SOCK_STREAM) >>> s_ssl = ssl.wrap_socket(s,cert_reqs=ssl.CERT_REQUIRED,ca_certs = 'server_cert.pem') >>> s_ssl.connect(('localhost', 17899)) >>> s_ssl.send(b'Hello World !!!!') 16 >>> s_ssl.recv(8192) b'Hello World !!!!'
底層socket技巧帶來的問題在於,沒法和已經經過標準庫實現的網絡服務很好的結合在一塊兒。
大部分的服務器端代碼是基於socketserver 。
對服務器端能夠經過混入類來添加對SSL的支持。
import ssl class SSLMixin: ''' Mixin class that adds support for SSL to existing servers based on the socketserver module. ''' def __init__(self, *args, keyfile=None, certfile=None, ca_certs=None, cert_reqs=ssl.CERT_NONE, **kwargs): self._keyfile = keyfile self._certfile = certfile self._ca_certs = ca_certs self._cert_reqs = cert_reqs super().__init__(*args, **kwargs) def get_request(self): client, addr = super().get_request() client_ssl = ssl.wrap_socket(client, keyfile = self._keyfile, certfile = self._certfile, ca_certs = self._ca_certs, cert_reqs = self._cert_reqs, server_side = True) return client_ssl, addr from xmlrpc.server import SimpleXMLRPCServer class SSLSimpleXMLRPCServer(SSLMixin, SimpleXMLRPCServer): pass
class KeyValueServer: _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys'] def __init__(self, *args, **kwargs): self._data = {} self._serv = SSLSimpleXMLRPCServer(*args, allow_none=True, **kwargs) for name in self._rpc_methods_: self._serv.register_function(getattr(self, name)) def get(self, name): return self._data[name] def set(self, name, value): self._data[name] = value def delete(self, name): del self._data[name] def exists(self, name): return name in self._data def keys(self): return list(self._data) def serve_forever(self): self._serv.serve_forever() if __name__ == '__main__': KEYFILE='server_key.pem' # Private key of the server CERTFILE='server_cert.pem' # Server certificate kvserv = KeyValueServer(('', 15001), keyfile=KEYFILE, certfile=CERTFILE) kvserv.serve_forever()
要使用這個服務器端,利用xmlrpc.client模塊來完成鏈接。指定https:便可
>>> import ssl >>> context = ssl._create_unverified_context() >>> s = ServerProxy('https://localhost:15001', allow_none=True, context=context) >>> s.set('foo','bar') >>> s.get('foo') 'bar'
SSL客戶端中問題在於如何執行額外的步驟來驗證服務器證書,或者向服務器展現客戶端的憑證。
(1)客戶端驗證 服務器端發來的證書:
from xmlrpc.client import SafeTransport, ServerProxy import ssl class VerifyCertSafeTransport(SafeTransport): def __init__(self, cafile, certfile=None, keyfile=None): SafeTransport.__init__(self) self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) self._ssl_context.load_verify_locations(cafile) if certfile: self._ssl_context.load_cert_chain(certfile, keyfile) self._ssl_context.verify_mode = ssl.CERT_REQUIRED def make_connection(self, host): # Items in the passed dictionary are passed as keyword # arguments to the http.client.HTTPSConnection() constructor. # The context argument allows an ssl.SSLContext instance to # be passed with information about the SSL configuration s = super().make_connection((host, {'context': self._ssl_context})) return s # Create the client proxy s = ServerProxy('https://localhost:15000', transport=VerifyCertSafeTransport('server_cert.pem'), allow_none=True)
(2)服務器端驗證客戶端證書:
if __name__ == '__main__': KEYFILE='server_key.pem' # Private key of the server CERTFILE='server_cert.pem' # Server certificate CA_CERTS='client_cert.pem' # Certificates of accepted clients kvserv = KeyValueServer(('', 15000), keyfile=KEYFILE, certfile=CERTFILE, ca_certs=CA_CERTS, cert_reqs=ssl.CERT_REQUIRED, ) kvserv.serve_forever()
客戶端發出證書
# Create the client proxy s = ServerProxy('https://localhost:15000', transport=VerifyCertSafeTransport('server_cert.pem', 'client_cert.pem', 'client_key.pem'), allow_none=True)
須要先建立自簽名的證書:
openssl req -new -x509 -days 365 -nodes -out server_cert.pem -keyout server_key.pem
11、在進程間傳遞socket文件描述符
在進程間傳遞文件描述符,首選須要將進程鏈接在一塊兒。
一旦進程間的鏈接創建起來了,就可使用multiprocessing.reduction 模塊中的send_handle()和recv_handle()函數來在進程之間傳送文件描述符了。
import multiprocessing from multiprocessing.reduction import recv_handle,send_handle import socket def worker(in_p,out_p): out_p.close() fd = recv_handle(in_p) print(' ChILD GOT FD',fd) with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as s: while True: msg = s.recv(1024) if not msg: break s.send(msg) def server(address, in_p, out_p, worker_pid): in_p.close() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) s.bind(address) s.listen(1) while True: client, address = s.accept() send_handle(out_p, client.fileno(), worker_pid) client.close() if __name__ == '__main__': c1, c2=multiprocessing.Pipe() worker_p = multiprocessing.Process(target=worker,args=(c1,c2)) worker_p.start() server_p = multiprocessing.Process(target=server,args=(('',20000), c1, c2, worker_p.pid)) server_p.start() c1.close() c2.close()
示例中生成了兩個進程,而且利用multiprocessing模塊的Pipe對象將它們鏈接在一塊兒。
服務器進程打開一個socket並等待客戶端的鏈接。
工做者進程只是經過recv_handle()在管道上等待接受文件描述符。
當服務器接收到一條鏈接時,會將獲得的socket文件描述符經過send_handle()發送給工做者進程。
工做者進程接管這個socket並將數據回顯給客戶端直到鏈接關閉爲止。
▲ 服務器端接收到的客戶端socket其實是有另外一個進程去處理的。服務器僅僅只是將它轉手出去,關閉它而後等待下一個鏈接。
將服務器和工做者進程實現爲徹底的分離:
# servermp.py from multiprocessing.connection import Listener from multiprocessing.reduction import send_handle import socket def server(work_address, port): # Wait for the worker to connect work_serv = Listener(work_address, authkey=b'peekaboo') worker = work_serv.accept() worker_pid = worker.recv() # Now run a TCP/IP server and send clients to worker s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) s.bind(('', port)) s.listen(1) while True: client, addr = s.accept() print('SERVER: Got connection from', addr) send_handle(worker, client.fileno(), worker_pid) client.close() if __name__ == '__main__': import sys if len(sys.argv) != 3: print('Usage: server.py server_address port', file=sys.stderr) raise SystemExit(1) server(sys.argv[1], int(sys.argv[2]))
運行這個服務器,只須要執行 python3 servermp.py /tmp/servconn 15000 ,下面是相應的工做者代碼:
# workermp.py from multiprocessing.connection import Client from multiprocessing.reduction import recv_handle import os from socket import socket, AF_INET, SOCK_STREAM def worker(server_address): serv = Client(server_address, authkey=b'peekaboo') serv.send(os.getpid()) while True: fd = recv_handle(serv) print('WORKER: GOT FD', fd) with socket(AF_INET, SOCK_STREAM, fileno=fd) as client: while True: msg = client.recv(1024) if not msg: break print('WORKER: RECV {!r}'.format(msg)) client.send(msg) if __name__ == '__main__': import sys if len(sys.argv) != 2: print('Usage: worker.py server_address', file=sys.stderr) raise SystemExit(1) worker(sys.argv[1])
要運行工做者,執行執行命令 python3 workermp.py /tmp/servconn . 效果跟使用Pipe()例子是徹底同樣的。
文件描述符的傳遞會涉及到UNIX域套接字的建立和套接字的 sendmsg()
方法。 不過這種技術並不常見,下面是使用套接字來傳遞描述符的另一種實現:
# server.py import socket import struct def send_fd(sock, fd): ''' Send a single file descriptor. ''' sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('i', fd))]) ack = sock.recv(2) assert ack == b'OK' def server(work_address, port): # Wait for the worker to connect work_serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) work_serv.bind(work_address) work_serv.listen(1) worker, addr = work_serv.accept() # Now run a TCP/IP server and send clients to worker s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) s.bind(('',port)) s.listen(1) while True: client, addr = s.accept() print('SERVER: Got connection from', addr) send_fd(worker, client.fileno()) client.close() if __name__ == '__main__': import sys if len(sys.argv) != 3: print('Usage: server.py server_address port', file=sys.stderr) raise SystemExit(1) server(sys.argv[1], int(sys.argv[2]))
下面是使用套接字的工做者實現:
# worker.py import socket import struct def recv_fd(sock): ''' Receive a single file descriptor ''' msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(struct.calcsize('i'))) cmsg_level, cmsg_type, cmsg_data = ancdata[0] assert cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS sock.sendall(b'OK') return struct.unpack('i', cmsg_data)[0] def worker(server_address): serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) serv.connect(server_address) while True: fd = recv_fd(serv) print('WORKER: GOT FD', fd) with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as client: while True: msg = client.recv(1024) if not msg: break print('WORKER: RECV {!r}'.format(msg)) client.send(msg) if __name__ == '__main__': import sys if len(sys.argv) != 2: print('Usage: worker.py server_address', file=sys.stderr) raise SystemExit(1) worker(sys.argv[1])
12、事件驅動型I/O
事件驅動型I/O是一種將基本的I/O操做(讀和寫)轉換成事件的技術,而咱們必須在程序中去處理這種事件。
如:當在socket上接受到數據時,這就稱爲一個接收事件,由咱們提供的回調方法或者函數負責處理以此來響應這個事件。
一個事件驅動型框架可能會以一個基類做爲其實點,實現一系列基本的事件處理方法。
class EventHandler: def fileno(self): 'Return the associated file descriptor' raise NotImplemented('must implement') def wants_to_receive(self): 'Return True if receiving is allowed' return False def handle_receive(self): 'Perform the receive operation' pass def wants_to_send(self): 'Return True if sending is requested' return False def handle_send(self): 'Send outgoing data' pass
把這個類的實例插入到一個事件循環中。
import select def event_loop(handlers): while True: wants_recv = [h for h in handlers if h.wants_to_receive()] wants_send = [h for h in handlers if h.wants_to_send()] can_recv, can_send, _ = select.select(wants_recv, wants_send, []) for h in can_recv: h.handle_receive() for h in can_send: h.handle_send()
事件循環的核心在於select()調用,它會輪詢文件描述符檢查他們是否處於活躍狀態。
在調用select()以前,事件循環會簡單地查詢全部的處理方法,看它們是但願接收仍是發送數據。而後把查詢的結果以列表的方式提供給select()
結果就是,select()會返回已經在接受或發送事件上就緒的對象列表。對應的handle_receive()或者handle_send()方法就會被觸發執行。
(1)基於UDP的網絡服務:
import socket import time class UDPServer(EventHandler): def __init__(self, address): self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.bind(address) def fileno(self): return self.sock.fileno() def wants_to_receive(self): return True class UDPTimeServer(UDPServer): def handle_receive(self): msg, addr = self.sock.recvfrom(1) self.sock.sendto(time.ctime().encode('ascii'), addr) class UDPEchoServer(UDPServer): def handle_receive(self): msg, addr = self.sock.recvfrom(8192) self.sock.sendto(msg, addr) if __name__ == '__main__': handlers = [ UDPTimeServer(('',14000)), UDPEchoServer(('',15000)) ] event_loop(handlers)
測試代碼:
>>> from socket import * >>> s = socket(AF_INET, SOCK_DGRAM) >>> s.sendto(b'',('localhost',14000)) 0 >>> s.recvfrom(128) (b'Tue Sep 18 14:29:23 2012', ('127.0.0.1', 14000)) >>> s.sendto(b'Hello',('localhost',15000)) 5 >>> s.recvfrom(128) (b'Hello', ('127.0.0.1', 15000))
(2)基於TCP服務器:
class TCPServer(EventHandler): def __init__(self, address, client_handler, handler_list): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) self.sock.bind(address) self.sock.listen(1) self.client_handler = client_handler self.handler_list = handler_list def fileno(self): return self.sock.fileno() def wants_to_receive(self): return True def handle_receive(self): client, addr = self.sock.accept() # Add the client to the event loop's handler list self.handler_list.append(self.client_handler(client, self.handler_list)) class TCPClient(EventHandler): def __init__(self, sock, handler_list): self.sock = sock self.handler_list = handler_list self.outgoing = bytearray() def fileno(self): return self.sock.fileno() def close(self): self.sock.close() # Remove myself from the event loop's handler list self.handler_list.remove(self) def wants_to_send(self): return True if self.outgoing else False def handle_send(self): nsent = self.sock.send(self.outgoing) self.outgoing = self.outgoing[nsent:] class TCPEchoClient(TCPClient): def wants_to_receive(self): return True def handle_receive(self): data = self.sock.recv(8192) if not data: self.close() else: self.outgoing.extend(data) if __name__ == '__main__': handlers = [] handlers.append(TCPServer(('',16000), TCPEchoClient, handlers)) event_loop(handlers)
TCP例子的關鍵點是從處理器中列表增長和刪除客戶端的操做。
對每個鏈接,一個新的處理器被建立並加到列表中。當鏈接被關閉後,每一個客戶端負責將其從列表中刪除。
事件驅動型I/O的核心部分都是有一個循環來輪詢socket的活躍性並執行響應操做。
事件驅動型I/O優點在於:不使用線程和進程的條件下同時處理大量的鏈接。select()調用能夠用來監視成百上千個socket,而且針對他們中間發生的事件作出響應。
缺點在於沒有涉及真正的併發,若是任何一個事件處理方法阻塞了或者執行了一個耗時較長的計算,那麼會阻塞整個程序的執行進程。
對於阻塞型或者須要長時間運行的計算,能夠經過將任務發送給單獨的線程或者進程來解決。
經過concurrent.futures模塊來實現:
from concurrent.futures import ThreadPoolExecutor import os class ThreadPoolHandler(EventHandler): def __init__(self, nworkers): if os.name == 'posix': self.signal_done_sock, self.done_sock = socket.socketpair() else: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('127.0.0.1', 0)) server.listen(1) self.signal_done_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.signal_done_sock.connect(server.getsockname()) self.done_sock, _ = server.accept() server.close() self.pending = [] self.pool = ThreadPoolExecutor(nworkers) def fileno(self): return self.done_sock.fileno() # Callback that executes when the thread is done def _complete(self, callback, r): self.pending.append((callback, r.result())) self.signal_done_sock.send(b'x') # Run a function in a thread pool def run(self, func, args=(), kwargs={},*,callback): r = self.pool.submit(func, *args, **kwargs) r.add_done_callback(lambda r: self._complete(callback, r)) def wants_to_receive(self): return True # Run callback functions of completed work def handle_receive(self): # Invoke all pending callback functions for callback, result in self.pending: callback(result) self.done_sock.recv(1) self.pending = []
在代碼中,run()
方法被用來將工做提交給回調函數池,處理完成後被激發。
實際工做被提交給 ThreadPoolExecutor
實例。 不過一個難點是協調計算結果和事件循環,爲了解決它,咱們建立了一對socket並將其做爲某種信號量機制來使用。
當線程池完成工做後,它會執行類中的 _complete()
方法。
這個方法再某個socket上寫入字節以前會講掛起的回調函數和結果放入隊列中。 fileno()
方法返回另外的那個socket。
所以,這個字節被寫入時,它會通知事件循環, 而後 handle_receive()
方法被激活併爲全部以前提交的工做執行回調函數。
坦白講,說了這麼多連我本身都暈了。 下面是一個簡單的服務器,演示瞭如何使用線程池來實現耗時的計算:
# A really bad Fibonacci implementation def fib(n): if n < 2: return 1 else: return fib(n - 1) + fib(n - 2) class UDPFibServer(UDPServer): def handle_receive(self): msg, addr = self.sock.recvfrom(128) n = int(msg) pool.run(fib, (n,), callback=lambda r: self.respond(r, addr)) def respond(self, result, addr): self.sock.sendto(str(result).encode('ascii'), addr) if __name__ == '__main__': pool = ThreadPoolHandler(16) handlers = [ pool, UDPFibServer(('',16000))] event_loop(handlers)
運行這個服務器,而後試着用其它Python程序來測試它:
from socket import * sock = socket(AF_INET, SOCK_DGRAM) for x in range(40): sock.sendto(str(x).encode('ascii'), ('localhost', 16000)) resp = sock.recvfrom(8192) print(resp[0])
你應該能在不一樣窗口中重複的執行這個程序,而且不會影響到其餘程序,儘管當數字便愈來愈大時候它會變得愈來愈慢。
十3、發送和接收大型數組
經過網絡鏈接發送和接受連續數據的大型數組,並儘可能減小數據的複製操做。
利用 memoryviews
來發送和接受大數組:
def send_from(arr, dest): view = memoryview(arr).cast('B') while len(view): nsent = dest.send(view) view = view[nsent:] def recv_into(arr, source): view = memoryview(arr).cast('B') while len(view): nrecv = source.recv_into(view) view = view[nrecv:]
爲了測試程序,首先建立一個經過socket鏈接的服務器和客戶端程序:
>>> from socket import * >>> s = socket(AF_INET, SOCK_STREAM) >>> s.bind(('', 25000)) >>> s.listen(1) >>> c,a = s.accept() >>>
在客戶端(另一個解釋器中):
>>> from socket import * >>> c = socket(AF_INET, SOCK_STREAM) >>> c.connect(('localhost', 25000))
本節的目標是你能經過鏈接傳輸一個超大數組。這種狀況的話,能夠經過 array
模塊或 numpy
模塊來建立數組:
# Server >>> import numpy >>> a = numpy.arange(0.0, 50000000.0) >>> send_from(a, c) >>> # Client >>> import numpy >>> a = numpy.zeros(shape=50000000, dtype=float) >>> a[0:10] array([ 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]) >>> recv_into(a, c) >>> a[0:10] array([ 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]) >>>
在數據密集型分佈式計算和平行計算程序中,本身寫程序來實現發送/接受大量數據並不常見。
不過,要是你確實想這樣作,你可能須要將你的數據轉換成原始字節,以便給低層的網絡函數使用。
你可能還須要將數據切割成多個塊,由於大部分和網絡相關的函數並不能一次性發送或接受超大數據塊。
一種方法是使用某種機制序列化數據——可能將其轉換成一個字節字符串。
不過,這樣最終會建立數據的一個複製。 就算你只是零碎的作這些,你的代碼最終仍是會有大量的小型複製操做。
經過使用內存視圖展現了一些魔法操做。
本質上,一個內存視圖就是一個已存在數組的覆蓋層。
不只僅是那樣, 內存視圖還能以不一樣的方式轉換成不一樣類型來表現數據。 這個就是下面這個語句的目的:
>>> view = memoryview(arr).cast('B')
它接受一個數組 arr並將其轉換爲一個無符號字節的內存視圖。
這個視圖能被傳遞給socket相關函數, 好比 socket.send()
或 send.recv_into()
。
在內部,這些方法可以直接操做這個內存區域。例如,sock.send()
直接從內存中發生數據而不須要複製。 send.recv_into()
使用這個內存區域做爲接受操做的輸入緩衝區。
剩下的一個難點就是socket函數可能只操做部分數據。
一般來說,咱們得使用不少不一樣的 send()
和 recv_into()
來傳輸整個數組。
不用擔憂,每次操做後,視圖會經過發送或接受字節數量被切割成新的視圖。 新的視圖一樣也是內存覆蓋層。所以,仍是沒有任何的複製操做。
這裏有個問題就是接受者必須事先知道有多少數據要被髮送, 以便它能預分配一個數組或者確保它能將接受的數據放入一個已經存在的數組中。
若是沒辦法知道的話,發送者就得先將數據大小發送過來,而後再發送實際的數組數據。