本例介紹NiFI ExecuteScript處理器的使用,使用的腳本引擎ECMScripthtml
接上一篇【NIFI】 Apache NiFI 之 ExecuteScript處理(一)java
一、動態屬性apache
其中一個功能是動態屬性的概念,也稱爲用戶定義屬性。這些是處理器的屬性,用戶能夠爲其設置屬性名稱和值。並不是全部處理器都支持/使用動態屬性,但ExecuteScript會將動態屬性做爲變量傳遞,這些變量引用與屬性值對應的PropertyValue對象。這裏有兩件重要的事情須要注意:編程
目的:用戶輸入了動態屬性以供腳本使用(例如,配置參數)。json
方法:使用變量的PropertyValue對象中的getValue()方法。此方法返回動態屬性值的String表示形式。請注意,若是值中存在表達式語言,則getValue()將不對其進行求值緩存
Examplesapp
Javascript編程語言
1 var myValue1 = myProperty1.getValue() 2 var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
二、添加模塊post
ExecuteScript的另外一個功能是可以向類路徑添加外部「模塊」,這容許您利用各類第三方庫,腳本等。可是每一個腳本引擎都以不一樣方式處理模塊的概念,所以我將討論它們分別。通常來講,有兩種類型的模塊,Java庫(JAR)和腳本(用與ExecuteScript中相同的語言編寫lua
配置以下:
目的:腳本執行時,使用Java庫
方法:配置ExecuteScript,添加Module Directory目錄(裏面都是jar包),腳本中獲取jar包中的類,使用jar包中的類方法
Examples
Javascript
1 var ObjectMapper = Java.type("com.fasterxml.jackson.databind.ObjectMapper"); 2 var Map = Java.type("java.util.Map"); 3 4 5 var objectMapper = new ObjectMapper(); 6 7 8 // java對象轉 json字符串 9 var str = objectMapper.writeValueAsString(obj); 10 11 // json字符串 轉 java對象 12 var map = objectMapper.readValue(str, Map.class);
三、狀態存儲
NiFi(我相信0.5.0)爲處理器和其餘NiFi組件提供了持久保存一些信息的能力,以便在組件周圍實現某些狀態功能,例如,QueryDatabaseTable處理器跟蹤它在指定列中看到的最大值,這樣下次運行時,它只會獲取其值大於目前已見過的行(即存儲在州經理)。
NiFi組件能夠選擇將其狀態存儲在集羣級別或本地級別。請注意,在獨立的NiFi實例中,「羣集範圍」與「本地範圍」相同。範圍的選擇一般是關於在流中,每一個節點上的相同處理器是否能夠共享狀態數據。若是羣集中的實例不須要共享狀態,則使用本地範圍。在Java中,這些選項做爲名爲Scope的枚舉提供,所以當我引用Scope.CLUSTER和Scope.LOCAL時,我分別表示集羣和本地做用域。
要在ExecuteScript中使用狀態管理功能(下面是特定於語言的示例),您能夠經過調用ProcessContext的getStateManager()方法得到對StateManager的引用(回想一下,每一個引擎都得到一個名爲「context」的變量,其中包含ProcessContext實例)。而後,您能夠在StateManager對象上調用如下方法:
void setState(Map <String,String> state,Scope scope) - 更新組件在給定範圍內的狀態值,並將其設置爲給定值。請注意,該值是一個Map; 「組件狀態」的概念是每一個構成較低級別狀態的全部鍵/值對的映射。Map會當即更新以提供原子性。
StateMap getState(做用域範圍) - 返回給定範圍內組件的當前狀態。此方法永遠不會返回null; 相反,它是一個StateMap對象,若是還沒有設置狀態,StateMap的版本將爲-1,值的映射將爲空。一般會建立一個新的Map <String,String>來存儲更新的值,而後調用setState()或replace()。
boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope) - 當且僅當值與給定的oldValue相同時,才更新組件狀態(在給定範圍內)的值到新值。若是狀態已更新爲新值,則返回true; 不然,若是狀態的值不等於oldValue,則返回false。
void clear(範圍範圍) - 清除給定範圍內組件狀態的全部鍵和值。
目的1:腳本須要從狀態管理器獲取當前鍵/值對以供腳本使用(例如,更新)
方法1:使用ProcessContext中的getStateManager()方法,而後使用StateManager中的getStateMap(),而後使用toMap()轉換爲鍵/值對的Map <String,String>。請注意,StateMap還有一個簡單檢索值的get(key)方法,但這種方法並不經常使用,由於Map一般會更新,一旦完成,就必須爲StateManager設置。
Examples
Javascript
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();
目的2:腳本但願使用新的鍵/值對映射更新狀態映射。
方法2:要獲取當前StateMap對象,請再次使用ProcessContext中的getStateManager()方法,而後使用StateManager中的getStateMap()。這些示例假設一個新的Map,但使用上面的配方(使用toMap()方法),您可使用現有值建立一個新的Map,而後只更新所需的條目。請注意,若是沒有當前映射(即StateMap.getVersion()返回-1),則replace()將不起做用,所以示例將相應地檢查並調用setState()或replace()。重新的ExecuteScript實例運行時,StateMap版本將爲-1,所以在單次執行後,若是右鍵單擊ExecuteScript處理器並選擇View State,您應該看到以下內容:
Examples
Javascript
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var stateManager = context.stateManager; 3 var stateMap = stateManager.getState(Scope.CLUSTER); 4 var newMap = {'myKey1': 'myValue1'}; 5 if (stateMap.version == -1) { 6 stateManager.setState(newMap, Scope.CLUSTER); 7 } else { 8 stateManager.replace(stateMap, newMap, Scope.CLUSTER); 9 }
Demo流程:先存入緩存myKey1 =》 第二次取出myKey1的值 =》 myKey1的值修改 =》 再次存入緩存中
ExecuteScript內容以下:
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var stateManager = context.stateManager; 3 var stateMap = stateManager.getState(Scope.CLUSTER); 4 5 if (stateMap.version == -1) { 6 var newMap = {'myKey1': "1"}; 7 stateManager.setState(newMap, Scope.CLUSTER); 8 } else { 9 10 11 var myValue1 = stateMap.toMap().get("myKey1"); 12 myValue1 = myValue1*1 + 1; 13 var newMap = {'myKey1': myValue1 + ""}; 14 15 // 替換 16 stateManager.replace(stateMap, newMap, Scope.CLUSTER); 17 }
其餘腳本引擎,參考如下地址
參考文檔連接:https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html