NiFi 腳本執行器使用指南 (part 3)

NiFi 腳本執行器使用指南 (part 3)

說明

ExecuteScript讓NiFi能夠執行腳原本完成數據流程任務,從而能夠編寫本身的任務節點而不只僅是採用已有的任務節點,具備很強的靈活性。html

本文是介紹使用ExecuteScript來完成任務的系列文章之一。例程包括 Groovy, Jython, Javascript (Nashorn), 以及 JRuby. 系列文章中的「菜譜」 包括:java

Part 1 - 介紹 NiFi API 和 FlowFilespython

  • 從incoming queue獲得flow file
  • 建立一個新的 flow files
  • 與 flow file attributes一塊兒工做
  • 轉換 flow file
  • 日誌 Logging

Part 2 - FlowFile I/O 和 Error Handlingsql

  • 從 flow file 中讀取
  • 寫入 flow file
  • 讀/寫 flow file
  • Error Handling

Part 3 - 高級特徵apache

  • 使用動態屬性
  • 添加模塊
  • 狀態管理
  • 存取控制器服務

高級特徵

本系列的前兩篇文章涵蓋了 flow file 的基本操做, 如讀寫屬性和內容, 以及使用"session" 變量 ( ProcessSession對象)獲取和轉移 flow files . ExecuteScript還有不少其餘的能力,這裏對一部分做簡要介紹.編程

動態屬性-Dynamic Properties

其中一個能力叫作 dynamic properties, 或者稱爲用戶定義屬性.  processor 的一些屬性能夠由用戶設置 property name 和 value. 不是全部的processors 都支持和使用動態屬性, 在 ExecuteScript 將傳遞動態屬性做爲變量,改變了引用 PropertyValue 對象,對應於property's value. 這裏有兩個重要事須要瞭解:api

  1. 由於 property 綁定爲變量名, dynamic properties的命名規則必須知足相應的編程語言的規範。 例如, Groovy 不支持 (.) 做爲變量名字符, 像 "my.value" 引發processor處理失敗. 有效的可選項是 "myValue".
  2. PropertyValue 對象用於 (rather than a String representation of the value) 腳本執行多種操做,在轉換爲String以前進行。若是property已知包含合法的值, 你能夠調用 該變量的 getValue() 方法獲得其字符串表示. 若是值包含 Expression Language,或者但願轉爲除字符串外的其它值(如 'true' 對於Boolean 對象), 這裏也提供了操做方法. 這些例子在下面的示例中演示, 假定咱們有兩個屬性 'myProperty1' 和 'myProperty2',像下面這樣被定義:

Recipe: 獲得 dynamic property的值

Use Case: 在腳本中獲得 dynamic property(如, 配置參數).緩存

Approach: 使用變量的PropertyValue對象的getValue() 方法. 該方法返回其字符串表明 dynamic property. 注意,若是Expression Language包含在字符串中, getValue() 將不會對其求值(參加下一個方法實現求職功能).ruby

Examples:服務器

Groovy

def myValue1 = myProperty1.value

Jython

myValue1 = myProperty1.getValue()

Javascript

var myValue1 = myProperty1.getValue()

JRuby

myValue1 = myProperty1.getValue()

Recipe:  對 Expression Language 求值後獲得動態屬性的值

Use Case: 使用腳本中的動態屬性 dynamic property, 在輸入 flow file 中引用attribute(s) .

Approach: 從變量的PropertyValue對象使用 evaluateAttributeExpressions(flowFile) 方法. 該方法接着調用getValue(), 返回動態屬性的字符串表示,並且對 Expression Language constructs 進行了求值. 若是 flow file不可用, 可是變量在環境或Variable Registry被定義, 你能夠無參數調用 evaluateAttributeExpressions() 。

Examples:

Groovy

def myValue1 = myProperty1.value
def myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).value

Jython

myValue1 = myProperty1.getValue()
myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()

Javascript

var myValue1 = myProperty1.getValue()
var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()

JRuby

myValue1 = myProperty1.getValue()
myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()

 

添加模塊

ExecuteScript 的另外一個特徵是具備添加外部模塊到 classpath 的能力, 這將容許使用大量的第三方庫、腳本等加強能力. 可是,每個腳本引擎處理模塊的方法都是不同的, 所以須要分開討論。整體上說, 主要有兩種類型的模塊, Java libraries (JARs) 和 scripts (以在 ExecuteScript中的同一種語言編寫. 這裏將討論和顯示不一樣 script engines 如何進行處理:

Groovy

 Groovy script engine (至少在 ExecuteScript中) 不支持導入其餘的 Groovy scripts, 可是容許 JARs 添加到 classpath. 所以,對於外部Groovy projects, 考慮編譯爲bytecode,而後指向 classes 目錄或者包裝爲 JAR.

當使用 Groovy, 這個 Module Directory 屬性設爲 comma-separated 的文件列表 (JARs) 和 folders. 若是folder 被指定, ExecuteScript 將發現該目錄全部的 JARs 並添加進去. 這容許你添加第三方軟件,哪怕包含不少個 JARs.  Groovy的例子參見 this blog post.

Jython

Jython script engine (在 ExecuteScript) 目前僅支持導入純 Python 模塊, 不包含natively-compiled modules (如CPython),如 numpy 或 scipy. 目前也暫不支持 JARs, 這在未來版本中也許會考慮. 查看 this HCC post 獲得更多細節. 在Module Directory property在執行前須要加載, 使用"import sys" 跟着 "sys.path.append" 對每個指定的模塊位置進行加載.

若是 Python 已經安裝, 能夠將全部的安裝好的純 Python modules 添加進來,經過將 site-packages 目錄加到Module Directory 屬性便可, 如:

/usr/local/lib/python2.7/site-packages

而後,你的腳本就能 import 各類軟件包了,如:

from user_agents import parse

Javascript

Javascript script engine (在 ExecuteScript), 容許一樣的 JARs/folders設置,與 Groovy engine同樣. 將查找JARs 以及指定的folder.

JRuby

JRuby script engine (在 ExecuteScript) 目前只容許單個的 JARs指定, 若是 folder 被指定,其中必定要有classes ( java compiler 須要能看見), 若是folder 包含 JARs將不會自動加入。目前, 沒有pure Ruby 模塊能被導入.

我但願未來可以改進全部這些腳本引擎, 從而具備更爲強大的功能和一致的用戶體驗.

狀態管理

NiFi (如0.5.0 ) 提供了爲 Processors 和其餘 NiFi 組件持久化一些信息從而實現組件的狀態管理功能. 例如,  QueryDatabaseTable processor 保存對大數據集的跟蹤, 當下次運行時, 將只會獲取哪些比原來(存儲在 State Manager)更大的行的數據.

狀態管理的一個重要概念是Scope. NiFi 組件能夠選擇存儲它的狀態在集羣級別仍是本地級別. 注意,在獨立的 NiFi 實例中, "cluster scope" 與 "local scope"是同樣的. 這個 scope 選擇的區別在於在一個數據流中,每一個結點的處理器是否須要共享狀態信息. 若是集羣中的實例不須要共享狀態,就使用local scope. 在 Java,這些選項做爲一個 enum變量 Scope提供, 所以,當引用 Scope.CLUSTER 和 Scope.LOCAL, 就意味着是集羣模式或本地模式.

爲了探究ExecuteScript (語言獨立的例子以下)狀態管理的特徵 , 你能夠得到 StateManager的引用,經過調用 ProcessContext的 getStateManager() 方法實現 (recall that each engine gets a variable named "context" with an instance of ProcessContext). 而後調用 StateManager 對象的下面方法:

void setState(Map<String, String> state, Scope scope) - 在給定的scope更新組件狀態的值, 設置爲給定的值. 注意,這個值是 Map 數據結構; 概念 "component state" 全部的 key/value鍵值對的 Map. 該 Map被一次所有更新,從而提供原子性.

StateMap getState(Scope scope) - 返回組件在給定scope的當前狀態. 該方法永不會返回 null; 對於 StateMap 對象,若是 state沒有被設置, StateMap's 版本將是 -1, 而 map的值將是 empty. 常常,一個新的 Map<String,String> 被建立來存儲更新的值,而後setState()或 replace() 被調用.

boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) - 更新組件的狀態值 (在給定的 scope)爲新的值,僅在當前值與給定的 oldValue同樣時執行. 若是 state 被更新爲新的值, 返回true; 不然返回 false,若是state's value 不等於oldValue.

void clear(Scope scope) - 從給定的scope下,清除組件狀態全部的鍵值.

 

Recipe: 獲得當前map的 key/value 對

Use Case: 腳本從狀態管理器獲得當前的 key/value 對,而後在 script 中使用(如更新等).

Approach: 使用ProcessContext的getStateManager()方法, 而後從 StateManager調用 getStateMap() , 再 toMap() 轉換爲Map<String,String>形式的key/value對. 注意,StateMap 也有 get(key) 方法去簡化得到 value的方法, 可是不如 Map用的廣泛。必須在 StateManager 一次性設置完畢.

Examples:

Groovy

import org.apache.nifi.components.state.Scope
def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()

Jython

from org.apache.nifi.components.state import Scope
oldMap = context.stateManager.getState(Scope.LOCAL).toMap()

Javascript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();

JRuby

java_import org.apache.nifi.components.state.Scope
oldMap = context.stateManager.getState(Scope::LOCAL).toMap()

注意: 只有 Scope class 被明確地在腳本中引用, 所以這是惟一被imported的. 若是你引用了 StateManager, StateMap, 等等,你須要 import 這些 classes.

Recipe: 更新 key/value 映射的值對

Use Case: 腳本但願經過新的包含key/value的映射值對來更新 state map.

Approach: 爲了獲得當前的 StateMap 對象, 再次用ProcessContext調用 getStateManager() 方法, 而後 StateManager調用getStateMap() . 例子中假定爲新的 Map, 可是使用上面的配方 (經過 toMap() 方法), 你可使用存在的值建立新的 Map, 而後用於更新想要的記錄. 注意,若是沒有當前map (i.e. the StateMap.getVersion() returns -1),replace() 將不會工做, 所以例子中檢查並相應地調用 setState() 或 replace(). 當從ExecuteScript的新實例運行時,該StateMap 版本將會是 -1, 當單次執行後, 若是鼠標右鍵 ExecuteScript processor,而後選擇 View State, 將看到以下所示的信息:

 

Examples:

Groovy

import org.apache.nifi.components.state.Scope
def stateManager = context.stateManager
def stateMap = stateManager.getState(Scope.CLUSTER)
def newMap = ['myKey1': 'myValue1']

if (stateMap.version == -1) {
    stateManager.setState(newMap, Scope.CLUSTER);
} else {
    stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}

Jython

from org.apache.nifi.components.state import Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope.CLUSTER)

newMap = {'myKey1': 'myValue1'}

if stateMap.version == -1:
    stateManager.setState(newMap, Scope.CLUSTER)
else:
    stateManager.replace(stateMap, newMap, Scope.CLUSTER)

Javascript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
var stateManager = context.stateManager;
var stateMap = stateManager.getState(Scope.CLUSTER);
var newMap = {'myKey1': 'myValue1'};

if (stateMap.version == -1) {
    stateManager.setState(newMap, Scope.CLUSTER);
} else {
    stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}

JRuby

java_import org.apache.nifi.components.state.Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope::CLUSTER)
newMap = {'myKey1'=> 'myValue1'}

if stateMap.version == -1
    stateManager.setState(newMap, Scope::CLUSTER)
else
    stateManager.replace(stateMap, newMap, Scope::CLUSTER)
end

 

Recipe: 清空 state map

Use Case: 清空 state map全部的e key/value 值.

Approach: 使用ProcessContext的getStateManager()方法, 而後調用StateManager的clear(scope)方法。

Examples:

Groovy

import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope.LOCAL)

Jython

from org.apache.nifi.components.state import Scopecontext.state
Manager.clear(Scope.LOCAL)

Javascript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
context.stateManager.clear(Scope.LOCAL);

JRuby

java_import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope::LOCAL)

 

存取控制器服務

在 NiFi ARchive (NAR) 結構中, Controller Services-控制器服務被暴露爲 interfaces, 在 API JAR中. 例如 , DistributedCacheClient 是一個從 ControllerService擴展來的接口, 位於 nifi-distributed-cache-client-service-api JAR中, 在 nifi-standard-services-api-nar NAR. 其餘的 NARs 若是想要引用interfaces (去建立新的 client implementation, e.g.) 必須指定 nifi-standard-services-api-nar 做爲父級 NAR, 而後在processor的子模塊提供 API JARs 的實例。

這是一些底層的細節,可能須要的以提高 Controller Services的使用, 我提到這些主要是兩個緣由:

  1. 在 NiFi 1.0.0前, scripting NAR (包括 ExecuteScript 和 InvokeScriptedProcessor) 不須要指定nifi-standard-services-api-nar 做爲父級. 這意味着只有明確的引用能被用於 ControllerServices 接口 (及其實現), 一樣的緣由, 只有沒有要求其餘不可用類的接口方法能夠被使用. 這限制了 ExecuteScript 對Controller Services的使用, 一個能夠工做的例子,你能夠查看  my blog post.
  2. NiFi 1.0.0, scripting processors 在nifi-standard-services-api-nar中存取 Controller Service interfaces (及其相關的classes) . 這包括DBCPService, DistributedMapCacheClient, DistributedSetCacheClient, HttpContextMap 和 SSLContextService. 可是我不相信nifi-standard-services-api-nar中其它的API 將會可用, 並且沒有定製化 ControllerService interfaces 將被識別. 你可能使用方法 blog post in #1 去那樣作.

Processors 老是傾向於使用 Controller Service 實例建立 property (如PropertyDescriptor 對象) 而且調用 identifiesControllerService(class) . 當 UI component被渲染時, 將會發現全部的實現了指望接口的 Controller Services ,  component's ID 被使用, 友好顯示名稱被顯示給用戶.

對於ExecuteScript, 咱們可讓用戶選擇Controller Service 實例,經過讓他指定名稱或者 ID 來實現. 若是咱們容許用戶指定name, 腳本將不得不執行一個查詢Controller Service實例列表去找到匹配名稱的元素。  這在上面的博客中提到了, 這裏再也不重複. 若是用戶輸入實例的ID, 而後 (在 NiFi 1.0.0) 將會更加容易滴匹配對象並存取,在下面將會看到. 這個例子將使用DistributedMapCacheClientService 實例爲 "distMapClient", 鏈接到DistributedMapCacheServer 實例 (在標準的缺省配置下, localhost:4557), 這裏 client instance 的ID爲 93db6734-0159-1000-b46f-78a8af3b69ed:

 

在ExecuteScript 配置中, dynamic property被建立, 名爲 "clientServiceId" 而且設爲 93db6734-0159-1000-b46f-78a8af3b69ed:

而後咱們使用clientServiceId.asControllerService(DistributedMapCacheClient), 這裏參數是對DistributedMapCacheClient類對象的引用. 例如, 我有一個預先填充的緩存,字符串 key 'a' 設爲字符串值  'hello'. 讓 Groovy script 使用 DistributedMapCacheServer進行工做, 請查看 my article here.

一旦咱們有了一個 DistributedMapCacheClient 實例, 而後就能夠調用get(key, serializer, deserializer)去獲取值. 在這個例子中,由於keys 和 values 都是Strings, 咱們只須要一個 Serializer<String> 和 Deserializer<String> 實例傳給 get() 方法. 該方法對於全部語言都是同樣的,經過 StreamCallback 實例的建立(在本系列文章的 Part 2). 這個將從預先填充的服務器獲得 key 'a' 的值,而且記錄值("Result = hello")。

Recipe: 獲得property(存儲在 DistributedMapCacheServer)

Use Case: 用戶發佈值到 DistributedMapCacheServer (如配置數據, e.g.),而後使用腳本進行訪問.

Approach: 使用上面描述的方法,建立一個StringSerializer 和 StringDeserializer 對象, 而後經過ID獲得DistributedMapCacheClientService 實例, 而後調用服務的 get() 方法. 記錄下結果到日誌,方便後面查看.

Examples:

Groovy

import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.Deserializer
import java.nio.charset.StandardCharsets

def StringSerializer = {value, out -> out.write(value.getBytes(StandardCharsets.UTF_8))} 
    as Serializer<String>
def StringDeserializer = { bytes -> new String(bytes) } as Deserializer<String>
def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
def result = myDistClient.get('a', StringSerializer, StringDeserializer)
log.info("Result = $result")

Jython

from org.python.core.util import StringUtil
from org.apache.nifi.distributed.cache.client 
    import DistributedMapCacheClient, Serializer, Deserializer

# Define a subclass of Serializer for use in the client's get() method
class StringSerializer(Serializer):
def __init__(self):
    pass
def serialize(self, value, out):
    out.write(value)

# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer(Deserializer):
def __init__(self):
    pass
def deserialize(self, bytes):
    return StringUtil.fromBytes(bytes)

myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
result = myDistClient.get('a', StringSerializer(), StringDeserializer())
log.info('Result = ' + str(result))

Javascript

var DistributedMapCacheClient = 
    Java.type('org.apache.nifi.distributed.cache.client.DistributedMapCacheClient');
var Serializer = Java.type('org.apache.nifi.distributed.cache.client.Serializer');
var Deserializer = Java.type('org.apache.nifi.distributed.cache.client.Deserializer');
var StandardCharsets = Java.type('java.nio.charset.StandardCharsets');
var StringSerializer = new Serializer(function(value, out) {
        out.write(value.getBytes(StandardCharsets.UTF_8));
    })

var StringDeserializer = new Deserializer(function(arr) {
    // For some reason I had to build a string from the character codes in the "arr" array
var s = "";

for(var i = 0; i < arr.length; i++) {
    s = s + String.fromCharCode(arr[i]);
}

return s;
})

var myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.class);
var result = myDistClient.get('a', StringSerializer, StringDeserializer);
log.info("Result = "+ result);

JRuby

java_import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
java_import org.apache.nifi.distributed.cache.client.Serializer
java_import org.apache.nifi.distributed.cache.client.Deserializer
java_import java.nio.charset.StandardCharsets

# Define a subclass of Serializer for use in the client's get() method
class StringSerializer

include Serializer

def serialize(value, out)
    out.write(value.to_java.getBytes(StandardCharsets::UTF_8))
end
end

# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer
include Deserializer
def deserialize(bytes)
    bytes.to_s
end
end

myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.java_class)
result = myDistClient.get('a', StringSerializer.new, StringDeserializer.new)
log.info('Result = ' + result)

本文包含了更爲複雜的一些例子,描述瞭如何使用支持的多種語言與NiFi API進行交互. 我可能添加一些其餘的內容到本序列, 關於 scripting processors, 一些額外的特徵可能被改進或添加進來 (如 ScriptedReportingTask將很快可用!). 我一向地歡迎任何問題、建議、說明等等. 但願你能喜歡!

相關文章
相關標籤/搜索