【NIFI】 Apache NiFI 之 ExecuteScript處理(二)

  本例介紹NiFI ExecuteScript處理器的使用,使用的腳本引擎ECMScripthtml

  接上一篇【NIFI】 Apache NiFI 之 ExecuteScript處理(一)java

ExecuteScript使用

  一、動態屬性apache

    其中一個功能是動態屬性的概念,也稱爲用戶定義屬性。這些是處理器的屬性,用戶能夠爲其設置屬性名稱和值。並不是全部處理器都支持/使用動態屬性,但ExecuteScript會將動態屬性做爲變量傳遞,這些變量引用與屬性值對應的PropertyValue對象。這裏有兩件重要的事情須要注意:編程

    • 因爲屬性名稱按原樣綁定到變量名稱,所以必須爲指定的編程語言支持動態屬性的命名約定。例如,Groovy不支持句點(。)做爲有效的變量字符,所以動態屬性(如「my.value」)將致使處理器失敗。在這種狀況下,有效的替代方案是「myValue」。
    • 使用PropertyValue對象(而不是值的String表示形式)以容許腳本在將屬性值評估爲String以前對屬性的值執行各類操做。若是已知屬性包含文字值,則能夠對變量調用getValue()方法以獲取其String表示形式。相反,若是值可能包含表達式語言,或者您但願將值轉換爲除String以外的值(例如對布爾對象的值爲'true'),則還有這些操做的方法。這些示例在下面的配方中說明,假設咱們有兩個屬性'myProperty1'和'myProperty2'定義以下:

    

    目的:用戶輸入了動態屬性以供腳本使用(例如,配置參數)。json

    方法:使用變量的PropertyValue對象中的getValue()方法。此方法返回動態屬性值的String表示形式。請注意,若是值中存在表達式語言,則getValue()將不對其進行求值緩存

    Examplesapp

      Javascript編程語言

1 var myValue1 = myProperty1.getValue()
2 var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()

 

   二、添加模塊post

    ExecuteScript的另外一個功能是可以向類路徑添加外部「模塊」,這容許您利用各類第三方庫,腳本等。可是每一個腳本引擎都以不一樣方式處理模塊的概念,所以我將討論它們分別。通常來講,有兩種類型的模塊,Java庫(JAR)和腳本(用與ExecuteScript中相同的語言編寫lua

    配置以下:
      

    目的:腳本執行時,使用Java庫

    方法:配置ExecuteScript,添加Module Directory目錄(裏面都是jar包),腳本中獲取jar包中的類,使用jar包中的類方法

    Examples

      Javascript

 1 var ObjectMapper = Java.type("com.fasterxml.jackson.databind.ObjectMapper");
 2 var Map = Java.type("java.util.Map");
 3 
 4 
 5 var objectMapper = new ObjectMapper();
 6 
 7 
 8 // java對象轉 json字符串
 9 var str = objectMapper.writeValueAsString(obj);
10 
11 // json字符串 轉 java對象
12 var map = objectMapper.readValue(str, Map.class);  

  三、狀態存儲

    NiFi(我相信0.5.0)爲處理器和其餘NiFi組件提供了持久保存一些信息的能力,以便在組件周圍實現某些狀態功能,例如,QueryDatabaseTable處理器跟蹤它在指定列中看到的最大值,這樣下次運行時,它只會獲取其值大於目前已見過的行(即存儲在州經理)。

    NiFi組件能夠選擇將其狀態存儲在集羣級別或本地級別。請注意,在獨立的NiFi實例中,「羣集範圍」與「本地範圍」相同。範圍的選擇一般是關於在流中,每一個節點上的相同處理器是否能夠共享狀態數據。若是羣集中的實例不須要共享狀態,則使用本地範圍。在Java中,這些選項做爲名爲Scope的枚舉提供,所以當我引用Scope.CLUSTER和Scope.LOCAL時,我分別表示集羣和本地做用域。

    要在ExecuteScript中使用狀態管理功能(下面是特定於語言的示例),您能夠經過調用ProcessContext的getStateManager()方法得到對StateManager的引用(回想一下,每一個引擎都得到一個名爲「context」的變量,其中包含ProcessContext實例)。而後,您能夠在StateManager對象上調用如下方法:

    void setState(Map <String,String> state,Scope scope - 更新組件在給定範圍內的狀態值,並將其設置爲給定值。請注意,該值是一個Map; 「組件狀態」的概念是每一個構成較低級別狀態的全部鍵/值對的映射。Map會當即更新以提供原子性。

    StateMap getState(做用域範圍 - 返回給定範圍內組件的當前狀態。此方法永遠不會返回null; 相反,它是一個StateMap對象,若是還沒有設置狀態,StateMap的版本將爲-1,值的映射將爲空。一般會建立一個新的Map <String,String>來存儲更新的值,而後調用setState()或replace()。

    boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope - 當且僅當值與給定的oldValue相同時,才更新組件狀態(在給定範圍內)的值到新值。若是狀態已更新爲新值,則返回true; 不然,若是狀態的值不等於oldValue,則返回false。

    void clear(範圍範圍 - 清除給定範圍內組件狀態的全部鍵和值。

    目的1:腳本須要從狀態管理器獲取當前鍵/值對以供腳本使用(例如,更新)

    方法1:使用ProcessContext中的getStateManager()方法,而後使用StateManager中的getStateMap(),而後使用toMap()轉換爲鍵/值對的Map <String,String>。請注意,StateMap還有一個簡單檢索值的get(key)方法,但這種方法並不經常使用,由於Map一般會更新,一旦完成,就必須爲StateManager設置。

    Examples

      Javascript

1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
2 var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();

 

    目的2:腳本但願使用新的鍵/值對映射更新狀態映射。

    方法2:要獲取當前StateMap對象,請再次使用ProcessContext中的getStateManager()方法,而後使用StateManager中的getStateMap()。這些示例假設一個新的Map,但使用上面的配方(使用toMap()方法),您可使用現有值建立一個新的Map,而後只更新所需的條目。請注意,若是沒有當前映射(即StateMap.getVersion()返回-1),則replace()將不起做用,所以示例將相應地檢查並調用setState()或replace()。重新的ExecuteScript實例運行時,StateMap版本將爲-1,所以在單次執行後,若是右鍵單擊ExecuteScript處理器並選擇View State,您應該看到以下內容:

      

    Examples

      Javascript

1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
2 var stateManager = context.stateManager;
3 var stateMap = stateManager.getState(Scope.CLUSTER);
4 var newMap = {'myKey1': 'myValue1'};
5 if (stateMap.version == -1) {
6   stateManager.setState(newMap, Scope.CLUSTER);
7 } else {
8   stateManager.replace(stateMap, newMap, Scope.CLUSTER);
9 }

 

ExecuteScript-Demo

  Demo流程:先存入緩存myKey1 =》 第二次取出myKey1的值 =》 myKey1的值修改 =》 再次存入緩存中

  ExecuteScript內容以下:

 1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
 2 var stateManager = context.stateManager;
 3 var stateMap = stateManager.getState(Scope.CLUSTER);
 4 
 5 if (stateMap.version == -1) {
 6   var newMap = {'myKey1': "1"};
 7   stateManager.setState(newMap, Scope.CLUSTER);
 8 } else {
 9     
10 
11    var myValue1 = stateMap.toMap().get("myKey1");
12    myValue1 = myValue1*1 + 1;
13    var newMap = {'myKey1': myValue1 + ""};
14   
15   // 替換
16   stateManager.replace(stateMap, newMap, Scope.CLUSTER);
17 }

 

  

  其餘腳本引擎,參考如下地址 

  參考文檔連接:https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html

相關文章
相關標籤/搜索