【NIFI】 Apache NiFI 之 ExecuteScript處理(一)

   本例介紹NiFI ExecuteScript處理器的使用,使用的腳本引擎ECMScripthtml

FlowFile I / O簡介

  NiFi中的流文件由兩個主要組件構成,即屬性和內容。屬性是關於內容/流文件的元數據,咱們在本系列的第1部分中看到了如何使用ExecuteScript來操做它們流文件的內容本質上只是一個字節集合,沒有固有的結構,模式,格式等。各類NiFi處理器假設傳入的流文件具備特定的模式/格式(或者從屬性中肯定它做爲「mime.type」或以其餘方式推斷它。而後,這些處理器能夠基於文件確實具備該格式的假設來對內容起做用(而且若是它們不這樣,則常常轉移到「失敗」關係)。處理器也能夠輸出指定格式的流文件,這在處理器中有描述。NiFi文檔java

流文件內容的輸入和輸出(I / O)經過ProcessSession API提供,所以是ExecuteScript的「session」變量(有關更多信息,請參閱第1部分)。一種機制是將回調對象傳遞給對session.read()或session.write()的調用。將爲FlowFile對象建立InputStream和/或OutputStream,並使用相應的回調接口調用回調對象,並傳入InputStream和/或OutputStream引用以供回調使用。有三個主要的回調接口,每一個接口都有本身的用例:apache

  InputStreamCallback編程

    session.read(flowFileinputStreamCallback)方法使用此接口 提供InputStream,從中讀取流文件的內容。界面有一個方法:數組

1 void process(InputStream in) throws IOException

 

 

   此接口提供託管輸入流以供使用。雖然能夠手動關閉流,但輸入流會自動打開和關閉。若是您只是從特定的流文件中讀取而不是寫回來,那麼這是您將使用的表單。session

   例如,當您想要處理傳入的流文件,但建立許多輸出流文件時,如 SplitText處理器。併發

  OutputStreamCallbackapp

    session.write(flowFileoutputStreamCallback)方法使用此接口 來提供要寫入流文件內容的OutputStream。界面有一個方法:框架

1 void process(OutputStream out) throws IOException

    此接口提供託管輸出流以供使用。儘管能夠手動關閉流,但輸出流會自動打開和關閉 - 若是包含這些流的任何流打開應該清除的資源,則很是重要。編程語言

    例如,ExecuteScript將從內部或外部文件生成數據,但不生成流文件。而後你將使用session.create()建立一個新的FlowFile,而後使用session.write( flowFileoutputStreamCallback)來插入內容。

  StreamCallback

    session.write(flowFilestreamCallback)方法使用此接口 來提供InputStream和OutputStream,從中讀取和/或寫入流文件的內容。界面有一個方法:

1 void process(InputStream in, OutputStream out) throws IOException

 

 

    此接口提供託管輸入和輸出流以供使用。雖然能夠手動關閉流,但輸入流會自動打開和關閉 - 若是包含這些流的任何流打開應該清除的資源,則很是重要。

    例如,當您想要處理傳入的流文件並用新的東西覆蓋其內容時,例如 EncryptContent處理器。

  因爲這些回調是Java對象,所以腳本必須建立一個並將其傳遞給會話方法,還有其餘讀取和寫入流文件的方法,包括:

    • 使用session.read(flowFile)返回一個InputStream。這減輕了對InputStreamCallback的需求,而是返回能夠讀取的InputStream。做爲交換,您必須手動管理(關閉,例如)InputStream。
    • 使用session.importFrom(inputStreamflowFile)從InputStream寫入FlowFile。這取代了傳遞了OutputStreamCallback的session.write()的須要。

 ExecuteScript介紹

  ExecuteScript是一個多功能處理器,容許用戶使用編程語言編寫自定義邏輯,每次觸發ExecuteScript處理器時都會執行該編程語言。爲腳本提供如下變量綁定以啓用對NiFi組件的訪問:

  session:這是對分配給處理器的ProcessSession的引用。該會話容許您對流文件執行操做,如create(),putAttribute()和transfer(),以及read()和write()。

  context:這是對處理器的ProcessContext的引用。它可用於檢索處理器屬性,關係,Controller Services和StateManager。

  log:這是對處理器的ComponentLog的引用。使用它將消息記錄到NiFi,例如log.info('Hello world!')

  REL_SUCCESS:這是對爲處理器定義的「成功」關係的引用。它也能夠經過引用父類的靜態成員(ExecuteScript)來繼承,可是某些引擎(如Lua)不容許引用靜態成員,所以這是一個便利變量。它還節省了必須使用關係的徹底限定名稱。

  REL_FAILURE:這是對爲處理器定義的「失敗」關係的引用。與REL_SUCCESS同樣,它也能夠經過引用父類的靜態成員(ExecuteScript)來繼承,可是某些引擎(如Lua)不容許引用靜態成員,所以這是一個便利變量。它還節省了必須使用關係的徹底限定名稱。

  動態屬性:ExecuteScript中定義的任何動態屬性都將做爲設置爲與動態屬性對應的PropertyValue對象的變量傳遞給腳本引擎。這容許您獲取屬性的String值,還能夠根據NiFi表達式語言評估屬性,將值轉換爲適當的數據類型(例如布爾值等)等。由於動態屬性名稱變爲腳本的變量名,您必須知道所選腳本引擎的變量命名屬性

ExecuteScript使用

  一、從會話中獲取傳入的流文件

    目的:有到ExecuteScript的傳入鏈接,並但願從隊列中檢索一個流文件以進行處理

    方法:使用會話對象中的get()方法。此方法返回要處理的下一個最高優先級FlowFile的FlowFile。若是沒有要處理的FlowFile,則該方法將返回null。請注意,即便FlowFiles有穩定的流入處理器,也可能返回null。若是處理器有多個併發任務,而且其餘任務已經檢索到FlowFiles,則會發生這種狀況。若是腳本須要FlowFile繼續處理,那麼若是從session.get()返回null,它應當即返回

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     // All processing code goes here
4 }

  二、從會話中獲取多個傳入流文件

    目的:有到ExecuteScript的傳入鏈接,並但願從隊列中檢索多個流文件以進行處理

    方法:使用會話對象中的get(maxResults)方法。此方法從工做隊列返回maxResults FlowFiles。若是沒有可用的FlowFiles,則返回一個空列表(該方法不返回null)。注意:若是存在多個傳入隊列,則根據是否在單個調用中輪詢全部隊列或僅輪詢單個隊列,未指定行爲。話雖如此,這裏描述了觀察到的行爲(對於NiFi 1.1.0+和以前)

    Examples

      Javascript

1 flowFileList = session.get(100)
2 if(!flowFileList.isEmpty()) {
3   for each (var flowFile in flowFileList) { 
4        // Process each FlowFile here
5   }
6 }

  三、建立一個新的FlowFile

    目的:生成一個新的FlowFile以發送到下一個處理器

    方法:使用會話對象中的create()方法。此方法返回一個新的FlowFile對象,您能夠對其執行進一步處理

    Examples

      Javascript

1 var flowFile = session.create();
2 // Additional processing here

  四、從父FlowFile建立新的FlowFile

    目的:但願基於傳入的FlowFile生成新的FlowFile

    方法:使用會話對象中的create(parentFlowFile)方法。此方法採用父FlowFile引用並返回新的子FlowFile對象。新建立的FlowFile將繼承除UUID以外的全部父屬性。此方法將自動生成Provenance FORK事件或Provenance JOIN事件,具體取決於在提交ProcessSession以前是否從同一父級生成其餘FlowFiles

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     var newFlowFile = session.create(flowFile);
4     // Additional processing here
5 }

  五、向流文件添加屬性

    目的:有一個要添加自定義屬性的流文件

    方法:使用會話對象中的putAttribute(flowFileattributeKeyattributeValue)方法。此方法使用給定的鍵/值對更新給定的FlowFile屬性。注意:「uuid」屬性對於FlowFile是固定的,不能修改; 若是密鑰名爲「uuid」,則將被忽略。

    Examples

      Javascript

1 var flowFile = session.get();
2 if (flowFile != null) {
3     flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
4 }

  六、向流文件添加多個屬性

    目的:有一個要添加自定義屬性的流文件

    方法:使用會話對象中的putAllAttributes(flowFileattributeMap)方法。此方法使用給定Map中的鍵/值對更新給定的FlowFile屬性。注意:「uuid」屬性對於FlowFile是固定的,不能修改; 若是密鑰名爲「uuid」,則將被忽略。

    Examples

      Javascript

1 var number2 = 2;
2 var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
3 var flowFile = session.get() 
4 if (flowFile != null) {
5     flowFile = session.putAllAttributes(flowFile, attrMap)
6 }

  七、從流文件中獲取屬性

    目的:有一個流文件,您能夠從中檢查屬性

    方法:使用FlowFile對象中的getAttribute(attributeKey)方法。此方法返回給定attributeKey的String值,若是未找到attributeKey,則返回null。這些示例顯示了「filename」屬性值的檢索。

    Examples

      Javascript

1 var flowFile = session.get() 
2 if (flowFile != null) {
3     var myAttr = flowFile.getAttribute('filename')
4 }

 

1 var flowFile = session.get() 
2 if (flowFile != null) {
3     var attrs = flowFile.getAttributes();
4     for each (var attrKey in attrs.keySet()) { 
5        // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
6   }
7 }

  八、將流文件傳輸到關係

    目的:處理流文件(新文件或傳入文件)後,您但願將流文件傳輸到關係(「成功」或「失敗」)。在這個簡單的狀況下,讓咱們假設有一個名爲「errorOccurred」的變量,它指示FlowFile應該傳輸到哪一個關係。

    方法:使用會話對象中的transfer(flowFilerelationship)方法。從文檔中:此方法根據給定的關係將給定的FlowFile傳輸到適當的目標處理器工做隊列。若是關係致使多個目標,則複製FlowFile的狀態,使得每一個目標都接收FlowFile的精確副本,儘管每一個目標都具備其本身的惟一標識。

    注意:ExecuteScript將在每次執行結束時執行session.commit()以確保已提交操做。您不須要(也不該該)在腳本中執行session.commit()。

    Examples

      Javascript

 1 var flowFile = session.get();
 2 if (flowFile != null) {
 3    // All processing code goes here
 4    if(errorOccurred) {
 5      session.transfer(flowFile, REL_FAILURE)
 6    }
 7    else {
 8      session.transfer(flowFile, REL_SUCCESS)
 9    }
10 }

 

  九、以指定的日誌記錄級別向日志發送消息

 

    目的:將處理期間發生的某些事件報告給日誌記錄框架。

    方法:將log變量與warn(),trace(),debug(),info()或error()方法一塊兒使用。這些方法可使用單個String,或者後跟對象數組的String,或者後跟對象數組後跟Throwable的String。第一個用於簡單消息。當您有一些要記錄的動態對象/值時,將使用第二個。要在消息字符串中引用這些,請在消息中使用「{}」。這些是按照外觀的順序針對Object數組進行評估的,所以若是消息顯示爲「Found these things:{} {} {}」而且Object數組爲['Hello',1,true],則記錄的消息將爲「找到這些東西:你好1真的」。這些日誌記錄方法的第三種形式也採用Throwable參數
    
Examples

      Javascript

1 var ObjectArrayType = Java.type("java.lang.Object[]");
2 var objArray = new ObjectArrayType(3);
3 objArray[0] = 'Hello';
4 objArray[1] = 1;
5 objArray[2] = true;
6 log.info('Found these things: {} {} {}', objArray)

 

   十、使用回調讀取傳入流文件的內容

    目的:有到ExecuteScript的傳入鏈接,並但願從隊列中檢索流文件的內容以進行處理

    方法:使用read(flowFileinputStreamCallback)來自會話對象的方法。傳入read()方法須要一個InputStreamCallback對象。請注意,由於InputStreamCallback是一個對象,因此默認狀況下內容只對該對象可見。若是須要使用read()方法以外的數據,請使用更全局範圍的變量。這些示例將傳入流文件的完整內容存儲到String中(使用Apache Commons的IOUtils類)。注意:對於大流量文件,這不是最好的技術; 相反,您應該只讀取您須要的數據,並根據須要進行處理。對於像SplitText這樣的東西,你能夠一次讀取一行並在InputStreamCallback中處理它,或者使用前面提到的session.read(flowFile)方法來得到在回調以外使用的InputStream引用。

    Examples

      Javascript

 1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback")
 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils")
 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
 4  
 5 var flowFile = session.get();
 6 if(flowFile != null) {
 7   // Create a new InputStreamCallback, passing in a function to define the interface method
 8   session.read(flowFile,
 9     new InputStreamCallback(function(inputStream) {
10         var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
11         // Do something with text here
12     }));
13 }

 

   十一、使用回調將內容寫入傳出流文件

    目的:傳出流文件生成內容

    方法:使用會話對象中的write(flowFileoutputStreamCallback)方法。傳遞給write()方法須要一個OutputStreamCallback對象。請注意,由於OutputStreamCallback是一個對象,因此默認狀況下內容只對該對象可見。若是須要使用write()方法以外的數據,請使用更全局範圍的變量。這些示例將示例String寫入flowFile。

    Examples

      Javascript

 1 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
 2 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
 3 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 4  
 5 var flowFile = session.get();
 6 if(flowFile != null) {
 7   // Create a new OutputStreamCallback, passing in a function to define the interface method
 8   flowFile = session.write(flowFile,
 9     new OutputStreamCallback(function(outputStream) {
10         outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
11     }));
12 }

 

   十二、使用回調覆蓋帶有更新內容的傳入流文件

    目的:重用傳入的流文件,但但願修改其傳出流文件的內容。

    方法:使用write(flowFilestreamCallback)來自會話對象的方法。傳遞給write()方法須要StreamCallback對象。StreamCallback提供InputStream(來自傳入流文件)和outputStream(用於該流文件的下一個版本),所以您可使用InputStream獲取流文件的當前內容,而後修改它們並將它們寫回到流文件。這會覆蓋流文件的內容,所以對於追加,您必須經過附加到讀入內容來處理它,或者使用不一樣的方法(使用session.append()而不是session.write())。請注意,因爲StreamCallback是一個對象,所以默認狀況下內容僅對該對象可見。若是須要使用write()方法以外的數據,請使用更全局範圍的變量

    Examples

      Javascript

var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 
var flowFile = session.get();
if(flowFile != null) {
  // Create a new StreamCallback, passing in a function to define the interface method
  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
    }));
}

 

   1三、處理腳本處理過程當中的錯誤

    目的:腳本中發生錯誤(經過數據驗證或拋出異常),而且您但願腳本正常處理它。

    方法:對於異常,使用腳本語言的異常處理機制(一般它們是try / catch塊)。對於數據驗證,您可使用相似的方法,但定義一個布爾變量,如「valid」和if / else子句而不是try / catch子句。ExecuteScript定義「成功」和「失敗」關係; 一般,您的處理將「好」流文件轉移到成功,「壞」流文件轉換爲失敗(在後一種狀況下記錄錯誤)

    Examples

      Javascript

 1 var flowFile = session.get();
 2 if(flowFile != null) {
 3   try {
 4     // Something that might throw an exception here
 5  
 6     // Last operation is transfer to success (failures handled in the catch block)
 7     session.transfer(flowFile, REL_SUCCESS)
 8 } catch(e) {
 9   log.error('Something went wrong', e)
10   session.transfer(flowFile, REL_FAILURE)
11 }
12 }

 

 

 ExecuteScript-Demo

  一、頁面以下圖

  

  二、GenerateFlowFile

    

  二、ExecuteScript

    

    腳本內容:

 1 var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
 2 var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
 3 var IOUtils = Java.type("org.apache.commons.io.IOUtils");
 4 var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 5  
 6 var flowFile = session.get();
 7 
 8 
 9 if(flowFile != null) {
10 
11     try {
12 
13         var text = "";
14 
15         // 讀取flowFile中內容
16         session.read(flowFile,new InputStreamCallback(function(inputStream) {
17             var str = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
18             
19 
20             //由JSON字符串轉換爲JSON對象
21             var obj = JSON.parse(str); 
22             obj.age = 18
23 
24             //將JSON對象轉化爲JSON字符
25             text = JSON.stringify(obj); 
26 
27         }));
28 
29         // 向flowFile中寫入內容
30         flowFile = session.write(flowFile, new OutputStreamCallback(function(outputStream) {
31 
32             outputStream.write(text.getBytes(StandardCharsets.UTF_8))
33 
34         }));
35 
36         session.transfer(flowFile, REL_SUCCESS)
37 
38     } catch(e) {
39         log.error('Something went wrong', e)
40         session.transfer(flowFile, REL_FAILURE)
41     }
42     
43 }

  三、PutFile

    

    輸出文件內容:{"id":1,"name":"god","age":18}

  

  其餘腳本引擎,參考如下地址 

  參考文檔連接:https://community.hortonworks.com/articles/75032/executescript-cookbook-part-1.html

相關文章
相關標籤/搜索