經過REST API更新 NiFi 數據流程

經過REST API更新 NiFi 數據流程

經過 NiFi 的REST API鏈接到服務並更新數據流程。html

一、軟件環境與預備知識

二、建立測試流程

下面是一個測試流程:git

準備這個測試流程:github

  • 添加 PutFile 組件到繪圖面板。
  • 重命名processor 爲 Save File (right-click -> Configure -> Settings -> Name field)。後面將使用這個名稱經過API查找 processor。
  • 添加 GetHTTP processor,建立一個從 GetHTTP 到 ‘Save File’的鏈接。GetHTTP設置如今能夠暫不設置,Save File processor 須要一個輸入鏈接。
  • 設置 Save File 屬性以下 (這個設置後面能夠經過編程進行修改)。
  • 啓動 Save File processor (這裏不須要啓動 GetHTTP)。

注意: 對於更復雜的流程,可使用模版來建立,參考: https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templatesapache

三、調用REST API

下一步, 咱們更新 Save File processor 使用不一樣的目錄 (/tmp/staging) 而且設置 「Create Missing Directories」爲true」。編程

High-level script flow:api

  1. 查找 data flow中須要操做的組件。輸入查找'Save File',這與Web UI中的查找框使用的是同一個 API。

  2.  確認這是惟一的 processor - 咱們但願修改的事但願的那一個處理節點。
  3. 與framework的狀態同步 - 取得最新版本的字段值, 將會用於下面的更新語句中。這是經典的樂觀鎖模式的實現。
  4. 構建一個小的 JSON 文檔,只包含狀態改變。
  5. 執行部分更新,經過 PUT 操做進行。
  6. 重複 4-5,中止, 更新配置 (改變目錄和目錄屬性),啓動 processor。

下面咱們直接執行這個腳本 (若是但願保存、修改這個腳本,從github中clone/checkout到本地):併發

groovy 
https://raw.githubusercontent.com/aperepel/nifi-rest-api-tutorial/master/reconfigure.groovy

能夠看到輸出信息,以下:app

Looking up a component to update...
Found the component, id/group: c35f1bb7-5add-427f-864a-bdd23bb4ac7f/f1a2c4e8-b106-4877-97d9-9dbca868fc16

Preparing to update the flow state...
	Stopping the processor to apply changes...
	Updating processor...
	{
	"revision": {
	"clientId": "my awesome script",
	"version": 309
	},
	"processor": {
	"id": "c35f1bb7-5add-427f-864a-bdd23bb4ac7f",
	"config": {
	"properties": {
	"Directory": "/tmp/staging",
	"Create Missing Directories": "true"
	}
	}
	}
	}
	Updated ok.
	Bringing the updated processor back online...
	Ok

檢查NiFi processor, 能夠看到更新的目錄和屬性,而且建立了缺失的目錄。除此以外, 每一步都被捕獲下來並記錄在 flow history中,以下:框架

當在UI中看見警告信息, 簡單地點擊Refresh連接刷新。在本文的後面將會介紹併發控制。maven

四、代碼詳解

首先, 拉取依賴項 https://github.com/jgritman/httpbuilder/wiki/RESTClient 。這個在 maven 倉庫,構建時自動獲取。

 @Grab(group='org.codehaus.groovy.modules.http-builder',
module='http-builder',
version='0.7.1')

這個庫讓咱們使用REST DSL,以下所示:

nifi.get(path: 'controller/search-results',query: [q: processorName])

nifi.put(path: "controller/process-groups/$processGroup/processors/$processorId",
    body: builder.toPrettyString(),requestContentType: JSON)

下一步, 使用Groovy的 JSON builder去構建一個JSON 文檔,實現部分PUT更新。只須要指定但願改變的屬性,以下:

builder {

revision {
    clientId 'my awesome script'
    version resp.data.revision.version
    }

processor {
    id "$processorId"
    config {
        properties {
                'Directory' '/tmp/staging'
                'Create Missing Directories' 'true'
                }
            }
          }
}

這些dot-notation變量遍歷JSON文檔樹。爲了理解如何結構化這個返回結果,啓動一個 GET 請求,將獲取一個完整的 state 文檔。

提示: UI 經過REST API來執行全部操做, 這是一個很是好的交互學習工具。注意,UI 對於PUT 和 POST (form) requests的操做是互換的, 因此選擇在於那種用起來方便。這裏咱們經過 PUT 和 JSON執行操做。

最後,  clientIdversion 在下一節中進行介紹。

五、在 NiFi 中的樂觀鎖

下圖展現了幾本概念。

對於update操做,提供 clientId 是必須的,以免一致性問題(API 將返回 409 Conflict status 代碼,若是開發者不知道這一點的話,將會引發困惑)。

controller/revision 返回用戶的 最後修改流程的clientId,這不會老是你的 id。 最佳實踐就是提供一個你的惟一ID以區分客戶端。這實際上能夠是任何格式的值, UUID 是框架在缺失時自動建立出來的缺省值。

英文:https://community.hortonworks.com/articles/3160/update-nifi-flow-on-the-fly-via-api.html

更多NiFi資源參考:http://www.javashuo.com/article/p-dkcrvjuz-nt.html

相關文章
相關標籤/搜索