萬物互聯之~網絡編程深刻篇

 

深刻篇

上節回顧:5種IO模型 | IO多路複用 and 萬物互聯之~網絡編程增強篇php

官方文檔:https://docs.python.org/3/library/internet.htmlhtml

總結篇:【總結篇】一文搞定網絡編程python

1.概念回顧

1.1.TCP三次握手

畫一張圖來通俗化講講TCP三次握手: 1.通俗化講git

用代碼來講,大概過程就是: 1.tcp3次握手.pnggithub

1.2.TCP四次揮手

畫圖通俗講下TCP四次揮手: 2.tcp4次揮手.pngweb

用代碼來講,大概過程就是: 2.tcp4次揮手2.png數據庫

其實這個也很好的解釋了以前的端口占用問題,若是是服務端先斷開鏈接,那麼服務器就是四次揮手的發送方,最後一次消息是得不到回覆的,端口就會保留一段時間(服務端的端口固定)也就會出現端口占用的狀況。若是是客戶端先斷開,那下次鏈接會自動換個端口,不影響(客戶端的端口是隨機分配的)編程

PS:以前咱們講端口就是send一個空消息,不少人不是很清楚,這邊簡單驗證下就懂了: 2.測試.gifjson

1.3.HTTP

以前其實已經寫了個簡版的Web服務器了,簡單回顧下流程:flask

  1. 輸入要訪問的網址,在回車的那一瞬間瀏覽器和服務器創建了TCP三次握手
  2. 而後瀏覽器send一個http的請求報文,服務器接recv以後進行相應的處理並返回對應的頁面
  3. 瀏覽器關閉頁面時(client close),進行了TCP四次揮手

而後簡單說下HTTP狀態碼

  1. 20x系列:服務器正常響應
  2. 30x系列:重定向
    • 301:表明永久重定向,瀏覽器下次訪問這個頁面就直接去目的url了(不推薦)
    • 302:臨時重定向,項目升級以後Url常常變,這個302常常用
      • eg:訪問baidu.com =302=> www.baidu.com
    • 304:這個是重定向到本地緩存(以前NodeJS說過就不詳細說了)
      • 服務器文件沒更新,瀏覽器就直接訪問本地緩存了
  3. 40x系列:通常都是客戶端請求有問題
    • eg: 404 not found
  4. 50x系列:通常都是服務端出問題了
    • eg:500 Server Error

2.動態服務器(WSGI

2.1.簡化版動態服務器

咱們先本身定義一個動態服務器:

import re
import socket

class HTTPServer(object):
    def __init__(self):
        with socket.socket() as tcp_server:
            tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            tcp_server.bind(('', 8080))
            tcp_server.listen()
            while True:
                self.client_socket, self.client_address = tcp_server.accept()
                self.handle()

    def response(self, status, body=None):
        print(status)
        header = f"HTTP/1.1 {status}\r\n\r\n"
        with self.client_socket:
            self.client_socket.send(header.encode("utf-8"))
            if body:
                self.client_socket.send(body)

    def __static_handler(self, name):
        try:
            with open(f"./www{name}", "rb") as fs:
                return fs.read()
        except Exception as ex:
            print(ex)
            return None

    def __dynamic_handler(self, name):
        try:
            m = __import__(name)
            return m.application().encode("utf-8")
        except Exception as ex:
            print(ex)
            return None

    def handle(self):
        with self.client_socket:
            print(f"[來自{self.client_address}的消息:]\n")
            data = self.client_socket.recv(2048)
            if data:
                header, _ = data.decode("utf-8").split("\r\n", 1)
                print(header)
                # GET /xxx HTTP/1.1
                ret = re.match("^\w+? (/[^ ]*) .+$", header)
                if ret:
                    url = ret.groups(1)[0]
                    # Python三元表達式(以前好像忘說了)
                    url = "/index.html" if url == "/" else url
                    print("請求url:", url)
                    body = str()
                    # 動態頁面
                    if ".py" in url:
                        # 提取模塊名(把開頭的/和.py排除)
                        body = self.__dynamic_handler(url[1:-3])
                    else:  # 靜態服務器
                        body = self.__static_handler(url)
                    # 根據返回的body內容,返回對應的響應碼
                    if body:
                        self.response("200 ok", body)
                    else:
                        self.response("404 Not Found")
                else:  # 匹配不到url(基本上不會發生,不排除惡意修改)
                    self.response((404, "404 Not Found"))

if __name__ == "__main__":
    import sys
    # 防止 __import__ 導入模塊的時候找不到,忘了能夠查看:
    # https://www.cnblogs.com/dotnetcrazy/p/9253087.html#5.本身添加模塊路徑
    sys.path.insert(1, "./www/bin")
    HTTPServer()

效果: 3.動態服務器.gif

代碼不難其中有個技術點說下:模塊名爲字符串怎麼導入

# test.py
# 若是模塊名是字符串,須要使用__import__
s = "time"
time = __import__(s)

def application():
    return time.ctime() # 返回字符串

if __name__ == "__main__":
    time_str = application()
    print(type(time_str))
    print(time_str)

輸出:

<class 'str'>
Thu Dec 20 22:48:07 2018

2.2.路由版動態服務器

和上面基本同樣,多了個路由表(self.router_urls)而已

import re
import socket

class HttpServer(object):
    def __init__(self):
        # 路由表
        self.router_urls = {"/test": "/test.py", "/user": "/test2.py"}

    def run(self):
        with socket.socket() as server:
            # 端口複用
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(("", 8080))
            server.listen()
            while True:
                self.client_socket, self.client_address = server.accept()
                print(f"[{self.client_address}已上線]")
                self.handler()

    def response(self, status, body=None):
        with self.client_socket as socket:
            header = f"HTTP/1.1 {status}\r\n\r\n"
            socket.send(header.encode("utf-8"))
            if body:
                socket.send(body)

    def __static_handler(self, name):
        try:
            with open(f"./www{name}", "rb") as fs:
                return fs.read()
        except Exception as ex:
            print(ex)
            return None

    def __dynamic_handler(self, name):
        try:
            m = __import__(name)
            return m.application().encode("utf-8")
        except Exception as ex:
            print(ex)
            return None

    def handler(self):
        data = self.client_socket.recv(2048)
        if data:
            header, _ = data.decode("utf-8").split("\r\n", 1)
            # GET /xxx HTTP/1.1
            ret = re.match("^\w+? (/[^ ]*) .+$", header)
            if ret:
                url = ret.group(1)
                print(url)  # print url log
                body = None
                # 路由有記錄:動態頁面
                if url in self.router_urls.keys():
                    url = self.router_urls[url]
                    # 切片提取模塊名
                    body = self.__dynamic_handler(url[1:-3])
                else:  # 靜態服務器
                    if url == "/":
                        url = "/index.html"
                    body = self.__static_handler(url)
                # 沒有這個頁面或者出錯
                if body:
                    self.response("200 ok", body)
                else:
                    self.response("404 Not Found")
            else:
                # 404
                self.response("404 Not Found")
        else:
            print(f"{self.client_address}已下線")
            self.client_socket.close()

if __name__ == "__main__":
    import sys
    # 臨時添加模塊所在路徑
    sys.path.insert(1, "./www/bin")
    HttpServer().run()

輸出: 4.含路由的動態服務器.png

看一眼test2.py

# test2.py
def application():
    return "My Name Is XiaoMing"

if __name__ == "__main__":
    print(application())

2.3.官方接口版

官方文檔:https://docs.python.org/3/library/wsgiref.html

其實Python官方提供了一個WSGI:Web Server Gateway Interface的約定:

它只要求Web開發者實現一個application函數,就能夠響應HTTP請求

2.3.1演示

eg:(只要對應的python文件提供了 application(env,start_response) 方法就好了)

# hello.py
# env 是一個字典類型
def application(env, start_response):
    # 設置動態頁面的響應頭(回頭服務器會再加上本身的響應頭)
    # 列表裏面的 item 是 tuple
    start_response("200 OK", [("Content-Type", "text/html")])
    # 返回一個列表
    return ["<h1>This is Test!</h1>".encode("utf-8")]

先使用官方的簡單服務器看看:

from wsgiref.simple_server import make_server
# 導入咱們本身編寫的application函數:
from hello import application

# 建立一個服務器,端口是8080,處理函數是application:
httpd = make_server('', 8080, application)
print('Serving HTTP on port 8080...')
# 開始監聽HTTP請求:
httpd.serve_forever()

運行後效果:127.0.0.1:8080

This is Test!

若是把hello.py改爲下面代碼(服務端不變),那麼就能夠獲取一些請求信息了:

def application(env, start_response):
    print(env["PATH_INFO"])
    start_response("200 OK", [("Content-Type", "text/html")])
    return [f'<h1>Hello, {env["PATH_INFO"][1:] or "web"}!</h1>'.encode("utf-8")]

輸出: 5.env.png

2.3.2說明

上面的application()函數就是符合WSGI標準的一個HTTP處理函數,它接收兩個參數:

  1. environ:一個包含全部HTTP請求信息的dict對象;
  2. start_response:一個發送HTTP響應的函數(調用服務器定義的方法)
    • Header只能發送一次 ==> 只能調用一次start_response()函數

有了WSGI,咱們關心的就是如何從env這個dict對象拿到HTTP請求信息,而後構造HTML,經過start_response()發送Header,最後返回Body內容

Python內置了一個WSGI服務器,這個模塊叫wsgiref,它是用純Python編寫的WSGI服務器的參考實現(徹底符合WSGI標準,可是不考慮任何運行效率,僅供開發和測試使用)

PS:這樣的好處就是,只要符合WSGI規範的服務器,咱們均可以直接使用了

其實經過源碼就能夠知道這個WSGIServer究竟是何方神聖了:

class WSGIServer(HTTPServer):
    pass

# HTTPServer其實就是基於TCPServer
class HTTPServer(socketserver.TCPServer):
    pass

# 這個就是咱們開頭說的Python封裝的簡單WebServer了
class TCPServer(BaseServer):
    pass

若是仍是記不得能夠回顧下上次說的內容,提示:

__all__ = ["BaseServer", "TCPServer", "UDPServer",
           "ThreadingUDPServer", "ThreadingTCPServer",
           "BaseRequestHandler", "StreamRequestHandler",
           "DatagramRequestHandler", "ThreadingMixIn"]

若是你想要在這個基礎上進行處理,能夠和上面說的同樣,定義一個繼承class WSGIRequestHandler(BaseHTTPRequestHandler)的類,而後再處理

2.3.3.自定義

在本小節結束前咱們模仿一下示例,定義一個符合WSGI規範的簡單服務器:

import re
import socket
from index import WebFrame

class WSGIServer(object):
    def __init__(self):
        # 請求頭
        self.env = dict()
        # 存放處理後的響應頭
        self.response_headers = str()

    def run(self):
        with socket.socket() as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(("", 8080))
            server.listen()
            while True:
                self.client_socket, self.client_address = server.accept()
                self.handler()

    # 轉換瀏覽器請求頭格式
    def request_headers_handler(self, headers):
        # 過濾一下空字符串(不能過濾空列表)
        headers = list(filter(None, headers.split("\r\n")))
        # 提取 Method 和 Url
        ret = re.match("^([\w]+?) (/[^ ]*?) .+$", headers[0])
        if ret:
            self.env["method"] = ret.group(1)
            url = ret.group(2)
            print(url)
            self.env["path"] = "/index.html" if url == "/" else url
        else:
            return None
        # [['Host', ' localhost:8080'], ['Connection', ' keep-alive']...]
        array = map(lambda item: item.split(":", 1), headers[1:])
        for item in array:
            self.env[item[0].lower()] = item[1]
        # print(self.env)
        return "ok"

    # 響應客戶端(吐槽一下,request和response的headers爲毛格式不同,這設計真不合理!)
    def start_response(self, status, header_list=[]):
        # 響應頭
        self.response_headers = f"HTTP/1.1 {status}\r\n"
        for item in header_list:
            self.response_headers += f"{item[0]}:{item[1]}\r\n"
        # print(self.response_headers)

    # 響應瀏覽器
    def response(self, body):
        with self.client_socket as client:
            # 省略一系列服務器響應的headers
            self.response_headers += "server:WSGIServer\r\n\r\n"
            client.send(self.response_headers.encode("utf-8"))
            if body:
                client.send(body)

    def handler(self):
        with self.client_socket as client:
            data = client.recv(2048)
            if data:
                # 瀏覽器請求頭
                headers = data.decode("utf-8")
                if self.request_headers_handler(headers):
                    # 模仿php全部請求都一個文件處理
                    body = WebFrame().application(self.env,
                                                  self.start_response)
                    # 響應瀏覽器
                    self.response(body)
                else:
                    self.start_response("404 Not Found")
            else:
                client.close()

if __name__ == "__main__":
    WSGIServer().run()

本身定義的框架:

class WebFrame(object):
    def __init__(self):
        # 路由表
        self.router_urls = {"/time": "get_time", "/user": "get_name"}

    def get_time(self):
        import time
        return time.ctime().encode("utf-8")

    def get_name(self):
        return "<h2>My Name Is XiaoMing</h2>".encode("utf-8")

    def application(self, env, start_response):
        body = b""
        url = env["path"]
        # 請求的頁面都映射到路由對應的方法中
        if url in self.router_urls.keys():
            func = self.router_urls[url]
            body = getattr(self, func)()
        else:
            # 不然就請求對應的靜態資源
            try:
                with open(f"./www{url}", "rb") as fs:
                    body = fs.read()
            except Exception as ex:
                start_response("404 Not Found")
                print(ex)
                return b"404 Not Found"  # 出錯就直接返回了
        # 返回對應的頁面響應頭
        start_response("200 ok", [("Content-Type", "text/html"),
                                  ("Scripts", "Python")])
        return body

輸出: 6.wsgi.png

知識擴展:

從wsgiref模塊導入
https://docs.python.org/3/library/wsgiref.html

Python服務器網關接口
https://www.python.org/dev/peps/pep-3333/

Python原始套接字和流量嗅探
https://blog.csdn.net/cj1112/article/details/51303021
https://blog.csdn.net/peng314899581/article/details/78082244

【源碼閱讀】輕量級Web框架:bottle
https://github.com/bottlepy/bottle
 

3.RPC引入

上篇回顧:萬物互聯之~深刻篇

其餘專欄最新篇:協程增強之~兼容答疑篇 | 聊聊數據庫~SQL環境篇

Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/

3.1.概念

RPC(Remote Procedure Call):分佈式系統常見的一種通訊方法(遠程過程調用),通俗講:能夠一臺計算機的程序調用另外一臺計算機的子程序(能夠把它當作以前咱們說的進程間通訊,只不過這一次的進程不在同一臺PC上了)

PS:RPC的設計思想是力圖使遠程調用中的通信細節對於使用者透明,調用雙方無需關心網絡通信的具體實現

引用一張網上的圖: 1.rpc.png

HTTP有點類似,你能夠這樣理解:

  1. 老版本的HTTP/1.0是短連接,而RPC是長鏈接進行通訊
    • HTTP協議(header、body),RPC能夠採起HTTP協議,也能夠自定義二進制格式
  2. 後來HTTP/1.1支持了長鏈接(Connection:keep-alive),基本上和RPC差很少了
    • keep-alive通常都限制有最長時間,或者最多處理的請求數,而RPC是基於長鏈接的,基本上沒有這個限制
  3. 後來谷歌直接基於HTTP/2.0創建了gRPC,它們之間的基本上也就差很少了
    • 若是硬是要區分就是:HTTP-普通話RPC-方言的區別了
    • RPC高效而小衆,HTTP效率沒RPC高,但更通用
  4. PS:RPCHTTP調用不用通過中間件,而是端到端的直接數據交互
    • 網絡交互能夠理解爲基於Socket實現的(RPCHTTP都是Socket的讀寫操做)

簡單歸納一下RPC的優缺點就是:

  1. 優勢:
    1. 效率更高(能夠自定義二進制格式)
    2. 發起RPC調用的一方,在編寫代碼時可忽略RPC的具體實現(跟編寫本地函數調用通常
  2. 缺點:
    • 通用性不如HTTP(方言普及程度確定不如普通話),若是傳輸協議不是HTTP協議格式,調用雙方就須要專門實現通訊庫

PS:HTTP更可能是ClientServer的通信;RPC更可能是內部服務器間的通信

3.2.引入

上面說這麼多,可能尚未來個案例實在,咱們看個案例:

本地調用sum()

def sum(a, b):
    """return a+b"""
    return a + b

def main():
    result = sum(1, 2)
    print(f"1+2={result}")

if __name__ == "__main__":
    main()

輸出:(這個你們都知道)

1+2=3

1.xmlrpc案例

官方文檔:

https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html

都說RPC用起來就像本地調用同樣,那麼用起來啥樣呢?看個案例:

服務端:(CentOS7:192.168.36.123:50051)

from xmlrpc.server import SimpleXMLRPCServer

def sum(a, b):
    """return a+b"""
    return a + b

# PS:50051是gRPC默認端口
server = SimpleXMLRPCServer(('', 50051))
# 把函數註冊到RPC服務器中
server.register_function(sum)
print("Server啓動ing,Port:50051")
server.serve_forever()

客戶端:(Win10:192.168.36.144

from xmlrpc.client import ServerProxy

stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")

輸出:(Client用起來是否是和本地差很少?就是經過代理訪問了下RPCServer而已)

1+2=3

2.server.png

PS:CentOS服務器不是你綁定個端口就必定能訪問的,若是不能記讓防火牆開放對應的端口

這個以前在說MariaDB環境的時候有詳細說:http://www.javashuo.com/article/p-uxkudhsn-dw.html

# 添加 --permanent永久生效(沒有此參數重啓後失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent

2.ZeroRPC案例:

zeroRPC用起來和這個差很少,也簡單舉個例子吧:

把服務的某個方法註冊到RPCServer中,供外部服務調用

import zerorpc

class Test(object):
    def say_hi(self, name):
        return f"Hi,My Name is{name}"


# 註冊一個Test的實例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()

調用服務端代碼

import zerorpc

client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)

3.3.簡單版自定義RPC

看了上面的引入案例,是否是感受RPC不過如此?NoNoNo,要是真這麼簡單也就談不上RPC架構了,上面兩個是最簡單的RPC服務了,能夠這麼說:生產環境基本上用不到,只能當案例練習罷了,對Python來講,最經常使用的RPC就兩個gRPC and Thrift

PS:國產最出名的是Dubbo and Tars,Net最經常使用的是gRPCThriftSurging

1.RPC服務的流程

要本身實現一個RPC Server那麼就得了解整個流程了:

  1. Client(調用者)以本地調用的方式發起調用
  2. 經過RPC服務進行遠程過程調用(RPC的目標就是要把這些步驟都封裝起來,讓使用者感受不到這個過程)
    1. 客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議
    2. 客戶端的RPC Proxy組件在打包完成後經過網絡把數據包發送給RPC Server
    3. 服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數
    4. 服務端的RPC Proxy組件根據方法名和參數進行本地調用
    5. RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy
    6. 服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy組件
    7. 客戶端的RPC Proxy組件收到數據包後,進行拆包解碼,把數據返回給Client
  3. Client(調用者)獲得本次RPC調用的返回結果

用一張時序圖來描述下整個過程: 4.時序圖.png

PS:RPC Proxy有時候也叫Stub(存根):(Client Stub,Server Stub)

爲屏蔽客戶調用遠程主機上的對象,必須提供某種方式來模擬本地對象,這種本地對象稱爲存根(stub),存根負責接收本地方法調用,並將它們委派給各自的具體實現對象

PRC服務實現的過程當中其實就兩核心點:

  1. 消息協議:客戶端調用的參數和服務端的返回值這些在網絡上傳輸的數據以何種方式打包編碼和拆包解碼
    • 經典表明:Protocol Buffers
  2. 傳輸控制:在網絡中數據的收發傳輸控制具體如何實現(TCP/UDP/HTTP

2.手寫RPC

下面咱們就根據上面的流程來手寫一個簡單的RPC:

1.Client調用:

# client.py
from client_stub import ClientStub

def main():
    stub = ClientStub(("192.168.36.144", 50051))

    result = stub.get("sum", (1, 2))
    print(f"1+2={result}")

    result = stub.get("sum", (1.1, 2))
    print(f"1.1+2={result}")

    time_str = stub.get("get_time")
    print(time_str)

if __name__ == "__main__":
    main()

輸出:

1+2=3
1.1+2=3.1
Wed Jan 16 22

2.Client Stub,客戶端存根:(主要有打包解包、和RPC服務器通訊的方法)

# client_stub.py
import socket

class ClientStub(object):
    def __init__(self, address):
        """address ==> (ip,port)"""
        self.socket = socket.socket()
        self.socket.connect(address)

    def convert(self, obj):
        """根據類型轉換成對應的類型編號"""
        if isinstance(obj, int):
            return 1
        if isinstance(obj, float):
            return 2
        if isinstance(obj, str):
            return 3

    def pack(self, func, args):
        """打包:把方法和參數拼接成自定義的協議
        格式:func:函數名@params:類型-參數,類型2-參數2...
        """
        result = f"func:{func}"
        if args:
            params = ""
            # params:類型-參數,類型2-參數2...
            for item in args:
                params += f"{self.convert(item)}-{item},"
            # 去除最後一個,
            result += f"@params:{params[:-1]}"
        # print(result)  # log 輸出
        return result.encode("utf-8")

    def unpack(self, data):
        """解包:獲取返回結果"""
        msg = data.decode("utf-8")
        # 格式應該是"data:xxxx"
        params = msg.split(":")
        if len(params) > 1:
            return params[1]
        return None

    def get(self, func, args=None):
        """1.客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
        data = self.pack(func, args)
        # 2.客戶端的RPC Proxy組件在打包完成後經過網絡把數據包發送給RPC Server
        self.socket.send(data)
        # 等待服務端返回結果
        data = self.socket.recv(2048)
        if data:
            return self.unpack(data)
        return None

簡要說明下:(我根據流程在Code裏面標註了,看起來應該很輕鬆)

以前有說到核心其實就是消息協議and傳輸控制,我客戶端存根的消息協議是自定義的格式(後面會說簡化方案):func:函數名@params:類型-參數,類型2-參數2...,傳輸我是基於TCP進行了簡單的封裝


3.Server端:(實現很簡單)

# server.py
import socket
from server_stub import ServerStub

class RPCServer(object):
    def __init__(self, address, mycode):
        self.mycode = mycode
        # 服務端存根(RPC Proxy)
        self.server_stub = ServerStub(mycode)
        # TCP Socket
        self.socket = socket.socket()
        # 端口複用
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 綁定端口
        self.socket.bind(address)

    def run(self):
        self.socket.listen()
        while True:
            # 等待客戶端鏈接
            client_socket, client_addr = self.socket.accept()
            print(f"來自{client_addr}的請求:\n")
            try:
                # 交給服務端存根(Server Proxy)處理
                self.server_stub.handle(client_socket, client_addr)
            except Exception as ex:
                print(ex)

if __name__ == "__main__":
    from server_code import MyCode
    server = RPCServer(('', 50051), MyCode())
    print("Server啓動ing,Port:50051")
    server.run()

爲了簡潔,服務端代碼我單獨放在了server_code.py中:

# 5.RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy
class MyCode(object):
    def sum(self, a, b):
        return a + b

    def get_time(self):
        import time
        return time.ctime()

4.而後再看看重頭戲Server Stub:

# server_stub.py
import socket

class ServerStub(object):
    def __init__(self, mycode):
        self.mycode = mycode

    def convert(self, num, obj):
        """根據類型編號轉換類型"""
        if num == "1":
            obj = int(obj)
        if num == "2":
            obj = float(obj)
        if num == "3":
            obj = str(obj)
        return obj

    def unpack(self, data):
        """3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
        msg = data.decode("utf-8")
        # 格式應該是"格式:func:函數名@params:類型編號-參數,類型編號2-參數2..."
        array = msg.split("@")
        func = array[0].split(":")[1]
        if len(array) > 1:
            args = list()
            for item in array[1].split(":")[1].split(","):
                temps = item.split("-")
                # 類型轉換
                args.append(self.convert(temps[0], temps[1]))
            return (func, tuple(args))  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和參數拼接成自定義的協議"""
        # 格式:"data:返回值"
        return f"data:{result}".encode("utf-8")

    def exec(self, func, args=None):
        """4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
        # 若是沒有這個方法則返回None
        func = getattr(self.mycode, func, None)
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 無參函數

    def handle(self, client_socket, client_addr):
        while True:
            # 獲取客戶端發送的數據包
            data = client_socket.recv(2048)
            if data:
                try:
                    data = self.unpack(data)  # 解包
                    if len(data) == 1:
                        data = self.exec(data[0])  # 執行無參函數
                    elif len(data) > 1:
                        data = self.exec(data[0], data[1])  # 執行帶參函數
                    else:
                        data = "RPC Server Error Code:500"
                except Exception as ex:
                    data = "RPC Server Function Error"
                    print(ex)
                # 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy組件
                data = self.pack(data)  # 把函數執行結果按指定協議打包
                # 把處理過的數據發送給客戶端
                client_socket.send(data)
            else:
                print(f"客戶端:{client_addr}已斷開\n")
                break

再簡要說明一下:裏面方法其實主要就是解包執行函數返回值打包

輸出圖示: 3.div.png

再貼一下上面的時序圖: 4.時序圖.png

課外拓展:

HTTP1.0、HTTP1.1 和 HTTP2.0 的區別
https://www.cnblogs.com/heluan/p/8620312.html

簡述分佈式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994

分佈式基礎—RPC
http://www.dataguru.cn/article-14244-1.html

下節預估:RPC服務進一步簡化與演變手寫一個簡單的REST接口

 

4.RPC簡化與提煉

上篇回顧:萬物互聯之~RPC專欄 http://www.javashuo.com/article/p-tupqpuii-c.html

上節課解答

以前有網友問,不少開源的RPC中都是使用路由表,這個怎麼實現?

其實路由表實現起來也簡單,代碼基本上不變化,就修改一下server_stub.py__init__exe兩個方法就能夠了:

class ServerStub(object):
    def __init__(self, mycode):
        self.func_dict = dict()
        # 初始化一個方法名和方法的字典({func_name:func})
        for item in mycode.__dir__():
            if not item.startswith("_"):
                self.func_dict[item] = getattr(mycode, item)

    def exec(self, func, args=None):
        """4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
        # 若是沒有這個方法則返回None
        # func = getattr(self.mycode, func, None)
        func = self.func_dict[func]
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 無參函數

4.1.Json序列化

Python比較6的同志對上節課的Code確定嗤之以鼻,上次自定義協議是同的通用方法,這節課咱們先來簡化下代碼:

再貼一下上節課的時序圖: 4.時序圖.png

1.Json知識點

官方文檔:https://docs.python.org/3/library/json.html

# 把字典對象轉換爲Json字符串
json_str = json.dumps({"func": func, "args": args})

# 把Json字符串從新變成字典對象
data = json.loads(data)
func, args = data["func"], data["args"]

須要注意的就是類型轉換了(eg:python tuple ==> json array

Python JSON
dict object
list, tuple array
str string
int, float number
True true
False false
None null

PS:序列化:json.dumps(obj),反序列化:json.loads(json_str)

2.消息協議採用Json格式

在原有基礎上只須要修改下Stubpackunpack方法便可

Client_Stub(類型轉換都省掉了)

import json
import socket

class ClientStub(object):
    def pack(self, func, args):
        """打包:把方法和參數拼接成自定義的協議
        格式:{"func": "sum", "args": [1, 2]}
        """
        json_str = json.dumps({"func": func, "args": args})
        # print(json_str)  # log 輸出
        return json_str.encode("utf-8")

    def unpack(self, data):
        """解包:獲取返回結果"""
        data = data.decode("utf-8")
        # 格式應該是"{data:xxxx}"
        data = json.loads(data)
        # 獲取不到就返回None
        return data.get("data", None)

    # 其餘Code我沒有改變

Server Stub()

import json
import socket

class ServerStub(object):
    def unpack(self, data):
        """3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
        data = data.decode("utf-8")
        # 格式應該是"格式:{"func": "sum", "args": [1, 2]}"
        data = json.loads(data)
        func, args = data["func"], data["args"]
        if args:
            return (func, tuple(args))  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和參數拼接成自定義的協議"""
        # 格式:"data:返回值"
        json_str = json.dumps({"data": result})
        return json_str.encode("utf-8")

    # 其餘Code我沒有改變

輸出圖示: 3.div.png

4.2.Buffer序列化

RPC其實更多的是二進制的序列化方式,這邊簡單介紹下

1.pickle知識點

官方文檔:https://docs.python.org/3/library/pickle.html

用法和Json相似,PS:序列化:pickle.dumps(obj),反序列化:pickle.loads(buffer)

2.簡單案例

和Json案例相似,也只是改了packunpack,我這邊就貼一下完整代碼(防止被吐槽)

1.Client

# 和上一節同樣
from client_stub import ClientStub

def main():
    stub = ClientStub(("192.168.36.144", 50051))

    result = stub.get("sum", (1, 2))
    print(f"1+2={result}")

    result = stub.get("sum", (1.1, 2))
    print(f"1.1+2={result}")

    time_str = stub.get("get_time")
    print(time_str)

if __name__ == "__main__":
    main()

2.ClientStub

import socket
import pickle

class ClientStub(object):
    def __init__(self, address):
        """address ==> (ip,port)"""
        self.socket = socket.socket()
        self.socket.connect(address)

    def pack(self, func, args):
        """打包:把方法和參數拼接成自定義的協議"""
        return pickle.dumps((func, args))

    def unpack(self, data):
        """解包:獲取返回結果"""
        return pickle.loads(data)

    def get(self, func, args=None):
        """1.客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
        data = self.pack(func, args)
        # 2.客戶端的RPC Proxy組件在打包完成後經過網絡把數據包發送給RPC Server
        self.socket.send(data)
        # 等待服務端返回結果
        data = self.socket.recv(2048)
        if data:
            return self.unpack(data)
        return None

3.Server

# 和上一節同樣
import socket
from server_stub import ServerStub

class RPCServer(object):
    def __init__(self, address, mycode):
        self.mycode = mycode
        # 服務端存根(RPC Proxy)
        self.server_stub = ServerStub(mycode)
        # TCP Socket
        self.socket = socket.socket()
        # 端口複用
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 綁定端口
        self.socket.bind(address)

    def run(self):
        self.socket.listen()
        while True:
            # 等待客戶端鏈接
            client_socket, client_addr = self.socket.accept()
            print(f"來自{client_addr}的請求:\n")
            try:
                # 交給服務端存根(Server Proxy)處理
                self.server_stub.handle(client_socket, client_addr)
            except Exception as ex:
                print(ex)

if __name__ == "__main__":
    from server_code import MyCode
    server = RPCServer(('', 50051), MyCode())
    print("Server啓動ing,Port:50051")
    server.run()

4.ServerCode

# 和上一節同樣
# 5.RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy
class MyCode(object):
    def sum(self, a, b):
        return a + b

    def get_time(self):
        import time
        return time.ctime()

5.ServerStub

import socket
import pickle

class ServerStub(object):
    def __init__(self, mycode):
        self.mycode = mycode

    def unpack(self, data):
        """3.服務端的RPC Proxy組件把經過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
        func, args = pickle.loads(data)
        if args:
            return (func, args)  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和參數拼接成自定義的協議"""
        return pickle.dumps(result)

    def exec(self, func, args=None):
        """4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
        # 若是沒有這個方法則返回None
        func = getattr(self.mycode, func)
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 無參函數

    def handle(self, client_socket, client_addr):
        while True:
            # 獲取客戶端發送的數據包
            data = client_socket.recv(2048)
            if data:
                try:
                    data = self.unpack(data)  # 解包
                    if len(data) == 1:
                        data = self.exec(data[0])  # 執行無參函數
                    elif len(data) > 1:
                        data = self.exec(data[0], data[1])  # 執行帶參函數
                    else:
                        data = "RPC Server Error Code:500"
                except Exception as ex:
                    data = "RPC Server Function Error"
                    print(ex)
                # 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並經過網絡發送給客戶端的RPC Proxy組件
                data = self.pack(data)  # 把函數執行結果按指定協議打包
                # 把處理過的數據發送給客戶端
                client_socket.send(data)
            else:
                print(f"客戶端:{client_addr}已斷開\n")
                break

輸出圖示: 3.div.png

而後關於RPC高級的內容(會涉及到註冊中心),我們後面說架構的時候繼續,網絡這邊就說到這

5.Restful API

RESTful只是接口協議規範,它是創建在http基礎上的,咱們在網絡增強篇的末尾簡單帶一下,後面講爬蟲應該會再給你們說的

5.1.實現一個簡單的REST接口

在編寫REST接口時,通常都是爲HTTP服務的。爲了實現一個簡單的REST接口,你只需讓代碼知足Python的WSGI標準便可

1.Restful引入

這邊我就不本身實現了(上面手寫服務器的時候其實已經展現了Restful接口是啥樣),用Flask快速過一遍:

看個引入案例:

import flask

app = flask.Flask(__name__)

@app.route("/")
def index():
    return "This is Restful API Test"

if __name__ == "__main__":
    app.run()

圖示輸出: 5.api.png

Server Log:

* Serving Flask app "1.test" (lazy loading)
 * Environment: production
   WARNING: Do not use the development server in a production environment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:8080/ (Press CTRL+C to quit)
127.0.0.1 - - [17/Jan/2019 17:24:02] "GET / HTTP/1.1" 200 -

2.簡單版RESTful Services

舉個查詢服務器節點信息的例子:/api/servers/

import flask
from infos import info_list

app = flask.Flask(__name__)

# Json的404自定義處理(不加自定義處理會返回默認404頁面)
@app.errorhandler(404)
def not_found(error):
    return flask.make_response(
        flask.jsonify({
            "data": "Not Found",
            "status": 404
        }), 404)

# 運行Get和Post請求
@app.route("/api/v1.0/servers/<name>", methods=["GET", "POST"])
def get_info(name):
    infos = list(filter(lambda item: item["name"] == name, info_list))
    if len(infos) == 0:
        flask.abort(404) # 404
    # 基於json.dumps的封裝版
    return flask.jsonify({"infos": infos})  # 返回Json字符串

if __name__ == "__main__":
    app.run(port=8080)

圖示輸出:(不深刻說,後面爬蟲會再提的) 6.test.gif

課後拓展:

RESTful API 設計指南
http://www.ruanyifeng.com/blog/2014/05/restful_api.html

RESTful API 最佳實踐
http://www.ruanyifeng.com/blog/2018/10/restful-api-best-practices.html

異步 API 的設計
http://www.ruanyifeng.com/blog/2018/12/async-api-design.html

使用python的Flask實現一個RESTful API服務器端[翻譯]
https://www.cnblogs.com/vovlie/p/4178077.html
相關文章
相關標籤/搜索