基於 Serverless 與 Websocket 的聊天工具實現

傳統業務實現 Websocket 並不難,然而函數計算基本上都是事件驅動,不支持長連接操做。若是將函數計算與 API 網關結合,是否能夠有 Websocket 的實現方案呢?javascript

API 網關觸發器實現 Websocket

WebSocket 協議是基於 TCP 的一種新的網絡協議。它實現了瀏覽器與服務器全雙工 (full-duplex) 通訊,即容許服務器主動發送信息給客戶端。WebSocket 在服務端有數據推送需求時,能夠主動發送數據至客戶端。而原有 HTTP 協議的服務端對於需推送的數據,僅能經過輪詢或 long poll 的方式來讓客戶端得到。java

因爲雲函數是無狀態且以觸發式運行,即在有事件到來時纔會被觸發。所以,爲了實現 WebSocket,雲函數 SCF 與 API 網關相結合,經過 API 網關承接及保持與客戶端的鏈接。您能夠認爲雲函數與 API 網關一塊兒實現了服務端。當客戶端有消息發出時,會先傳遞給 API 網關,再由 API 網關觸發雲函數執行。當服務端雲函數要向客戶端發送消息時,會先由雲函數將消息 POST 到 API 網關的反向推送連接,再由 API 網關向客戶端完成消息的推送。python

具體的實現架構以下:git

實現架構

對於 WebSocket 的整個生命週期,主要由如下幾個事件組成:github

  • 鏈接創建:客戶端向服務端請求創建鏈接並完成鏈接創建;
  • 數據上行:客戶端經過已經創建的鏈接向服務端發送數據;
  • 數據下行:服務端經過已經創建的鏈接向客戶端發送數據;
  • 客戶端斷開:客戶端要求斷開已經創建的鏈接;
  • 服務端斷開:服務端要求斷開已經創建的鏈接。

對於 WebSocket 整個生命週期的事件,雲函數和 API 網關的處理過程以下:web

  • 鏈接創建:客戶端與 API 網關創建 WebSocket 鏈接,API 網關將鏈接創建事件發送給 SCF;
  • 數據上行:客戶端經過 WebSocket 發送數據,API 網關將數據轉發送給 SCF;
  • 數據下行:SCF 經過向 API 網關指定的推送地址發送請求,API 網關收到後會將數據經過 WebSocket 發送給客戶端;
  • 客戶端斷開:客戶端請求斷開鏈接,API 網關將鏈接斷開事件發送給 SCF;
  • 服務端斷開:SCF 經過向 API 網關指定的推送地址發送斷開請求,API 網關收到後斷開 WebSocket 鏈接。

所以,雲函數與 API 網關之間的交互,須要由 3 類雲函數來承載:express

  • 註冊函數:在客戶端發起和 API 網關之間創建 WebSocket 鏈接時觸發該函數,通知 SCF WebSocket 鏈接的 secConnectionID。一般會在該函數記錄 secConnectionID 到持久存儲中,用於後續數據的反向推送;
  • 清理函數:在客戶端主動發起 WebSocket 鏈接中斷請求時觸發該函數,通知 SCF 準備斷開鏈接的 secConnectionID。一般會在該函數清理持久存儲中記錄的該 secConnectionID;
  • 傳輸函數:在客戶端經過 WebSocket 鏈接發送數據時觸發該函數,告知 SCF 鏈接的 secConnectionID 以及發送的數據。一般會在該函數處理業務數據。例如,是否將數據推送給持久存儲中的其餘 secConnectionID。

Websocket 功能實現

根據騰訊雲官網提供的該功能的總體架構圖:json

總體架構圖

這裏咱們能夠使用對象存儲 COS 做爲持久化的方案,當用戶創建連接存儲 ConnectionId 到 COS 中,當用戶斷開鏈接刪除該連接 ID。api

其中註冊函數:瀏覽器

# -*- coding: utf8 -*-
import os
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))


def main_handler(event, context):
    print("event is %s" % event)

    connectionID = event['websocket']['secConnectionID']

    retmsg = {}
    retmsg['errNo'] = 0
    retmsg['errMsg'] = "ok"
    retmsg['websocket'] = {
        "action": "connecting",
        "secConnectionID": connectionID
    }

    cosClient.put_object(
        Bucket=bucket,
        Body='websocket'.encode("utf-8"),
        Key=str(connectionID),
        EnableMD5=False
    )

    return retmsg

傳輸函數:

# -*- coding: utf8 -*-
import os
import json
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))

sendbackHost = os.environ.get("url")


def Get_ConnectionID_List():
    response = cosClient.list_objects(
        Bucket=bucket,
    )
    return [eve['Key'] for eve in response['Contents']]


def send(connectionID, data):
    retmsg = {}
    retmsg['websocket'] = {}
    retmsg['websocket']['action'] = "data send"
    retmsg['websocket']['secConnectionID'] = connectionID
    retmsg['websocket']['dataType'] = 'text'
    retmsg['websocket']['data'] = data
    requests.post(sendbackHost, json=retmsg)


def main_handler(event, context):
    print("event is %s" % event)

    connectionID_List = Get_ConnectionID_List()
    connectionID = event['websocket']['secConnectionID']
    count = len(connectionID_List)
    data = event['websocket']['data'] + "(===Online people:" + str(count) + "===)"
    for ID in connectionID_List:
        if ID != connectionID:
            send(ID, data)

    return "send success"

清理函數:

# -*- coding: utf8 -*-
import os
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))

sendbackHost = os.environ.get("url")


def main_handler(event, context):
    print("event is %s" % event)

    connectionID = event['websocket']['secConnectionID']

    retmsg = {}
    retmsg['websocket'] = {}
    retmsg['websocket']['action'] = "closing"
    retmsg['websocket']['secConnectionID'] = connectionID
    requests.post(sendbackHost, json=retmsg)

    cosClient.delete_object(
        Bucket=bucket,
        Key=str(connectionID),
    )

    return event

Yaml 文件以下:

Conf:
  component: "serverless-global"
  inputs:
    region: ap-guangzhou
    bucket: chat-cos-1256773370
    secret_id: 
    secret_key: 

myBucket:
  component: '@serverless/tencent-cos'
  inputs:
    bucket: ${Conf.bucket}
    region: ${Conf.region}

restApi:
  component: '@serverless/tencent-apigateway'
  inputs:
    region: ${Conf.region}
    protocols:
      - http
      - https
    serviceName: ChatDemo
    environment: release
    endpoints:
      - path: /
        method: GET
        protocol: WEBSOCKET
        serviceTimeout: 800
        function:
          transportFunctionName: ChatTrans
          registerFunctionName: ChatReg
          cleanupFunctionName: ChatClean


ChatReg:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatReg
    codeUri: ./code
    handler: reg.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

ChatTrans:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatTrans
    codeUri: ./code
    handler: trans.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

ChatClean:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatClean
    codeUri: ./code
    handler: clean.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

注意,這裏須要先部署 API 網關。當部署完成,得到回推地址,將回推地址以 url 的形式寫入到對應函數的環境變量中:

理論上應該是能夠經過 ${restApi.url[0].internalDomain} 自動得到到 url 的,可是我並無成功得到到這個 url,只能先部署 API 網關,得到到這個地址以後,再從新部署。

部署完成以後,咱們能夠編寫 HTML 代碼,實現可視化的 Websocket Client,其核心的 JavaScript 代碼爲:

window.onload = function () {
    var conn;
    var msg = document.getElementById("msg");
    var log = document.getElementById("log");

    function appendLog(item) {
        var doScroll = log.scrollTop === log.scrollHeight - log.clientHeight;
        log.appendChild(item);
        if (doScroll) {
            log.scrollTop = log.scrollHeight - log.clientHeight;
        }
    }

    document.getElementById("form").onsubmit = function () {
        if (!conn) {
            return false;
        }
        if (!msg.value) {
            return false;
        }
        conn.send(msg.value);
        //msg.value = "";
		
		var item = document.createElement("div");
		item.innerText = "發送↑:";
		appendLog(item);
		
		var item = document.createElement("div");
		item.innerText = msg.value;
		appendLog(item);
		
        return false;
    };

    if (window["WebSocket"]) {
        //替換爲websocket鏈接地址
        conn = new WebSocket("ws://service-01era6ni-1256773370.gz.apigw.tencentcs.com/release/");
        conn.onclose = function (evt) {
            var item = document.createElement("div");
            item.innerHTML = "<b>Connection closed.</b>";
            appendLog(item);
        };
        conn.onmessage = function (evt) {
			var item = document.createElement("div");
			item.innerText = "接收↓:";
			appendLog(item);
		
            var messages = evt.data.split('\n');
            for (var i = 0; i < messages.length; i++) {
                var item = document.createElement("div");
                item.innerText = messages[i];
                appendLog(item);
            }
        };
    } else {
        var item = document.createElement("div");
        item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
        appendLog(item);
    }
};

完成以後,咱們打開兩個頁面,進行測試:

總結

經過雲函數 + API 網關進行 Websocket 的實踐,絕對不單單是一個聊天工具這麼簡單,它能夠用在不少方面,例如經過 Websocket 進行實時日誌系統的製做等。

單獨的函數計算,僅僅是一個計算平臺,只有和周邊的 BaaS 結合,才能展現出 Serverless 架構的價值和真正的能力。這也是爲何不少人說 Serverless=FaaS+BaaS 的一個緣由。

期待更多小夥伴,能夠經過 Serverless 架構,創造出更多有趣的應用。

Serverless Framework 30 天試用計劃

咱們誠邀您來體驗最便捷的 Serverless 開發和部署方式。在試用期內,相關聯的產品及服務均提供免費資源和專業的技術支持,幫助您的業務快速、便捷地實現 Serverless!

詳情可查閱:Serverless Framework 試用計劃

One More Thing

3 秒你能作什麼?喝一口水,看一封郵件,仍是 —— 部署一個完整的 Serverless 應用?

複製連接至 PC 瀏覽器訪問:https://serverless.cloud.tencent.com/deploy/express

3 秒極速部署,當即體驗史上最快的 Serverless HTTP 實戰開發!

傳送門:

歡迎訪問:Serverless 中文網,您能夠在 最佳實踐 裏體驗更多關於 Serverless 應用的開發!


推薦閱讀:《Serverless 架構:從原理、設計到項目實戰》

相關文章
相關標籤/搜索