前言
這個例子描述 YoMo 在工業互聯網數據採集中的應用,以收集噪聲傳感器的數據爲例,涉及數據收集/處理/工做流/數據展現的全過程,爲了便於體驗運行效果,還會對其進行容器化,並經過docker快速部署體驗版。git
述語
- xxx-source: 表示一個數據源收集程序
- xxx-zipper: 表示一個工做流和控制平面
- xxx-flow: 表示一個工做流單元,用於實際的業務邏輯處理,被zipper調度。
- xxx-sink: 表示一個數據的傳送目的地,通常落地數據庫或者傳遞給下一級代理,被zipper調度。
架構
從圖中可見,區分了邊緣端和雲端兩個獨立區域,區域之間是經過弱網或者互聯網鏈接,這裏簡單介紹一下各個服務:github
- 邊緣端部署了傳感器設備(Noise)和數據收集網關(ZLAN),網關會定時向設備請求狀態數據,並轉換爲MQTT協議數據發送給noise-source收集器,source起到轉換編碼並與YoMo工做流引擎zipper創建鏈接的做用。關於傳感器設備和數據收集網關的硬件選購和配置能夠參與這篇文章:https://yomo.run/zh/aiot。
- 對於不想購買硬件設備的開發者,這裏也提供了一個noise-emitter模擬器用來產生噪聲數據。
- zipper是一個強大的工做流引擎,經過編排(workflow.yaml)能夠調度多個flow和sink,讓他們以流的方式把業務邏輯串聯起來,以知足複雜的需求。與之相連的全部通訊和編解碼均以QUIC+Y3進行,提供可靠實時的流式處理,全程體驗流式編程的樂趣。
- noise-flow 實現把噪音值除以10的簡單處理,而且監控若是超過必定閥值後輸出日誌進行警報。
- noise-sink 沒有真的輸出到數據庫,而是經過搭建一個WebSocket服務器,把實時的噪音狀態輸出給任意的網頁進行展現消費。
- noise-web 是一個消費WebSocket的網頁服務,他部署在哪裏均可以,只要能訪問到noise-sink提供的WebSocket服務地址便可,這裏咱們假設部署回邊緣端也是沒有問題的。
代碼
下表提供了案例的所有代碼,供感興趣的朋友查看,參照這個案體的代碼,能夠輕鬆開發出類擬場景的案例。web
項目 | 地址 | 說明 |
---|---|---|
noise-source | yomo-source-noise-example | 收集MQTT消息格式的噪音數據 |
noise-zipper | yomo-zipper-noise-example | 編排本案體的工做流和數據流向 |
noise-flow | yomo-flow-noise-example | 對噪音數據進行預處理和警報 |
noise-sink | yomo-sink-socketio-server-example | 提供WebSocket服務用於數據展現 |
noise-web | yomo-sink-socket-io-example | 消費WebSocket服務展現噪音狀態 |
noise-emitter | yomo-source-noise-emitter-example | 模擬產生噪聲數據 |
quic-mqtt | yomo-source-mqtt-starter | 開發xxx-source的通用組件 |
容器化部署
經過下載上節的項目代碼能夠進行本地原生部署,體驗YoMo開發的樂趣,可是對於想急於立刻看到效果的朋友來講,更爽的方式固然是先快速運行起來看看效果,因此對上節的項目也作了容器化處理,每一個項目的根目錄均提供了Dockerfile文件,而且在hub.docker.com提供了官方鏡像下載:docker
項目 | 鏡像地址 | 最新版本 |
---|---|---|
noise-source | yomorun/noise-source | yomorun/noise-source:latest |
noise-zipper | yomorun/noise-zipper | yomorun/noise-zipper:latest |
noise-flow | yomorun/noise-flow | yomorun/noise-flow:latest |
noise-sink | yomorun/noise-sink | yomorun/noise-sink:latest |
noise-web | yomorun/noise-web | yomorun/noise-web:latest |
noise-emitter | yomorun/noise-emitter | yomorun/noise-emitter:latest |
quic-mqtt | yomorun/quic-mqtt | yomorun/quic-mqtt:latest |
yomorun/quic-mqtt:latest 是開發xxx-source的基礎鏡像,能夠快速打包自定義代碼,但本案例中能夠暫時忽略。數據庫
快速部署
爲了快速運行體驗運行效果,本小節描述如何總體容器化在同一宿主機上運行。編程
快速運行
有了上述的官方鏡像就簡單多了,只需簡單的步驟就能夠體驗效果了:服務器
- 下載 docker-compose.yml 文件。
- 運行
docker-compose up -d
- 先喝杯茶稍做等待,經過訪問 http://localhost:3000/ 可看到以下效果圖:
注意事項:架構
- 這裏Delay的值可能不很準確,由於是經過Docker容器部署,各個容器的時鐘並不對齊得那麼完美,若是要查看到最精確的延時值,須要把noise-source和noise-web原生部署到同一個宿主機上。
- 若是部署不是在本地,則須要修改docker-compose.yml文件中的環境變量SOCKET_SERVER_ADDR爲你部署服務的宿主機地址。
查看狀態
經過 docker-compose ps
查看服務狀態app
Name Command State Ports ----------------------------------------------------------------------------------------- noise-emitter sh -c go run main.go Up noise-flow sh -c yomo run app.go -p 4242 Up 4242/udp noise-sink sh -c go run main.go Up 4141/udp, 0.0.0.0:8000->8000/tcp noise-source sh -c go run main.go Up 1883/tcp noise-web ./docker-entrypoint.sh yar ... Up 0.0.0.0:3000->3000/tcp noise-zipper sh -c yomo wf run workflow ... Up 9999/udp
- noise-sink 暴露了8000的WebSocket端口提給noise-web展現消費。
- noise-web 暴露了3000的http端口用於展現實時的噪聲值和延時。
- noise-zipper/noise-flow/noise-sink 均提供了udp端口的QUIC服務,全流程QUIC通訊。
- noise-source 是咱們對接不一樣設備的關鍵,提供1883的MQTT端口,固然也能夠修改的。
查看日誌
-
查看noise-emitter
docker-compose logs -f noise-emitter
lessnoise-emitter | 2021-04-26 10:11:03: Publish counter=12438, topic=NOISE, payload={"noise":12438} noise-emitter | 2021-04-26 10:11:04: Publish counter=12439, topic=NOISE, payload={"noise":12439} noise-emitter | 2021-04-26 10:11:05: Publish counter=12440, topic=NOISE, payload={"noise":12440} noise-emitter | 2021-04-26 10:11:06: Publish counter=12441, topic=NOISE, payload={"noise":12441}
這個模擬發生器產生了MQTT數據:主題是
NOISE
, 值是不斷遞增的序號(JSON格式)。 -
查看noise-source
docker-compose logs -f noise-source
noise-source | 2021/04/26 15:27:32 receive: topic=NOISE, payload={"noise":2638} noise-source | 2021/04/26 15:27:32 write: sendingBuf=[]byte{0x81, 0x1b, 0x90, 0x19, 0x11, 0x3, 0x45, 0x24, 0xe0, 0x12, 0x6, 0xaf, 0x90, 0xe8, 0xce, 0x83, 0x1c, 0x13, 0xa, 0x31, 0x37, 0x32, 0x2e, 0x31, 0x39, 0x2e, 0x30, 0x2e, 0x36} noise-source | 2021/04/26 15:27:33 receive: topic=NOISE, payload={"noise":2639} noise-source | 2021/04/26 15:27:33 write: sendingBuf=[]byte{0x81, 0x1b, 0x90, 0x19, 0x11, 0x3, 0x45, 0x24, 0xf0, 0x12, 0x6, 0xaf, 0x90, 0xe8, 0xce, 0x8b, 0x5, 0x13, 0xa, 0x31, 0x37, 0x32, 0x2e, 0x31, 0x39, 0x2e, 0x30, 0x2e, 0x36}
- receive: 表示source收到的MQTT數據。
- write: 表示把通過Y3轉碼的字節碼向noise-zipper工做流引擎發送。
-
查看noise-zipper
docker-compose logs -f noise-zipper
noise-zipper | 2021/04/26 14:39:28 Found 1 flows in zipper config noise-zipper | 2021/04/26 14:39:28 Flow 1: Noise Serverless on noise-flow:4242 noise-zipper | 2021/04/26 14:39:28 Found 1 sinks in zipper config noise-zipper | 2021/04/26 14:39:28 Sink 1: Socket.io Server on noise-sink:4141 noise-zipper | 2021/04/26 14:39:28 Running YoMo workflow... noise-zipper | 2021/04/26 14:39:28 ✅ Listening on 0.0.0.0:9999 noise-zipper | 2021/04/26 14:43:32 ✅ Connect to Noise Serverless (noise-flow:4242) successfully. noise-zipper | 2021/04/26 14:44:33 ✅ Connect to Socket.io Server (noise-sink:4141) successfully.
工做流引擎鏈接上了noise-flow和noise-sink這兩個工做流的處理單元了。
-
查看noise-flow
docker-compose logs -f noise-flow
noise-flow | ❗ value: 561.700012 reaches the threshold 16! 45.700012 noise-flow | [172.19.0.6] 1619425035923 > value: 561.799988 ⚡️=0ms noise-flow | ❗ value: 561.799988 reaches the threshold 16! 45.799988 noise-flow | [172.19.0.6] 1619425036923 > value: 561.900024 ⚡️=1ms
噪聲數據是模擬器產生的,遠遠超過了預設的閥值,打印出警告信息。
分區部署
通過上一小節的快速部署
應該瞭解如何在同一臺宿主機上進行容器化部署並查看各個服務的狀態,但與咱們的開始提出的雲端與邊緣分離的實際架構圖不符,現實場景下flow/sink這類Serverless的服務會部署在雲端,而數據接收器source會部署在邊緣,那本節就來分離他們看看若是編排部署。
雲端部署
部署服務
目標是把 noise-zipper / noise-flow / noise-sink 部署在雲端,能夠查看下面的配置文件和運行步驟:
- 下載 docker-compose-cloud.yml 文件。
- 運行
docker-compose -f docker-compose-cloud.yml up -d
查看狀態
經過 docker-compose -f docker-compose-cloud.yml ps
查看服務狀態
Name Command State Ports ---------------------------------------------------------------------------------------- noise-flow sh -c yomo run app.go -p 4242 Up 4242/udp noise-sink sh -c go run main.go Up 4141/udp, 0.0.0.0:8000->8000/tcp noise-zipper sh -c yomo wf run workflow ... Up 0.0.0.0:9999->9999/udp
- noise-zipper 暴露了雲端工做流引擎的服務端口,由於通訊是QUIC協議,因此可見是一個udp端口。
- noise-sink 暴露了經過WebSocket數據消費的端口,讓其它Web服務展現消費。
邊端部署
部署服務
目標是把 noise-source / noise-web / noise-emitter 部署在邊緣端,能夠查看下面的配置文件和運行步驟:
- 下載 docker-compose-edge.yml 文件。
- 運行
docker-compose -f docker-compose-edge.yml up -d
注意事項:
- Docker-compose-edge.yml中的SOCKET_SERVER_ADDR變量須要設置爲cloud端的source-sink暴露的地址和端口(默認8000)。
查看狀態
經過 docker-compose -f docker-compose-edge.yml ps
查看服務狀態
Name Command State Ports ------------------------------------------------------------------------------- noise-emitter sh -c go run main.go Up noise-source sh -c go run main.go Up 1883/tcp noise-web docker-entrypoint.sh sh -c ... Up 0.0.0.0:3000->3000/tcp
- noise-web 暴露了效果展現網站的端口。通(過訪問 http://localhost:3000/ 可)以查看到以前相同的展現界面。
路由器部署
到了這裏就結束了?尚未!實際上邊緣部署的狀況要比雲端複雜,由於邊緣設備不少是老舊,不必定支持Docker容器,這時你能夠選擇經過源代碼編譯成不一樣平臺的二進制運行文件,直接就把noise-source跑在對應的平臺上。固然,更方便的選擇是購買一臺支持Docker容器的路由器,直接就能夠經過容器部署在邊緣端了,目前YoMo與iKuai達成深度合做,攜手推動工業互聯網在各個領域的應用與發展,能夠經過下面連接查看詳細的信息:
引用參考
上面就是這個噪聲傳感器案例從數據收集處理到展現的全部代碼和部署過程了。對於須要擴展或者引伸到別的應用場景的開發者,能夠點開每一個項目的連接進行詳細閱讀,每一個項目都是微服務化構建,服務角色明確,代碼清晰易懂,若是有什麼問題歡迎提出Issues或者討論。參考連接:
最後,補充一張帶有各個服務可暴露端口號的架構圖,以便碰到困難的朋友查閱。