經過定時觸發器,能夠簡單快速地定製一個企業微信機器人。咱們能夠用它來實現喝水、吃飯提醒等小功能,還能實現定時推送新聞、天氣,甚至是監控告警的小功能。python
在企業微信中,選擇添加機器人:git
以後,咱們能夠根據文檔進行企業微信機器人的基礎功能定製:github
如下是用 curl 工具往羣組推送文本消息的示例(注意要將 url 替換成機器人的 webhook 地址,content 必須是 utf8 編碼):web
curl '企業微信機器人地址' \ -H 'Content-Type: application/json' \ -d ' { "msgtype": "text", "text": { "content": "hello world" } }'
經過 Python 語言實現:ajax
url = "" data = { "msgtype": "markdown", "markdown": { "content": "hello world", } } data = json.dumps(data).encode("utf-8") req_attr = urllib.request.Request(url, data) resp_attr = urllib.request.urlopen(req_attr) return_msg = resp_attr.read().decode("utf-8")
此時,咱們能夠經過 Serverless Framework 部署一個機器人的基本功能,而且設置好 API 網關觸發器:express
index.py
文件以下:json
import os import json import urllib.request def main_handler(event, context): url = os.environ.get("url") data = { "msgtype": "markdown", "markdown": { "content": "hello world", } } data = json.dumps(data).encode("utf-8") req_attr = urllib.request.Request(url, data) resp_attr = urllib.request.urlopen(req_attr) return resp_attr.read().decode("utf-8")
serverless.yaml
文件以下:api
MyRobot_Base: component: '@serverless/tencent-scf' inputs: name: MyRobot_Base runtime: Python3.6 timeout: 3 codeUri: ./base_robot description: 機器人推送接口 region: ap-guangzhou environment: variables: url: webhook地址 handler: index.main_handler memorySize: 64 tags: app: myrobot events: - apigw: name: MyRobot parameters: protocols: - http - https description: 機器人推送接口 environment: release endpoints: - path: /push method: ANY
部署成功以後,能夠看到命令行中輸出的地址:瀏覽器
在瀏覽器中打開,能夠看到企業微信機器人已經被觸發了:微信
以上就是一個簡單的 hello world
功能。接下來,好戲開始!
咱們對這個基礎函數進行進一步的改造:
import os import json import urllib.request def main_handler(event, context): url = os.environ.get("url") data = { "msgtype": "markdown", "markdown": { "content": event['body'], } } data = json.dumps(data).encode("utf-8") req_attr = urllib.request.Request(url, data) resp_attr = urllib.request.urlopen(req_attr) return resp_attr.read().decode("utf-8")
經過將 data
中的 content
字段更改成 event['body']
可讓其餘模塊請求該接口,實現機器人推送功能,固然這個基礎函數咱們還能夠進行完善,不單單是 markdown
格式,封裝更多支持的格式:
經過定時觸發器,訪問雲函數,能夠實現該功能。
例如 index.py
代碼:
import os import json import urllib.request def main_handler(event, context): url = os.environ.get("url") data = "天天都要多喝水哦,不要忘記補充水分".encode("utf-8") req_attr = urllib.request.Request(url, data) resp_attr = urllib.request.urlopen(req_attr) return resp_attr.read().decode("utf-8")
serverless.yaml
文件:
MyRobot_Water: component: '@serverless/tencent-scf' inputs: name: MyRobot_Water runtime: Python3.6 timeout: 3 codeUri: ./water description: 提醒喝水的機器人 region: ap-guangzhou environment: variables: url: https://service-lf3ug84s-1256773370.gz.apigw.tencentcs.com/release/push handler: index.main_handler memorySize: 64 tags: app: myrobot events: - timer: name: timer parameters: cronExpression: '0 */30 9-17 * * * *' enable: true
這個函數就是天天上午 9 點到下午 5 點,每 30 分鐘提醒喝一次水。
想要實現天氣預報/新聞播報的功能,咱們能夠經過已有的新聞接口來實現,以騰訊雲的雲市場爲例,尋找一個新聞類 API 接口:
根據 API 文檔,能夠看到請求地址是:https://service-aqvnjmiq-1257101137.gz.apigw.tencentcs.com/release/news/search
Get 方法能夠攜帶一個參數:keyword
,做爲目標的關鍵詞,編寫代碼:
import ssl, hmac, base64, hashlib, os, json from datetime import datetime as pydatetime from urllib.parse import urlencode from urllib.request import Request, urlopen def main_handler(event, context): source = "market" datetime = pydatetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') signStr = "x-date: %s\nx-source: %s" % (datetime, source) sign = base64.b64encode(hmac.new(os.environ.get('secretKey').encode('utf-8'), signStr.encode('utf-8'), hashlib.sha1).digest()) auth = 'hmac id="%s", algorithm="hmac-sha1", headers="x-date x-source", signature="%s"' % (os.environ.get("secretId"), sign.decode('utf-8')) headers = { 'X-Source': source, 'X-Date': datetime, 'Authorization': auth, } queryParams = {'keyword': '科技新聞'} url = 'https://service-aqvnjmiq-1257101137.gz.apigw.tencentcs.com/release/news/search' if len(queryParams.keys()) > 0: url = url + '?' + urlencode(queryParams) content = "" for eve in json.loads(urlopen(Request(url, headers=headers)).read().decode("utf-8"))["result"]["list"][0:5]: content = content + "* [%s](%s) \n"%(eve['title'], eve['url']) if content: urlopen(Request(os.environ.get('url'), content.encode("utf-8")))
serverless.yaml
文件:
MyRobot_News: component: '@serverless/tencent-scf' inputs: name: MyRobot_News runtime: Python3.6 timeout: 3 codeUri: ./news description: 新聞推送 region: ap-guangzhou environment: variables: url: https://service-lf3ug84s-1256773370.gz.apigw.tencentcs.com/release/push secretId: 雲市場密鑰信息 secretKey: 雲市場密鑰信息 handler: index.main_handler memorySize: 64 tags: app: myrobot events: - timer: name: timer parameters: cronExpression: '0 0 */8 * * * *' enable: true
運行效果以下,天天早晨 8 點爲咱們推送當日科技新聞:
咱們還能夠賦予企業微信機器人監控告警的能力:
index.py
文件:
import os import urllib.request def getStatusCode(url): return urllib.request.urlopen(url).getcode() def main_handler(event, context): url = "http://www.anycodes.cn" if getStatusCode(url) == 200: print("您的網站%s能夠訪問!" % (url)) else: urllib.request.urlopen(urllib.request.Request(os.environ.get('url'), ("您的網站%s 不能夠訪問!" % (url)).encode("utf-8"))) return None
serverless.yaml
文件:
MyRobot_Monitor: component: '@serverless/tencent-scf' inputs: name: MyRobot_Monitor runtime: Python3.6 timeout: 3 codeUri: ./monitor description: 網站監控 region: ap-guangzhou environment: variables: url: https://service-lf3ug84s-1256773370.gz.apigw.tencentcs.com/release/push handler: index.main_handler memorySize: 64 tags: app: myrobot events: - timer: name: timer parameters: cronExpression: '0 */30 * * * * *' enable: true
部署完成後,網站的監控腳本就已經啓動,每 30 分鐘檢查一次網站是否可用。若是不可用,則會發送告警:
企業微信機器人能夠經過 Serverless 架構被賦予更多更有趣的功能,那麼還有哪些產品能夠和 Serverless 架構相結合,變得更有趣呢?
隨着網絡技術的不斷髮展,IoT 技術也逐漸走進了千家萬戶,不管是掃地機器人、智能窗簾等智能家居,仍是智能音箱等娛樂設施,IoT 技術都變得可見可及。
小愛同窗,也能經過 Serverless 架構,快速開發出專屬新功能。
首先咱們去「小愛同窗」的開放平臺註冊帳號,而且提交認證:
接下來對小愛同窗的定製化功能進行研究。如圖所示,在開發文檔中,咱們能夠看到小愛同窗開發者平臺爲咱們提供的能力信息,一樣咱們也能夠查看到 request 以及 response 的詳細信息:
繼續進行項目設計。本文的目標是經過對小愛同窗說出「進入雲+社區」等關鍵詞,爲用戶返回騰訊雲+社區的最新熱門文章的題目和簡介。
整個流程如圖所示:
函數代碼編寫:
# -*- coding: utf8 -*- import json import logging import urllib.request import urllib.parse logging.basicConfig(level=logging.NOTSET) def main_handler(event, context): host = "https://cloud.tencent.com/" path = "developer/services/ajax/column/article?action=FetchColumnHomeArticleList" json_data = { "action": "FetchColumnHomeArticleList", "payload": { "pageNumber": 1, "pageSize": 20, "version": 1 } } data = json.dumps(json_data).encode("utf-8") request_attr = urllib.request.Request(url=host + path, data=data) response_attr = urllib.request.urlopen(request_attr).read().decode("utf-8") json_resp = json.loads(response_attr) logging.debug(json_resp) temp_str = "文章題目爲%s,主要內容是%s" list_data = json_resp["data"]["list"][0:5] art_list = [temp_str % (eve["title"], eve["abstract"]) for eve in list_data] news_str = '''今日騰訊雲加社區熱門文章以下:%s''' % ("、".join(art_list)) logging.debug(news_str) xiaoai_response = {"version": "1.0", "response": { "open_mic": False, "to_speak": { "type": 0, "text": news_str } }, "is_session_end": False } return xiaoai_response
完成以後,使用 Serverless Framework 進行部署,綁定 API 網關觸發器,經過請求地址能夠看到測試結果:
能夠看到,咱們已經得到到目標數據。此時,咱們在小愛同窗官網,建立技能開發,在填寫好和保存好基本信息以後,選擇配置服務,填寫 HTTPS 中的測試化地址:
配置完成以後,開始測試,以下圖所示,能夠看到,當咱們輸入預約的命令「打開雲加社區」,系統會正確回取到結果信息,而且給咱們返回:
至此,咱們經過 Serverless 架構,成功地爲「小愛同窗」開發了一項新功能,咱們還能夠將這個新功能就拿去發佈和上線!
本文僅僅是一次簡單的示範,經過企業微信機器人與 Serverless 架構的結合,用若干代碼實現提醒功能、新聞推送功能以及業務監控告警功能。同時咱們還發散思惟,讓小愛同窗也擁有了新的能力。
不難看出,經過 Serverless 架構,咱們能夠快速爲產品增長一些新的功能,賦予新的生機!
咱們誠邀您來體驗最便捷的 Serverless 開發和部署方式。在試用期內,相關聯的產品及服務均提供免費資源和專業的技術支持,幫助您的業務快速、便捷地實現 Serverless!
3 秒你能作什麼?喝一口水,看一封郵件,仍是 —— 部署一個完整的 Serverless 應用?
複製連接至 PC 瀏覽器訪問:https://serverless.cloud.tencent.com/deploy/express
3 秒極速部署,當即體驗史上最快的 Serverless HTTP 實戰開發!
傳送門:
- GitHub: github.com/serverless
- 官網:serverless.com
歡迎訪問:Serverless 中文網,您能夠在 最佳實踐 裏體驗更多關於 Serverless 應用的開發!