物聯網設備終端種類繁雜,各廠商使用的編碼格式各異,因此在接入物聯網平臺的時候就產生了統一數據格式的需求,以便平臺之上的應用進行設備管理。node
EMQ X 企業版 3.4.0 提供了 Schema Registry 功能,提供編解碼能力。Schema Registry 管理編解碼使用的 Schema、處理編碼或解碼請求並返回結果。Schema Registry 配合規則引擎,可適配各類場景的設備接入和規則設計。python
數據格式
下圖展現了 Schema Registry 的一個應用案例。多個設備上報不一樣格式的數據,通過 Schema Registry 解碼以後,變爲統一的內部格式,而後轉發給後臺應用。mysql
[圖1: 使用 Schema Registry 對設備數據進行編解碼]git
二進制格式支持
EMQ X 3.4.0 內置的 Schema Registry 數據格式包括 Avro 和 Protobuf。Avro 和 Protobuf 是依賴 Schema 的數據格式,編碼後的數據爲二進制,使用 Schema Registry 解碼後的內部數據格式(Map,稍後講解) 可直接被規則引擎和其餘插件使用。此外 Schema Registry 支持用戶自定義的 (3rd-party) 編解碼服務,經過 HTTP 或 TCP 回調的方式,進行更加貼近業務需求的編解碼。github
架構設計
Schema Registry 爲 Avro 和 Protobuf 等內置編碼格式維護 Schema 文本,但對於自定義編解碼 (3rd-party) 格式,如須要 Schema,Schema 文本需由編解碼服務本身維護。Schema Registry 爲每一個 Schema 建立一個 Schema ID,Schema API 提供了經過 Schema ID 的添加、查詢和刪除操做。web
Schema Registry 既能夠解碼,也能夠編碼。編碼和解碼時須要指定 Schema ID。sql
[圖2: Schema Registry 架構示意圖]shell
編碼調用示例:參數爲 Schemaapache
schema_encode(SchemaID, Data) -> RawData
解碼調用示例:json
schema_decode(SchemaID, RawData) -> Data
常見的使用案例是,使用規則引擎來調用 Schema Registry 提供的編碼和解碼接口,而後將編碼或解碼後的數據做爲後續動做的輸入。
編解碼 + 規則引擎
EMQ X 的消息處理層面可分爲消息路由(Messaging)、規則引擎(Rule Engine)、數據格式轉換(Data Conversion) 三個部分。
EMQ X 的 PUB/SUB 系統將消息路由到指定的主題。規則引擎能夠靈活地配置數據的業務規則,按規則匹配消息,而後指定相應動做。數據格式轉換髮生在規則匹配的過程以前,先將數據轉換爲可參與規則匹配的 Map 格式,而後進行匹配。
[圖3: Messaging, Rule Engine and Schema Registry]
規則引擎內部數據格式(Map)
規則引擎內部使用的數據格式爲 Erlang Map,因此若是原數據內容爲二進制或者其餘格式,必須使用編解碼函數(好比上面提到的 schema_decode 和 json_decode 函數) 將其轉換爲 Map。
Map 是一個 Key-Value 形式的數據結構,形如 #{key => value}。例如,user = #{id => 1, name => "Steve"}
定義了一個 id
爲 1
,name
爲 "Steve"
的 user
Map。
SQL 語句提供了 "." 操做符嵌套地提取和添加 Map 字段。下面是使用 SQL 語句對這個 Map 操做的示例:
SELECT user.id AS my_id
SQL 語句的篩選結果爲 #{my_id => 1}
。
JSON 編解碼
規則引擎的 SQL 語句提供了對 JSON 格式字符串的編解碼支持,將 JSON 字符串和 Map 格式相互轉換的 SQL 函數爲 json_decode() 和 json_encode():
SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y, topic ~= "t/#"
上面這個 SQL 語句將會匹配到 payload 內容爲 JSON 字符串: {"x" = 1, "y" = 1}
, 而且 topic 爲 t/a
的 MQTT 消息。
json_decode(payload) as p
將 JSON 字符串解碼爲下面的 Map 數據結構,從而能夠在 WHERE
子句中使用 p.x 和 p.y 使用 Map 中的字段:
#{ p => #{ x => 1, y => 1 } }
注意: AS
子句是必須的,將解碼以後的數據賦值給某個Key,後面才能對其進行後續操做。
編解碼實戰
Protobuf 數據解析舉例
規則需求
設備發佈一個使用 Protobuf 編碼的二進制消息,須要經過規則引擎匹配事後,將消息從新發布到與 "name" 字段相關的主題上。主題的格式爲 "person/${name}"。
好比,將 "name" 字段爲 "Shawn" 的消息從新發布到主題 "person/Shawn"。
建立 Schema
在 EMQ X 的 Dashboard 界面,使用下面的參數建立一個 Protobuf Schema:
-
名稱:protobuf_person
-
編解碼類型:protobuf
-
Schema:下面的 protobuf schema 定義了一個 Person 消息。
message Person { required string name = 1; required int32 id = 2; optional string email = 3; }
Schema 建立完成後,emqx 會分配一個 Schema ID 和 Version。若是是第一次建立 "protobuf_person",Schema ID 爲 "protobuf_person:1.0"。
建立規則
使用剛纔建立好的 Schema ID 來編寫規則 SQL 語句:
SELECT schema_decode('protobuf_person:1.0', payload, 'Person') as person, payload FROM "message.publish" WHERE topic =~ 't/#' and person.name = 'Shawn'
這裏的關鍵點在於 schema_decode('protobuf_person:1.0', payload, 'Person')
:
schema_decode
函數將 payload 字段的內容按照 'protobuf_person:1.0' 這個 Schema 來作解碼;as person
將解碼後的值保存到變量 "person" 裏;- 最後一個參數
Person
指明瞭 payload 中的消息的類型是 protobuf schema 裏定義的 'Person' 類型。
而後使用如下參數添加動做:
- 動做類型:消息從新發布
- 目的主題:person/${person.name}
- 消息內容模板:${person}
這個動做將解碼以後的 "person" 以 JSON 的格式發送到 person/${person.name}
這個主題。其中${person.name}
是個變量佔位符,將在運行時被替換爲消息內容中 "name" 字段的值。
設備端代碼
規則建立好以後,就能夠模擬數據進行測試了。
下面的代碼使用 Python 語言填充了一個 Person 消息並編碼爲二進制數據,而後將其發送到 "t/1" 主題。詳見 完整代碼。
def publish_msg(client): p = person_pb2.Person() p.id = 1 p.name = "Shawn" p.email = "liuxy@emqx.io" message = p.SerializeToString() topic = "t/1" print("publish to topic: t/1, payload:", message) client.publish(topic, payload=message, qos=0, retain=False)
檢查規則執行結果
-
在 Dashboard 的 Websocket 工具裏,登陸一個 MQTT Client 並訂閱 "person/#"。
-
安裝 python 依賴,並執行設備端代碼:
$ pip3 install protobuf $ pip3 install paho-mqtt $ python3 ./pb2_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io' t/1 b'\n\x05Shawn\x10\x01\x1a\rliuxy@emqx.io'
- 檢查 Websocket 端收到主題爲
person/Shawn
的消息:
{"email":"liuxy@emqx.io","id":1,"name":"Shawn"}
Avro 數據解析舉例
規則需求
設備發佈一個使用 Avro 編碼的二進制消息,須要經過規則引擎匹配事後,將消息從新發布到與 "name" 字段相關的主題上。主題的格式爲 "avro_user/${name}"。
好比,將 "name" 字段爲 "Shawn" 的消息從新發布到主題 "avro_user/Shawn"。
建立 Schema
在 EMQ X 的 Dashboard 界面,使用下面的參數建立一個 Avro Schema:
-
名稱:avro_user
-
編解碼類型:avro
-
Schema:
{ "type":"record", "fields":[ {"name":"name", "type":"string"}, {"name":"favorite_number", "type":["int", "null"]}, {"name":"favorite_color", "type":["string", "null"]} ] }
Schema 建立完成後,emqx 會分配一個 Schema ID 和 Version。若是是第一次建立 "avro_user",Schema ID 爲 "avro_user:1.0"。
建立規則
使用剛纔建立好的 Schema ID 來編寫規則 SQL 語句:
SELECT schema_decode('avro_user:1.0', payload) as avro_user, payload FROM "message.publish" WHERE topic =~ 't/#' and avro_user.name = 'Shawn'
這裏的關鍵點在於 schema_decode('avro_user:1.0', payload)
:
schema_decode
函數將 payload 字段的內容按照 'avro_user:1.0' 這個 Schema 來作解碼;as avro_user
將解碼後的值保存到變量 "avro_user" 裏。
而後使用如下參數添加動做:
- 動做類型:消息從新發布
- 目的主題:avro_user/${avro_user.name}
- 消息內容模板:${avro_user}
這個動做將解碼以後的 "user" 以 JSON 的格式發送到 avro_user/${avro_user.name}
這個主題。其中${avro_user.name}
是個變量佔位符,將在運行時被替換爲消息內容中 "name" 字段的值。
設備端代碼
規則建立好以後,就能夠模擬數據進行測試了。
下面的代碼使用 Python 語言填充了一個 User 消息並編碼爲二進制數據,而後將其發送到 "t/1" 主題。詳見 完整代碼。
def publish_msg(client): datum_w = avro.io.DatumWriter(SCHEMA) buf = io.BytesIO() encoder = avro.io.BinaryEncoder(buf) datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder) message = buf.getvalue() topic = "t/1" print("publish to topic: t/1, payload:", message) client.publish(topic, payload=message, qos=0, retain=False)
檢查規則執行結果
-
在 Dashboard 的 Websocket 工具裏,登陸一個 MQTT Client 並訂閱 "avro_user/#"。
-
安裝 python 依賴,並執行設備端代碼:
$ pip3 install protobuf $ pip3 install paho-mqtt $ python3 avro_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'\nShawn\x00\xb4\n\x00\x06red'
- 檢查 Websocket 端收到主題爲
avro_user/Shawn
的消息:
{"favorite_color":"red","favorite_number":666,"name":"Shawn"}
自定義編解碼舉例
規則需求
設備發佈一個任意的消息,驗證自部署的編解碼服務能正常工做。
建立 Schema
在 EMQ X 的 Dashboard 界面,使用下面的參數建立一個 3rd-Party Schema:
- 名稱:my_parser
- 編解碼類型:3rd-party
- 第三方類型: HTTP
- URL: http://127.0.0.1:9003/parser
- 編解碼配置: xor
其餘配置保持默認。emqx 會分配一個 Schema ID "my_parser"。自定義編解碼沒有 Version 管理。
上面第 5 項編解碼配置是個可選項,是個字符串,內容跟編解碼服務的業務相關。
建立規則
使用剛纔建立好的 Schema ID 來編寫規則 SQL 語句:
SELECT schema_encode('my_parser', payload) as encoded_data, schema_decode('my_parser', encoded_data) as decoded_data FROM "message.publish" WHERE topic =~ 't/#'
這個 SQL 語句首先對數據作了 Encode,而後又作了 Decode,目的在於驗證編解碼過程是否正確:
schema_encode
函數將 payload 字段的內容按照 'my_parser' 這個 Schema 來作編碼,結果存儲到encoded_data
這個變量裏;schema_decode
函數將 payload 字段的內容按照 'my_parser' 這個 Schema 來作解碼,結果存儲到decoded_data
這個變量裏;
最終這個 SQL 語句的篩選結果是 encoded_data
和 decoded_data
這兩個變量。
而後使用如下參數添加動做:
- 動做類型:檢查(調試)
這個檢查動做會把 SQL 語句篩選的結果打印到 emqx 控制檯 (erlang shell) 裏。
若是是使用 emqx console 啓動的服務,打印會直接顯示在控制檯裏;若是是使用 emqx start 啓動的服務,打印會輸出到日誌目錄下的 erlang.log.N 文件裏,這裏 "N" 爲整數,好比 "erlang.log.1", "erlang.log.2"。
編解碼服務端代碼
規則建立好以後,就能夠模擬數據進行測試了。因此首先須要編寫一個本身的編解碼服務。
下面的代碼使用 Python 語言實現了一個 HTTP 編解碼服務,爲簡單起見,這個服務提供兩種簡單的方式來進行編解碼(加解密),詳見 完整代碼:
- 按位異或
- 字符替換
def xor(data): """ >>> xor(xor(b'abc')) b'abc' >>> xor(xor(b'!}~*')) b'!}~*' """ length = len(data) bdata = bytearray(data) bsecret = bytearray(secret * length) result = bytearray(length) for i in range(length): result[i] = bdata[i] ^ bsecret[i] return bytes(result) def subst(dtype, data, n): """ >>> subst('decode', b'abc', 3) b'def' >>> subst('decode', b'ab~', 1) b'bc!' >>> subst('encode', b'def', 3) b'abc' >>> subst('encode', b'bc!', 1) b'ab~' """ adata = array.array('B', data) for i in range(len(adata)): if dtype == 'decode': adata[i] = shift(adata[i], n) elif dtype == 'encode': adata[i] = shift(adata[i], -n) return bytes(adata)
將這個服務運行起來:
$ pip3 install flask $ python3 http_parser_server.py * Serving Flask app "http_parser_server" (lazy loading) * Environment: production WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Debug mode: off * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)
檢查規則執行結果
因爲本示例比較簡單,咱們直接使用 MQTT Websocket 客戶端來模擬設備端發一條消息。
-
在 Dashboard 的 Websocket 工具裏,登陸一個 MQTT Client 併發布一條消息到 "t/1",內容爲 "hello"。
-
檢查 emqx 控制檯 (erlang shell) 裏的打印:
(emqx@127.0.0.1)1> [inspect] Selected Data: #{decoded_data => <<"hello">>, encoded_data => <<9,4,13,13,14>>} Envs: #{event => 'message.publish', flags => #{dup => false,retain => false}, from => <<"mqttjs_76e5a35b">>, headers => #{allow_publish => true, peername => {{127,0,0,1},54753}, username => <<>>}, id => <<0,5,146,30,146,38,123,81,244,66,0,0,62,117,0,1>>, node => 'emqx@127.0.0.1',payload => <<"hello">>,qos => 0, timestamp => {1568,34882,222929}, topic => <<"t/1">>} Action Init Params: #{}
Select Data 是通過 SQL 語句篩選以後的數據,Envs 是規則引擎內部可用的環境變量,Action Init Params 是動做的初始化參數。這三個數據均爲 Map
格式。
Selected Data 裏面的兩個字段 decoded_data
和 encoded_data
對應 SELECT 語句裏面的兩個 AS。由於 decoded_data
是編碼而後再解碼以後的結果,因此它又被還原爲了咱們發送的內容 "hello",代表編解碼插件工做正常。