NiFi 腳本執行器使用指南 (part 1)

NiFi腳本執行器-ExecuteScript 使用指南 (part 1)

ExecuteScript讓NiFi能夠執行腳原本完成數據流程任務,從而能夠編寫本身的任務節點而不只僅是採用已有的任務節點,具備很強的靈活性。html

本文是介紹使用ExecuteScript來完成任務的系列文章之一。例程包括 Groovy, Jython, Javascript (Nashorn), 以及 JRuby. 系列文章中的「菜譜」 包括:java

Part 1 - 介紹 NiFi API 和 FlowFilesapache

  • 從incoming queue獲得flow file
  • 建立一個新的 flow files
  • 與 flow file attributes一塊兒工做
  • 轉換 flow file
  • 日誌 Logging

Part 2 - FlowFile I/O 和 Error Handling編程

  • 從 flow file 中讀取
  • 寫入 flow file
  • 讀/寫 flow file
  • Error Handling

Part 3 - 高級特徵數組

  • 使用動態屬性
  • 添加模塊
  • 狀態管理
  • 存取控制器服務

介紹

ExecuteScript 是一個萬能的處理器,容許用戶使用編程語言定義本身的數據處理功能, 在每一次 ExecuteScript processor 觸發時被調用。下面的變量綁定到腳本環境,以提供腳本中訪問 NiFi 組件環境:ruby

session: 是對processor的ProcessSession屬性的引用。session容許在 flow files 執行下面的操做: create(), putAttribute(), transfer(), 像 read() 和 write()同樣。session

context: 是對 ProcessContext 的引用。能夠用於檢索 processor 的屬性, 關係, Controller Services, 和 StateManager。數據結構

log: 是對ComponentLog的引用。用於 log 消息到 NiFi系統, 如 log.info('Hello world!')。併發

REL_SUCCESS: 這是對 "success" relationship 的引用。這是從父類 (ExecuteScript)的靜態變量基礎來的, 可是一些引擎(如 Lua)不容許引用靜態成員, 只是一個爲了方便的變量。框架

REL_FAILURE: 這是對 "failure" relationship 的引用。與 REL_SUCCESS 同樣, 這是從父類 (ExecuteScript)的靜態變量基礎來的, 可是一些引擎(如 Lua)不容許引用靜態成員, 只是一個爲了方便的變量。

Dynamic Properties: 任何在ExecuteScript定義的動態屬性都做爲變量集合到 PropertyValue 對象,對應於dynamic property。容許得到property的 String 值 , 經過NiFi表達式進行求值,得到相應的類型 (如 Boolean, 等等)。 由於動態屬性名稱成爲腳本里的變量名, 你須要瞭解所選的腳本引擎的變量命名屬性。 例如, Groovy 不容許變量名中提供 (.) , 因此,若是 "my.property"做爲動態屬性將會報錯。

與這些變量名的交互經過 NiFi Java API進行, 下面的每個案例將討論相應的API調用。 下面的案例執行不一樣的函數操做 flow files, 如 reading/writing 屬性, 轉換爲 relationship, logging, 等等。須要注意,這裏的例子只是一些片斷。舉例來講, 若是使用session.get()從隊列中獲取 flow file , 必須轉換爲 relationship 或者移除, 要不將會引起錯誤。代碼片斷應該是平面化的並且保持清晰,沒有容易引發混亂的代碼,僅用於演示概念,從而讓工做簡單。 在之後的文章中,我將這些放到一塊兒,造成完整的腳本,能夠幹一些有用的事情。

Recipes

Recipe: 從session中得到flow file

Use Case: 從隊列中得到輸入flow file,執行 ExecuteScript 並進行處理。

Approach: 使用 session對象的get(). 該方法返回FlowFile,是下一個最高優先級的FlowFile用於處理. 若是沒有 FlowFile 用於處理, 該方法將返回 null. 注意, 若是一個持續的FlowFiles流進入processor,也可能會返回null. 這在多個併發任務處理時會發生,此時其餘任務已得到了 FlowFiles. 若是腳本要求有一個 FlowFile才能繼續處理, 若是是session.get()獲得null應該當即返回。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return

Jython

flowFile = session.get()
if (flowFile != None):
# All processing code starts at this indent
# implicit return at the end

Javascript

var flowFile = session.get();
if (flowFile != null) {
// All processing code goes here
}

JRuby

flowFile = session.get()
if flowFile != nil
# All processing code goes here
end

 

Recipe: 獲得多個 flow files

Use Case: 從queue(s)得到多個flow files用於ExecuteScript處理

Approach: 使用session對象的get(maxResults) 方法. 該方法從工做隊列中返回最多 maxResults 個FlowFiles . 若是沒有 FlowFiles 可用, 一個空的list 將被返回 (而不是返回 null).

注意: 若是多個輸入隊列存在, 在單個調用是否多個隊列或單個隊列將被拉去的行爲是未指定的. 觀察到的行爲 (對於 NiFi 1.1.0+ 和之前的版本) 描述在 here.

Examples:

Groovy

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
    flowFileList.each { flowFile ->
    // Process each FlowFile here
    }
}

Jython

flowFileList = session.get(100)
if not flowFileList.isEmpty():
    for flowFile in flowFileList:
        # Process each FlowFile here

Javascript

flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
    for each (var flowFile in flowFileList) {
        // Process each FlowFile here
    }
}

JRuby

flowFileList = session.get(100)
if !(flowFileList.isEmpty())
    flowFileList.each { |flowFile|
        # Process each FlowFile here}
end

 

Recipe: 建立一個新的FlowFile。

Use Case: 建立一個新的 FlowFile 發送到下一步的 processor

Approach: 使用session的 create() 方法. 該方法返回 FlowFile 對象, 以用於後續的處理操做。

Examples:

Groovy

flowFile = session.create()
// Additional processing here

Jython

flowFile = session.create()
# Additional processing here

Javascript

var flowFile = session.create();
// Additional processing here

JRuby

flowFile = session.create()
# Additional processing here

 

Recipe: 從父級FlowFile建立新的 FlowFile。

Use Case: You want to generate new FlowFile(s) based on an incoming FlowFile

Approach: 使用session的 create(parentFlowFile) 方法. 該方法得到父級 FlowFile 的引用,而後返回 and 新的派生 FlowFile 對象。新建立的 FlowFile 將繼承父級的全部屬性(除了 UUID). 該方法將自動建立一個 起源 FORK 事件或 起源 JOIN 事件, 取決於FlowFiles 是否從同一個parent建立,在 ProcessSession被提交的時候.

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
    newFlowFile = session.create(flowFile)
    // Additional processing here

Jython

flowFile = session.get()
if (flowFile != None):
    newFlowFile = session.create(flowFile)
    # Additional processing here

Javascript

var flowFile = session.get();
if (flowFile != null) {
    var newFlowFile = session.create(flowFile);
    // Additional processing here
}

JRuby

flowFile = session.get()
if flowFile != nil
    newFlowFile = session.create(flowFile)
    # Additional processing here
end

 

Recipe: 添加屬性到 flow file

Use Case: 在已有的 flow file 上添加本身的屬性。

Approach: 使用session對象的 putAttribute(flowFile, attributeKey, attributeValue) 方法。 該方法更新給定的 FlowFile's 屬性,使用給出的 key/value 對來進行。

注意: 對象的 "uuid" 屬性是固定的,而且不能修改; 若是key被命名爲 "uuid", 將被忽略.

這裏的FlowFile 對象是不可改變的; 這意味着,若是經過API更新了 FlowFile 的屬性 (或其它的改變了) , 你將獲得一個新版的FlowFile的新的引用。當轉換FlowFiles到relationships時這是很是重要的。你必須保持對FlowFile的最新版本的引用, 你必須轉換或者移除全部的FlowFiles的最後版本, 不然執行時將會獲得錯誤信息。常常狀況下, 該用於存儲 FlowFile 引用變量將會被最後返回的版本覆蓋 (中間的 FlowFile 應用將會被自動拋棄). 在這個例子中,你能夠看到當添加屬性時重用flowFile引用的技術。注意到當前的flowFile引用被傳遞給putAttribute() 方法. 這個結果FlowFile具備命名爲 'myAttr'值爲 'myValue'的屬性。若是你有一個對象,能夠序列化爲String. 最終, 請注意若是你添加了多個屬性, 最好建立一個Map,而後使用 putAllAttributes() 方法來進行賦值。

Examples:

Groovy

flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Jython

flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
    # implicit return at the end

Javascript

var flowFile = session.get();
if (flowFile != null) {
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}

JRuby

flowFile = session.get()
if flowFile != nil
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end

 

Recipe: 添加多個屬性到一個flow file

Use Case: 向 flow file 添加自定義屬性。

Approach: 使用 session對象的putAllAttributes(flowFile, attributeMap) 方法。該方法更新給定的FlowFile's 屬性,以 key/value 對的方式存儲在Map中返回。

注意:  "uuid" 對 FlowFile是固定的; 若是 key 被命名爲 "uuid", 將會被忽略。

該技術建立了一個 Map (aka dictionary in Jython, hash in JRuby) 用於更新,而後調用putAllAttributes() 。這比對putAttribute() 對每個 key/value 遍歷效率更高, 這將致使對每個屬性調用時 FlowFile 都須要建立一個副本 (查看上面 FlowFile 不變性的討論)。下面例子中的Map包含兩個條目: myAttr1 和 myAttr2, 設爲 '1' 而且第二個爲 String (附着到方法簽名,對key和value都要求 String)。 注意到session.transfer() 在這裏並未指定 (所以下面的代碼片斷並不能工做), 查看下面的方法。

Examples:

Groovy

attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)

Jython

attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get()
if (flowFile != None):
    flowFile = session.putAllAttributes(flowFile, attrMap)
    # implicit return at the end

Javascript

var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get()

if (flowFile != null) {
    flowFile = session.putAllAttributes(flowFile, attrMap)
}

JRuby

attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get()
if flowFile != nil
flowFile = session.putAllAttributes(flowFile, attrMap)
end

 

Recipe: 從 flow file 獲得屬性

Use Case: 得到flow file 的屬性。

Approach: 使用FlowFile對象getAttribute(attributeKey) 。 該方法對於給定的attributeKey返回一個字符串值 , 若是沒有找到相應的key就返回null. 下面的例子演示返回FlowFile的 "filename" 屬性。

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return
myAttr = flowFile.getAttribute('filename')

Jython

flowFile = session.get()

if (flowFile != None):
    myAttr = flowFile.getAttribute('filename')
    # implicit return at the end

Javascript

var flowFile = session.get()

if (flowFile != null) {
    var myAttr = flowFile.getAttribute('filename')
}

JRuby

flowFile = session.get()

if flowFile != nil
myAttr = flowFile.getAttribute('filename')
end

 

Recipe: 從 flow file獲得全部的屬性

Use Case: 從flow file獲得全部的屬性。

Approach: 使用FlowFile對象的getAttributes() 方法。 該方法返回 Map 數據結構,由字符串的 keys 和 values組成, 表明一個FlowFile的屬性的 key/value 值對。 下面的顯示如何遞歸顯示FlowFile的全部屬性的Map的值。

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return
flowFile.getAttributes().each { key,value ->
    // Do something with the key/value pair
}

Jython

flowFile = session.get()

if (flowFile != None):
    for key,value in flowFile.getAttributes().iteritems():
    # Do something with key and/or value

# implicit return at the end

Javascript

var flowFile = session.get()

if (flowFile != null) {
    var attrs = flowFile.getAttributes();
    for each (var attrKey in attrs.keySet()) {
        // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
    }
}

JRuby

flowFile = session.get()

if flowFile != nil
    flowFile.getAttributes().each 
    { |key,value|
        # Do something with key and/or value
    }
end

 

Recipe: 轉移一個flow file 到 relationship

Use Case: 在處理完flow file (new or incoming)以後, 你但願將flow file轉移到 relationship ("success" or "failure"). 在這個簡單的例子中,讓咱們假定有一個變量叫作 "errorOccurred", 用於指示在哪一種 relationship下 FlowFile 將被轉移。更多的錯誤處理技術在本系列文章的第二部分討論。

Approach: 使用session對象的transfer(flowFile, relationship) 方法。基於給定的relationship,該方法將給定的FlowFile發送到適合的目標處理器隊列。若是relationship通向不止一個目標,FlowFile的狀態將被複制 ,從而每個目標都將收到一個 FlowFile的拷貝,所以也將具備惟一的標識符UUID。

注意: 最後,ExecuteScript將執行session.commit() 以進行操做的提交。你不須要在腳本內部執行session.commit() 來執行提交操做。

Examples:

Groovy

flowFile = session.get()

if(!flowFile) return

// Processing occurs here
if(errorOccurred) {
    session.transfer(flowFile, REL_FAILURE)
}
else {
    session.transfer(flowFile, REL_SUCCESS)
}

Jython

flowFile = session.get()

if (flowFile != None):
    # All processing code starts at this indent
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end

Javascript

var flowFile = session.get();

if (flowFile != null) {
    // All processing code goes here
    if(errorOccurred) {
        session.transfer(flowFile, REL_FAILURE)
    }
    else {
        session.transfer(flowFile, REL_SUCCESS)
    }
}

JRuby

flowFile = session.get()
if flowFile != nil
    # All processing code goes here
    if errorOccurred
        session.transfer(flowFile, REL_FAILURE)
    else
        session.transfer(flowFile, REL_SUCCESS)
    end
end

 

Recipe: 發送消息到 log並制定日誌級別

Use Case: 但願報告一些事件、消息並經過日誌框架寫入。

Approach: 使用 log 的方法(), trace(), debug(), info(), 或 error() 完成。這些方法能夠是單個的字符串或者字符串數組對象, 或字符串後面跟着Throwable的對象數組。第一個用於簡單消息. 第二個用於一些動態對象(值)的log。在消息字符串中使用 "{}" 進行引用。這些用於對對象數組進行求值,當消息讀到 "Found these things: {} {} {}" 而且 Object array 是 ['Hello',1,true], 那麼logged 消息將是 "Found these things: Hello 1 true",第三種logging方法帶一個 Throwable 參數, 這在例外被捕捉到而且但願日誌記錄時使用。

Examples:

Groovy

log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

Jython

from java.lang import Object
from jarray import array

objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)

Javascript

var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);

objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)

JRuby

log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)

但願這些代碼片斷可以演示 NiFi API 在不一樣腳本語言中的用法以及 flow file 操做。在後續的文章中,我將把這些方法所有放到一塊兒,從而構成一個完整的腳本. 對於更多的例子, 用例和解釋,請參考 my blog. 在本序列的下一篇文章中, 我將討論如何讀寫flow files的內容, 以及錯誤處理技術. 但願能幫到你!

英:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

相關文章
相關標籤/搜索