用戶經過 Kuiper 進行數據分析處理後,使用各類 sink 能夠往不一樣的系統發送數據分析結果。針對一樣的分析結果,不一樣的 sink 須要的格式可能未必同樣。好比,在某物聯網場景中,當發現某設備溫度太高的時候,須要向雲端某 rest 服務發送一個請求,同時在本地須要經過 MQTT 協議 往設備發送一個控制命令,這二者須要的數據格式可能並不同,所以,須要對來自於分析的結果進行「二次處理」後,才能夠往不一樣的目標發送針對數據。本文將介紹如何利用 sink 中的數據模版(data template )來實現對分析結果的「二次處理」。git
Golang 模版將一段邏輯應用到數據上,而後按照用戶指定的邏輯對數據進行格式化輸出,Golang 模版常見的使用場景爲在網頁開發中,好比將 Golang 中的某數據結構進行轉換和控制後,將其轉換爲 HTML 標籤輸出到瀏覽器。在Kuiper 使用了 Golang 的 template(模版)對分析結果實現「二次處理」,請參考如下來自於 Golang 的官方介紹。github
模版是經過將其應用到一個數據結構上來執行的。模版中的註釋 (Annotations) 指的是數據結構中的元素(典型的爲結構體中的一個字段,或者 map 中的一個 key),註釋用於控制執行、並獲取用於顯示的值。模版的執行會迭代數據結構並設置遊標,經過符號「.」 來表示,稱之爲「dot」,在執行過程當中指向數據結構中的當前位置。模版的輸入文本能夠爲 UTF-8 編碼的任意文本。「
動做
(Actions)」 -- 數據求值或者控制結構 - 是經過 "{{" 和 "}}" 來界定的;全部在動做
以外的文本會被保持原樣到輸出,除了 raw strings,動做
不可跨行(註釋除外)。golang
Golang 模版提供了一些內置的動做,可讓用戶寫各類控制語句,用於提取內容。好比,json
{{if pipeline}} T1 {{else}} T0 {{end}}
{{range pipeline}} T1 {{else}} T0 {{end}}
讀者能夠看到,動做是用 {{}}
界定的,在 Kuiper 的數據模版使用過程當中,因爲輸出通常也是 JSON 格式, 而 JSON 格式是用 {}
來界定,所以讀者在不太熟悉使用的時候,在使用 Kuiper 的數據模版的功能會以爲比較難以理解。好比如下的例子中,數組
{{if pipeline}} {"field1": true} {{else}} {"field1": false} {{end}}
上述表達式的意思以下(請注意動做的界定符和 JSON 的界定符):瀏覽器
{"field1": true}
{"field1": false}
Golang 的模版能夠做用於各類數據結構,好比 map、切片 (slice),通道等,而 Kuiper 的 sink 中的數據模版獲得的數據類型是固定的,是一個包含了 Golang map
切片的數據類型,以下所示。數據結構
[]map[string]interface{}
流入 sink 的數據是一個 map[string]interface{}
切片的數據結構,可是用戶往目標 sink 發送數據的時候,多是須要單條的數據,而不是全部的數據。好比在這篇 Kuiper 與 AWS IoT Hub 集成 的文章中所介紹的,規則產生的樣例數據以下所示。函數
[ {"device_id":"1","t_av":36.25,"t_count":4,"t_max":80,"t_min":10}, {"device_id":"2","t_av":27,"t_count":4,"t_max":45,"t_min":12} ]
在發送到 sink 的時候,但願每條數據分開發送,首先須要將 sink 的 sendSingle
設置爲 true
,而後使用數據模版:{{json .}}
,完整配置以下,用戶能夠將其拷貝到某 sink 配置的最後。oop
... "sendSingle": true, "dataTemplate": "{{json .}}"
sendSingle
設置爲 true
後,Kuiper 把傳遞給 sink 的 []map[string]interface{}
數據類型進行遍歷處理,對於遍歷過程當中的每一條數據都會應用用戶指定的數據模版json
是 Kuiper 提供的函數(用戶能夠參考 Kuiper 擴展模版函數來了解更多的 Kuiper 擴展),能夠將傳入的參數轉化爲 JSON 字符串輸出,對於遍歷到的每一條數據,將 map 中的內容轉換爲 JSON 字符串Golang 還內置提供了一些函數,用戶能夠參考更多 Golang 內置提供的函數來獲取更多函數信息。優化
仍是針對上述例子,須要對返回的 t_av
(平均溫度)作一些轉換,轉換的基本要求就是根據不一樣的平均溫度,加入不一樣的描述文字,用於目標 sink 中的處理。規則以下,
$t_av
, it's normal.」$t_av
, it's high.」假設目標 sink 仍是須要 JSON 數據,該數據模版的內容以下,
... "dataTemplate": "{\"device_id\": {{.device_id}}, \"description\": \"{{if lt .t_av 30.0}}Current temperature is {{.t_av}}, it's normal.\"{{else if ge .t_av 30.0}}Current temperature is {{.t_av}}, it's high.\"{{end}}}" "sendSingle": true,
在上述的數據模版中,使用了 {{if pipeline}} T1 {{else if pipeline}} T0 {{end}}
的內置動做,看上去比較複雜,稍微調整一下,去掉轉義並加入縮進後排版以下(注意:在生成 Kuiper 規則的時候,不能傳入如下優化後排版的規則)。
{"device_id": {{.device_id}}, "description": " {{if lt .t_av 30.0}} Current temperature is {{.t_av}}, it's normal." {{else if ge .t_av 30.0}} Current temperature is {{.t_av}}, it's high." {{end}} }
使用了 Golang 內置的二元比較函數,
lt
: 小於ge
:大於等於值得注意的是,在 lt
和 ge
函數中,第二個參數值的類型應該與 map 中的數據實際的數據類型一致,不然會出錯。如在上述的例子中,溫度大於 30
的狀況,由於 map 中實際平均數的類型爲 float,所以第二個參數的值需傳入 30.0
,而不是 30
。
另外,模版仍是應用到切片中每條記錄上,因此仍是須要將 sendSingle
屬性設置爲 true
。最終該數據模版針對上述數據產生的內容以下,
{"device_id": 1, "description": "Current temperature is 36.25, it's high."} {"device_id": 2, "description": "Current temperature is 27, it's normal."}
經過給 sink 的 sendSingle
屬性設置爲 true
,能夠實現把傳遞給 sink 的切片數據進行遍歷。在此處,咱們將介紹一些更爲複雜的例子,好比在 sink 的結果中,包含了嵌套的數組類型的數據,如何經過在數據模版中提供的遍歷功能,本身來實現遍歷。
假設流入 sink 中的數據內容以下所示,
{"device_id":"1", "values": [ {"temperature": 10.5}, {"temperature": 20.3}, {"temperature": 30.3} ] }
需求爲,
temperature
值小於等於 25
的時候,增長一個名爲 description
的屬性,將其值設置爲 fine
。temperature
值大於 25
的時候,增長一個名爲 description
的屬性,將其值設置爲 high
。"sendSingle": true, "dataTemplate": "{{$len := len .values}} {{$loopsize := add $len -1}} {\"device_id\": \"{{.device_id}}\", \"description\": [{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}}"
該數據模板比較複雜,解釋以下,
{{$len := len .values}} {{$loopsize := add $len -1}}
,這一段執行了兩個表達式,第一個 len
函數取得數據中 values
的長度,第二個 add
將其值減 1 並賦值到變量 loopsize
:因爲 Golang 的表達式中目前還不支持直接將數值減 1 的操做, add
是 Kuiper 爲實現該功能而擴展的函數。{\"device_id\": \"{{.device_id}}\", \"description\": [
這一段模版在做用到樣例數據後,生成了 JSON 串 {"device_id": "1", "description": [
{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}
,這一段模版看起來比較複雜,可是若是把它調整一下,去掉轉義並加入縮進後排版以下,看起來可能會更加清晰(注意:在生成 Kuiper 規則的時候,不能傳入如下優化後排版的規則)。
{{range $index, $ele := .values}} {{if le .temperature 25.0}} "fine" {{else if gt .temperature 25.0}} "high" {{end}} {{if eq $loopsize $index}} ] {{else}} , {{end}} {{end}}
第一個條件判斷生成是 fine
或者 high
;第二個條件判斷是生成分隔數組的 ,
仍是數組結尾的 ]
。
另外,模版仍是應用到切片中每條記錄上,因此仍是須要將 sendSingle
屬性設置爲 true
。最終該數據模版針對上述數據產生的內容以下,
{"device_id": "1", "description": [ "fine" , "fine" , "high" ]}
經過 Kuiper 提供的數據模版功能能夠實現對分析結果的二次處理,以知足不一樣的 sink 目標的需求。可是讀者也能夠看到,因爲 Golang 模版自己的限制,實現比較複雜的數據轉換的時候會比較笨拙,但願未來 Golang 模版的功能能夠作得更增強大和靈活,這樣能夠支持處理更加複雜的需求。目前建議用戶能夠經過數據模版來實現一些較爲簡單的數據的轉換;若是用戶須要對數據進行比較複雜的處理,而且本身擴展了 sink 的狀況下,能夠在 sink 的實現中直接進行處理。
另外,Kuiper 團隊在規劃未來支持自定義擴展 sink 中的模版函數,這樣一些比較複雜的邏輯能夠在函數內部實現,用戶調用的時候只需一個簡單的模版函數調用便可實現。
版權聲明: 本文爲 EMQ 原創,轉載請註明出處。