Kuiper 中使用 Golang 模版 (template) 定製分析結果

簡介

用戶經過 Kuiper 進行數據分析處理後,使用各類 sink 能夠往不一樣的系統發送數據分析結果。針對一樣的分析結果,不一樣的 sink 須要的格式可能未必同樣。好比,在某物聯網場景中,當發現某設備溫度太高的時候,須要向雲端某 rest 服務發送一個請求,同時在本地須要經過 MQTT 協議 往設備發送一個控制命令,這二者須要的數據格式可能並不同,所以,須要對來自於分析的結果進行「二次處理」後,才能夠往不一樣的目標發送針對數據。本文將介紹如何利用 sink 中的數據模版(data template )來實現對分析結果的「二次處理」。git

Golang 模版介紹

Golang 模版將一段邏輯應用到數據上,而後按照用戶指定的邏輯對數據進行格式化輸出,Golang 模版常見的使用場景爲在網頁開發中,好比將 Golang 中的某數據結構進行轉換和控制後,將其轉換爲 HTML 標籤輸出到瀏覽器。在Kuiper 使用了 Golang 的 template(模版)對分析結果實現「二次處理」,請參考如下來自於 Golang 的官方介紹。github

模版是經過將其應用到一個數據結構上來執行的。模版中的註釋 (Annotations) 指的是數據結構中的元素(典型的爲結構體中的一個字段,或者 map 中的一個 key),註釋用於控制執行、並獲取用於顯示的值。模版的執行會迭代數據結構並設置遊標,經過符號「.」 來表示,稱之爲「dot」,在執行過程當中指向數據結構中的當前位置。

模版的輸入文本能夠爲 UTF-8 編碼的任意文本。「動做 (Actions)」 -- 數據求值或者控制結構 - 是經過 "{{" 和 "}}" 來界定的;全部在動做以外的文本會被保持原樣到輸出,除了 raw strings,動做不可跨行(註釋除外)。golang

動做 (Actions)

Golang 模版提供了一些內置的動做,可讓用戶寫各類控制語句,用於提取內容。好比,json

  • 根據判斷條件來輸出不一樣的內容
{{if pipeline}} T1 {{else}} T0 {{end}}
  • 循環遍歷數據,並進行處理
{{range pipeline}} T1 {{else}} T0 {{end}}

讀者能夠看到,動做是用 {{}} 界定的,在 Kuiper 的數據模版使用過程當中,因爲輸出通常也是 JSON 格式, 而 JSON 格式是用 {} 來界定,所以讀者在不太熟悉使用的時候,在使用 Kuiper 的數據模版的功能會以爲比較難以理解。好比如下的例子中,數組

{{if pipeline}} {"field1": true} {{else}}  {"field1": false} {{end}}

上述表達式的意思以下(請注意動做的界定符和 JSON 的界定符):瀏覽器

  • 若是知足了條件 pipeline,則輸出 JSON 字符串 {"field1": true}
  • 不然輸出 JSON 字符串 {"field1": false}

Kuiper sink 數據格式

Golang 的模版能夠做用於各類數據結構,好比 map、切片 (slice),通道等,而 Kuiper 的 sink 中的數據模版獲得的數據類型是固定的,是一個包含了 Golang map 切片的數據類型,以下所示。數據結構

[]map[string]interface{}

切片 (slice) 數據按條發送

流入 sink 的數據是一個 map[string]interface{} 切片的數據結構,可是用戶往目標 sink 發送數據的時候,多是須要單條的數據,而不是全部的數據。好比在這篇 Kuiper 與 AWS IoT Hub 集成 的文章中所介紹的,規則產生的樣例數據以下所示。函數

[
  {"device_id":"1","t_av":36.25,"t_count":4,"t_max":80,"t_min":10},
  {"device_id":"2","t_av":27,"t_count":4,"t_max":45,"t_min":12}
]

在發送到 sink 的時候,但願每條數據分開發送,首先須要將 sink 的 sendSingle 設置爲 true,而後使用數據模版:{{json .}},完整配置以下,用戶能夠將其拷貝到某 sink 配置的最後。oop

...
 "sendSingle": true,
 "dataTemplate": "{{json .}}"
  • sendSingle 設置爲 true後,Kuiper 把傳遞給 sink 的 []map[string]interface{} 數據類型進行遍歷處理,對於遍歷過程當中的每一條數據都會應用用戶指定的數據模版
  • json 是 Kuiper 提供的函數(用戶能夠參考 Kuiper 擴展模版函數來了解更多的 Kuiper 擴展),能夠將傳入的參數轉化爲 JSON 字符串輸出,對於遍歷到的每一條數據,將 map 中的內容轉換爲 JSON 字符串

Golang 還內置提供了一些函數,用戶能夠參考更多 Golang 內置提供的函數來獲取更多函數信息。優化

數據內容轉換

仍是針對上述例子,須要對返回的 t_av(平均溫度)作一些轉換,轉換的基本要求就是根據不一樣的平均溫度,加入不一樣的描述文字,用於目標 sink 中的處理。規則以下,

  • 當溫度小於 30,描述字段爲「Current temperature is$t_av, it's normal.」
  • 當溫度大於 30,描述字段爲「Current temperature is$t_av, it's high.」

假設目標 sink 仍是須要 JSON 數據,該數據模版的內容以下,

...
"dataTemplate": "{\"device_id\": {{.device_id}}, \"description\": \"{{if lt .t_av 30.0}}Current temperature is {{.t_av}}, it's normal.\"{{else if ge .t_av 30.0}}Current temperature is {{.t_av}}, it's high.\"{{end}}}"
"sendSingle": true,

在上述的數據模版中,使用了 {{if pipeline}} T1 {{else if pipeline}} T0 {{end}} 的內置動做,看上去比較複雜,稍微調整一下,去掉轉義並加入縮進後排版以下(注意:在生成 Kuiper 規則的時候,不能傳入如下優化後排版的規則)。

{"device_id": {{.device_id}}, "description": "
  {{if lt .t_av 30.0}}
    Current temperature is {{.t_av}}, it's normal."
  {{else if ge .t_av 30.0}}
    Current temperature is {{.t_av}}, it's high."
  {{end}}
}

使用了 Golang 內置的二元比較函數,

  • lt: 小於
  • ge:大於等於

值得注意的是,在 ltge 函數中,第二個參數值的類型應該與 map 中的數據實際的數據類型一致,不然會出錯。如在上述的例子中,溫度大於 30 的狀況,由於 map 中實際平均數的類型爲 float,所以第二個參數的值需傳入 30.0,而不是 30

另外,模版仍是應用到切片中每條記錄上,因此仍是須要將 sendSingle 屬性設置爲 true。最終該數據模版針對上述數據產生的內容以下,

{"device_id": 1, "description": "Current temperature is 36.25, it's high."}
{"device_id": 2, "description": "Current temperature is 27, it's normal."}

數據遍歷

經過給 sink 的 sendSingle 屬性設置爲 true ,能夠實現把傳遞給 sink 的切片數據進行遍歷。在此處,咱們將介紹一些更爲複雜的例子,好比在 sink 的結果中,包含了嵌套的數組類型的數據,如何經過在數據模版中提供的遍歷功能,本身來實現遍歷。

假設流入 sink 中的數據內容以下所示,

{"device_id":"1", 
 "values": [
  {"temperature": 10.5},
  {"temperature": 20.3},
  {"temperature": 30.3}
 ]
}

需求爲,

  • 當發現 "values" 數組中某個 temperature 值小於等於 25 的時候,增長一個名爲 description 的屬性,將其值設置爲 fine
  • 當發現 "values" 數組中某個 temperature 值大於 25 的時候,增長一個名爲 description 的屬性,將其值設置爲 high
"sendSingle": true,
"dataTemplate": "{{$len := len .values}} {{$loopsize := add $len -1}} {\"device_id\": \"{{.device_id}}\", \"description\": [{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}}"

該數據模板比較複雜,解釋以下,

  • {{$len := len .values}} {{$loopsize := add $len -1}},這一段執行了兩個表達式,第一個 len 函數取得數據中 values 的長度,第二個 add 將其值減 1 並賦值到變量 loopsize:因爲 Golang 的表達式中目前還不支持直接將數值減 1 的操做, add 是 Kuiper 爲實現該功能而擴展的函數。
  • {\"device_id\": \"{{.device_id}}\", \"description\": [ 這一段模版在做用到樣例數據後,生成了 JSON 串 {"device_id": "1", "description": [
  • {{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}} ,這一段模版看起來比較複雜,可是若是把它調整一下,去掉轉義並加入縮進後排版以下,看起來可能會更加清晰(注意:在生成 Kuiper 規則的時候,不能傳入如下優化後排版的規則)。

    {{range $index, $ele := .values}} 
      {{if le .temperature 25.0}}
        "fine"
      {{else if gt .temperature 25.0}}
        "high"
      {{end}} 
      {{if eq $loopsize $index}}
        ]
      {{else}}
        ,
      {{end}}
    {{end}}

    第一個條件判斷生成是 fine 或者 high;第二個條件判斷是生成分隔數組的 , 仍是數組結尾的 ]

另外,模版仍是應用到切片中每條記錄上,因此仍是須要將 sendSingle 屬性設置爲 true。最終該數據模版針對上述數據產生的內容以下,

{"device_id": "1", "description": [ "fine" , "fine" , "high" ]}

總結

經過 Kuiper 提供的數據模版功能能夠實現對分析結果的二次處理,以知足不一樣的 sink 目標的需求。可是讀者也能夠看到,因爲 Golang 模版自己的限制,實現比較複雜的數據轉換的時候會比較笨拙,但願未來 Golang 模版的功能能夠作得更增強大和靈活,這樣能夠支持處理更加複雜的需求。目前建議用戶能夠經過數據模版來實現一些較爲簡單的數據的轉換;若是用戶須要對數據進行比較複雜的處理,而且本身擴展了 sink 的狀況下,能夠在 sink 的實現中直接進行處理。

另外,Kuiper 團隊在規劃未來支持自定義擴展 sink 中的模版函數,這樣一些比較複雜的邏輯能夠在函數內部實現,用戶調用的時候只需一個簡單的模版函數調用便可實現。

版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/k...

相關文章
相關標籤/搜索